This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1f2f74d681f [fix](routine load) reset Kafka progress cache when
routine load job topic change (#38474)
1f2f74d681f is described below
commit 1f2f74d681f1f624823881533e491bcacacca81e
Author: hui lai <[email protected]>
AuthorDate: Tue Aug 6 16:46:35 2024 +0800
[fix](routine load) reset Kafka progress cache when routine load job topic
change (#38474)
When changing the topic of a routine load job from test_topic_before to
test_topic_after with
```
ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" =
"test_topic_after");
```
(test_topic_before has 5 rows and test_topic_after has 1 row),
the job cannot consume any data and repeatedly logs the following warning:
```
2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55)
[KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback.
partition: 0, cache offset: 5 get latest
offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
```
It is necessary to reset the Kafka progress cache when the topic of a
routine load job changes.
---
cloud/src/common/bvars.cpp | 1 +
cloud/src/common/bvars.h | 1 +
cloud/src/meta-service/meta_service.h | 10 ++
cloud/src/meta-service/meta_service_txn.cpp | 52 ++++++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 10 ++
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 10 ++
.../doris/load/routineload/KafkaProgress.java | 8 +-
.../load/routineload/KafkaRoutineLoadJob.java | 56 +++++++--
gensrc/proto/cloud.proto | 11 ++
.../test_routine_load_topic_change.out | 16 +++
.../load_p0/routine_load/data/test_topic_after.csv | 1 +
.../routine_load/data/test_topic_before.csv | 5 +
.../test_routine_load_topic_change.groovy | 140 +++++++++++++++++++++
13 files changed, 309 insertions(+), 12 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index f09fa26cc6f..ad7d81c021f 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -74,6 +74,7 @@ BvarLatencyRecorderWithTag
g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms",
"get_rl_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms",
"reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms",
"start_tablet_job");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 164fc6f9940..44a2632c6af 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -175,6 +175,7 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index f226dca8598..8b812aeffc9 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -275,6 +275,10 @@ public:
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;
+ void reset_rl_progress(::google::protobuf::RpcController* controller,
+ const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
+ ::google::protobuf::Closure* done) override;
+
void get_txn_id(::google::protobuf::RpcController* controller, const
GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure*
done) override;
@@ -641,6 +645,12 @@ public:
done);
}
+ void reset_rl_progress(::google::protobuf::RpcController* controller,
+ const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
+ ::google::protobuf::Closure* done) override {
+ call_impl(&cloud::MetaService::reset_rl_progress, controller, request,
response, done);
+ }
+
void get_txn_id(::google::protobuf::RpcController* controller, const
GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure*
done) override {
call_impl(&cloud::MetaService::get_txn_id, controller, request,
response, done);
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 15634ba2a8b..7b540654433 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -648,6 +648,58 @@ void
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
}
}
+void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController*
controller,
+ const ResetRLProgressRequest* request,
+ ResetRLProgressResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(reset_rl_progress);
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+ RPC_RATE_LIMIT(reset_rl_progress)
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "filed to create txn, err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ if (!request->has_db_id() || !request->has_job_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty db_id or job_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ int64_t db_id = request->db_id();
+ int64_t job_id = request->job_id();
+ std::string rl_progress_key;
+ std::string rl_progress_val;
+ RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
+ rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
+ txn->remove(rl_progress_key);
+ err = txn->commit();
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
+ ss << "progress info not found, db_id=" << db_id << " job_id=" <<
job_id << " err=" << err;
+ msg = ss.str();
+ return;
+ } else if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to remove progress info, db_id=" << db_id << " job_id="
<< job_id
+ << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+}
+
void scan_tmp_rowset(
const std::string& instance_id, int64_t txn_id,
std::shared_ptr<TxnKv>& txn_kv,
MetaServiceCode& code, std::string& msg, int64_t* db_id,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 5fd42a31f92..aaa28e5eeaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -361,6 +361,16 @@ public class MetaServiceClient {
return blockingStub.getRlTaskCommitAttach(request);
}
+ public Cloud. ResetRLProgressResponse resetRLProgress(Cloud.
ResetRLProgressRequest request) {
+ if (!request.hasCloudUniqueId()) {
+ Cloud. ResetRLProgressRequest.Builder builder =
+ Cloud. ResetRLProgressRequest.newBuilder();
+ builder.mergeFrom(request);
+ return
blockingStub.resetRlProgress(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+ }
+ return blockingStub.resetRlProgress(request);
+ }
+
public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
if (!request.hasCloudUniqueId()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 5f17692180b..7f466374cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -508,6 +508,16 @@ public class MetaServiceProxy {
}
}
+ public Cloud.ResetRLProgressResponse
resetRLProgress(Cloud.ResetRLProgressRequest request)
+ throws RpcException {
+ try {
+ final MetaServiceClient client = getProxy();
+ return client.resetRLProgress(request);
+ } catch (Exception e) {
+ throw new RpcException("", e.getMessage(), e);
+ }
+ }
+
public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws
RpcException {
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index cb6c36dc2e6..1952ccaf332 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -132,15 +132,17 @@ public class KafkaProgress extends RoutineLoadProgress {
}
}
- // modify the partition offset of this progress.
- // throw exception is the specified partition does not exist in progress.
- public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets)
throws DdlException {
+ public void checkPartitions(List<Pair<Integer, Long>>
kafkaPartitionOffsets) throws DdlException {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
if (!partitionIdToOffset.containsKey(pair.first)) {
throw new DdlException("The specified partition " + pair.first
+ " is not in the consumed partitions");
}
}
+ }
+ // modify the partition offset of this progress.
+ // throw exception is the specified partition does not exist in progress.
+ public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
partitionIdToOffset.put(pair.first, pair.second);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 751516b559a..555ae986873 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -713,22 +713,35 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
customKafkaProperties =
dataSourceProperties.getCustomKafkaProperties();
}
- // modify partition offset first
- if (!kafkaPartitionOffsets.isEmpty()) {
- // we can only modify the partition that is being consumed
- ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
- }
-
+ // convertCustomProperties and check partitions before reset
progress to make modify operation atomic
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
- // modify broker list and topic
- if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
- this.brokerList = dataSourceProperties.getBrokerList();
+
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ ((KafkaProgress)
progress).checkPartitions(kafkaPartitionOffsets);
}
+
+ // It is necessary to reset the Kafka progress cache if topic
change,
+ // and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
+ if (Config.isCloudMode()) {
+ resetCloudProgress();
+ }
this.topic = dataSourceProperties.getTopic();
+ this.progress = new KafkaProgress();
+ }
+
+ // modify partition offset
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ // we can only modify the partition that is being consumed
+ ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+ }
+
+ // modify broker list
+ if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+ this.brokerList = dataSourceProperties.getBrokerList();
}
}
if (!jobProperties.isEmpty()) {
@@ -743,6 +756,31 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
this.id, jobProperties, dataSourceProperties);
}
+ private void resetCloudProgress() throws DdlException {
+ Cloud.ResetRLProgressRequest.Builder builder =
+ Cloud.ResetRLProgressRequest.newBuilder();
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+ builder.setDbId(dbId);
+ builder.setJobId(id);
+
+ Cloud.ResetRLProgressResponse response;
+ try {
+ response =
MetaServiceProxy.getInstance().resetRLProgress(builder.build());
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("failed to reset cloud progress, response: {}",
response);
+ if (response.getStatus().getCode() ==
Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
+ LOG.warn("not found routine load progress, response: {}",
response);
+ return;
+ } else {
+ throw new DdlException(response.getStatus().getMsg());
+ }
+ }
+ } catch (RpcException e) {
+ LOG.info("failed to reset cloud progress {}", e);
+ throw new DdlException(e.getMessage());
+ }
+ }
+
@Override
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 7d9c9c19169..e166975ec4c 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1436,6 +1436,16 @@ message GetRLTaskCommitAttachResponse {
optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
}
+message ResetRLProgressRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional int64 db_id = 2;
+ optional int64 job_id = 3;
+}
+
+message ResetRLProgressResponse {
+ optional MetaServiceResponseStatus status = 1;
+}
+
message CheckKeyInfos {
repeated int64 db_ids = 1;
repeated int64 table_ids = 2;
@@ -1528,6 +1538,7 @@ service MetaService {
// routine load progress
rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns
(GetRLTaskCommitAttachResponse);
+ rpc reset_rl_progress(ResetRLProgressRequest) returns
(ResetRLProgressResponse);
// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
new file mode 100644
index 00000000000..1f534d0a082
--- /dev/null
+++
b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_topic_change --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+
+-- !sql_topic_change1 --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+6 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
new file mode 100644
index 00000000000..de1727d2d81
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
@@ -0,0 +1 @@
+6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
new file mode 100644
index 00000000000..f1a48b1e411
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
@@ -0,0 +1,5 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
new file mode 100644
index 00000000000..25bf9933d11
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_topic_change","p0") {
+ // send data to Kafka
+ def kafkaCsvTpoics = [
+ "test_topic_before",
+ "test_topic_after",
+ ]
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ // test create routine load job with enclose and escape
+ def tableName = "test_routine_load_topic_change"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def jobName = "test_topic_change"
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql_topic_change "select * from ${tableName} order by k1"
+
+ sql "pause routine load for ${jobName}"
+ def res = sql "show routine load for ${jobName}"
+ log.info("routine load job properties:
${res[0][11].toString()}".toString())
+ sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\"
= \"${kafkaCsvTpoics[1]}\", \"property.kafka_default_offsets\" =
\"OFFSET_BEGINNING\");"
+ sql "resume routine load for ${jobName}"
+ count = 0
+ while (true) {
+ res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 5) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql_topic_change1 "select * from ${tableName} order by k1"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]