This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2f74d681f [fix](routine load) reset Kafka progress cache when routine load job topic change (#38474)
1f2f74d681f is described below

commit 1f2f74d681f1f624823881533e491bcacacca81e
Author: hui lai <[email protected]>
AuthorDate: Tue Aug 6 16:46:35 2024 +0800

    [fix](routine load) reset Kafka progress cache when routine load job topic change (#38474)
    
    When changing the routine load job topic from test_topic_before to
    test_topic_after with
    ```
    ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
    ```
    (test_topic_before has 5 rows and test_topic_after has 1 row).
    
    An exception occurred and the job could not consume any data:
    ```
    2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    ... (the same warning repeats every few milliseconds)
    ```
    
    It is therefore necessary to reset the Kafka progress cache when the
    routine load job topic changes.
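    
    The stall can be reproduced in isolation. Below is a minimal,
    self-contained Java sketch (hypothetical names, not the actual Doris
    implementation) of the fallback check in hasMoreDataToConsume():
    once the cached offset inherited from the old topic exceeds the
    latest offset of the new topic, no consuming task is ever scheduled.
    ```
    // Minimal illustration of the offset-fallback stall (a hypothetical
    // simplification, not the actual Doris code).
    public class OffsetFallbackDemo {
        static boolean hasMoreDataToConsume(long cachedOffset, long latestOffset) {
            if (cachedOffset > latestOffset) {
                // Stale cache from the old topic (5 > 1): log "Kafka offset
                // fallback" and schedule nothing, forever.
                return false;
            }
            return cachedOffset < latestOffset;
        }

        public static void main(String[] args) {
            System.out.println(hasMoreDataToConsume(5, 1)); // false -> job stalls
            System.out.println(hasMoreDataToConsume(0, 1)); // true  -> flows again after reset
        }
    }
    ```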
---
 cloud/src/common/bvars.cpp                         |   1 +
 cloud/src/common/bvars.h                           |   1 +
 cloud/src/meta-service/meta_service.h              |  10 ++
 cloud/src/meta-service/meta_service_txn.cpp        |  52 ++++++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |  10 ++
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  10 ++
 .../doris/load/routineload/KafkaProgress.java      |   8 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  56 +++++++--
 gensrc/proto/cloud.proto                           |  11 ++
 .../test_routine_load_topic_change.out             |  16 +++
 .../load_p0/routine_load/data/test_topic_after.csv |   1 +
 .../routine_load/data/test_topic_before.csv        |   5 +
 .../test_routine_load_topic_change.groovy          | 140 +++++++++++++++++++++
 13 files changed, 309 insertions(+), 12 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index f09fa26cc6f..ad7d81c021f 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -74,6 +74,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
                                                                    "get_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 164fc6f9940..44a2632c6af 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -175,6 +175,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
 
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index f226dca8598..8b812aeffc9 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -275,6 +275,10 @@ public:
                                    GetRLTaskCommitAttachResponse* response,
                                    ::google::protobuf::Closure* done) override;
 
+    void reset_rl_progress(::google::protobuf::RpcController* controller,
+                           const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
+                           ::google::protobuf::Closure* done) override;
+
     void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
                     GetTxnIdResponse* response, ::google::protobuf::Closure* done) override;
 
@@ -641,6 +645,12 @@ public:
                   done);
     }
 
+    void reset_rl_progress(::google::protobuf::RpcController* controller,
+                           const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
+                           ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::reset_rl_progress, controller, request, response, done);
+    }
+
     void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
                     GetTxnIdResponse* response, ::google::protobuf::Closure* done) override {
         call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done);
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 15634ba2a8b..7b540654433 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -648,6 +648,58 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
     }
 }
 
+void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
+                                        const ResetRLProgressRequest* request,
+                                        ResetRLProgressResponse* response,
+                                        ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(reset_rl_progress);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(reset_rl_progress)
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "filed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+    std::string rl_progress_key;
+    std::string rl_progress_val;
+    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
+    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
+    txn->remove(rl_progress_key);
+    err = txn->commit();
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
+        ss << "progress info not found, db_id=" << db_id << " job_id=" << 
job_id << " err=" << err;
+        msg = ss.str();
+        return;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to remove progress info, db_id=" << db_id << " job_id=" 
<< job_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
 void scan_tmp_rowset(
        const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv>& txn_kv,
         MetaServiceCode& code, std::string& msg, int64_t* db_id,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 5fd42a31f92..aaa28e5eeaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -361,6 +361,16 @@ public class MetaServiceClient {
         return blockingStub.getRlTaskCommitAttach(request);
     }
 
+    public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.ResetRLProgressRequest.Builder builder =
+                    Cloud.ResetRLProgressRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.resetRlProgress(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.resetRlProgress(request);
+    }
+
     public Cloud.GetObjStoreInfoResponse
             getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
         if (!request.hasCloudUniqueId()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 5f17692180b..7f466374cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -508,6 +508,16 @@ public class MetaServiceProxy {
         }
     }
 
+    public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.resetRLProgress(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
     public Cloud.GetObjStoreInfoResponse
            getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index cb6c36dc2e6..1952ccaf332 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -132,15 +132,17 @@ public class KafkaProgress extends RoutineLoadProgress {
         }
     }
 
-    // modify the partition offset of this progress.
-    // throw exception is the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                 throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
             }
         }
+    }
 
+    // Modify the partition offsets of this progress.
+    // Callers should validate the partitions via checkPartitions() beforehand.
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
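
The checkPartitions/modifyOffset split is what makes the ALTER atomic: validation can fail before any state is mutated. A hypothetical call-site sketch (applyOffsets and its arguments are illustrative, not the actual Doris call site):
```
// Validate every requested partition first, then apply all offsets, so a
// DdlException cannot leave the progress map half-updated.
void applyOffsets(KafkaProgress progress, List<Pair<Integer, Long>> offsets) throws DdlException {
    progress.checkPartitions(offsets); // throws if a partition is not being consumed
    progress.modifyOffset(offsets);    // only reached after validation succeeds
}
```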
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 751516b559a..555ae986873 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -713,22 +713,35 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before resetting progress, so the modify operation stays atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // The Kafka progress cache must be reset if the topic changes,
+            // and the reset must happen before modifying partition offsets.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
+                if (Config.isCloudMode()) {
+                    resetCloudProgress();
+                }
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
+            }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+            }
+
+            // modify broker list
+            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+                this.brokerList = dataSourceProperties.getBrokerList();
             }
         }
         if (!jobProperties.isEmpty()) {
@@ -743,6 +756,31 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 this.id, jobProperties, dataSourceProperties);
     }
 
+    private void resetCloudProgress() throws DdlException {
+        Cloud.ResetRLProgressRequest.Builder builder =
+                Cloud.ResetRLProgressRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        builder.setDbId(dbId);
+        builder.setJobId(id);
+
+        Cloud.ResetRLProgressResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().resetRLProgress(builder.build());
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                LOG.warn("failed to reset cloud progress, response: {}", response);
+                if (response.getStatus().getCode() == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
+                    LOG.warn("routine load progress not found, response: {}", response);
+                    return;
+                } else {
+                    throw new DdlException(response.getStatus().getMsg());
+                }
+            }
+        } catch (RpcException e) {
+            LOG.warn("failed to reset cloud progress", e);
+            throw new DdlException(e.getMessage());
+        }
+    }
+
     @Override
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
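
The net effect in KafkaRoutineLoadJob is a strict validate-then-mutate ordering. A condensed restatement of the modified branch above (hypothetical locals such as topicChanged, newOffsets, newTopic, and newBrokerList; error handling and unrelated branches omitted):
```
convertCustomProperties(true);                           // may throw; nothing mutated yet
((KafkaProgress) progress).checkPartitions(newOffsets);  // may throw; nothing mutated yet
if (topicChanged) {
    if (Config.isCloudMode()) {
        resetCloudProgress();            // drop the persisted progress key in the meta service
    }
    this.topic = newTopic;
    this.progress = new KafkaProgress(); // fresh, empty cache for the new topic
}
((KafkaProgress) progress).modifyOffset(newOffsets);     // applied to the fresh progress
this.brokerList = newBrokerList;                         // broker list updated last
```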
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 7d9c9c19169..e166975ec4c 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1436,6 +1436,16 @@ message GetRLTaskCommitAttachResponse {
     optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
 }
 
+message ResetRLProgressRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 db_id = 2;
+    optional int64 job_id = 3;
+}
+
+message ResetRLProgressResponse {
+    optional MetaServiceResponseStatus status = 1;
+}
+
 message CheckKeyInfos {
     repeated int64 db_ids = 1;
     repeated int64 table_ids = 2;
@@ -1528,6 +1538,7 @@ service MetaService {
 
     // routine load progress
     rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);
+    rpc reset_rl_progress(ResetRLProgressRequest) returns (ResetRLProgressResponse);
 
     // check KV
     rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
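
End to end, the FE exercises the new RPC as in the following sketch, which mirrors resetCloudProgress above (dbId and jobId stand for the job's own fields):
```
Cloud.ResetRLProgressRequest req = Cloud.ResetRLProgressRequest.newBuilder()
        .setCloudUniqueId(Config.cloud_unique_id) // for auth, per cloud.proto
        .setDbId(dbId)
        .setJobId(jobId)
        .build();
Cloud.ResetRLProgressResponse resp = MetaServiceProxy.getInstance().resetRLProgress(req);
// ROUTINE_LOAD_PROGRESS_NOT_FOUND is tolerated: a job that never committed
// has no progress key to remove.
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK
        && resp.getStatus().getCode() != Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
    throw new DdlException(resp.getStatus().getMsg());
}
```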
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
new file mode 100644
index 00000000000..1f534d0a082
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_change --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+2      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+3      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+4      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+5      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+
+-- !sql_topic_change1 --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+2      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+3      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+4      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+5      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+6      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
new file mode 100644
index 00000000000..de1727d2d81
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
@@ -0,0 +1 @@
+6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
new file mode 100644
index 00000000000..f1a48b1e411
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
@@ -0,0 +1,5 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
new file mode 100644
index 00000000000..25bf9933d11
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_topic_change","p0") {
+    // send data to Kafka
+    def kafkaCsvTopics = [
+                  "test_topic_before",
+                  "test_topic_after",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka producer properties
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // create the target table for the topic change test
+    def tableName = "test_routine_load_topic_change"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date  NULL,
+            `v2` string  NULL,
+            `v3` datetime  NULL,
+            `v4` string  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_topic_change"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change "select * from ${tableName} order by k1"
+
+            sql "pause routine load for ${jobName}"
+            def res = sql "show routine load for ${jobName}"
+            log.info("routine load job properties: 
${res[0][11].toString()}".toString())
+            sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\" 
= \"${kafkaCsvTpoics[1]}\", \"property.kafka_default_offsets\" = 
\"OFFSET_BEGINNING\");"
+            sql "resume routine load for ${jobName}"
+            count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                if (res[0][0] > 5) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change1 "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
