This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0ee0dd6ae3a [fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39181)
0ee0dd6ae3a is described below

commit 0ee0dd6ae3abb50d56add222443f2be6214650ff
Author: hui lai <[email protected]>
AuthorDate: Sat Aug 10 23:00:39 2024 +0800

    [fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39181)
    
    pick (#38474)
    
    When changing the routine load job topic from test_topic_before to
    test_topic_after with
    ```
    ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
    ```
    (test_topic_before has 5 rows and test_topic_after has 1 row)
    
    an exception occurred and the job could not consume any data:
    ```
    2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    ... (the same warning repeats every few milliseconds)
    ```
    
    It is necessary to reset the Kafka progress cache when the routine load
    job topic changes.
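
    The failure mode can be reproduced in miniature (a simplified stand-alone
    sketch, not the actual Doris classes; the map and method below only mimic
    the shape of hasMoreDataToConsume()): when the cached offset of the old
    topic exceeds the latest offset of the new topic, the scheduler decides
    there is nothing left to consume.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class StaleOffsetSketch {
        // Simplified stand-in for the per-partition progress cache.
        static Map<Integer, Long> cachedOffsets = new HashMap<>();

        // Mimics the hasMoreDataToConsume() fallback check: if the cached
        // offset is beyond the topic's latest offset, nothing is consumed.
        static boolean hasMoreDataToConsume(int partition, long latestOffset) {
            long cached = cachedOffsets.getOrDefault(partition, 0L);
            if (cached > latestOffset) {
                System.out.println("Kafka offset fallback. partition: " + partition
                        + ", cache offset: " + cached + " get latest offset: " + latestOffset);
                return false;
            }
            return cached < latestOffset;
        }

        public static void main(String[] args) {
            cachedOffsets.put(0, 5L); // progress left over from test_topic_before
            long latestAfter = 1L;    // test_topic_after only has 1 row
            System.out.println(hasMoreDataToConsume(0, latestAfter)); // stale cache blocks consumption

            cachedOffsets.clear();    // the fix: reset progress on topic change
            System.out.println(hasMoreDataToConsume(0, latestAfter));
        }
    }
    ```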
---
 .../doris/load/routineload/KafkaProgress.java      |   8 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  28 +++--
 .../test_routine_load_topic_change.out             |  16 +++
 .../load_p0/routine_load/data/test_topic_after.csv |   1 +
 .../routine_load/data/test_topic_before.csv        |   5 +
 .../test_routine_load_topic_change.groovy          | 140 +++++++++++++++++++++
 6 files changed, 186 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 53c57a1cceb..7dd49ba1ec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -118,15 +118,17 @@ public class KafkaProgress extends RoutineLoadProgress {
         }
     }
 
-    // modify the partition offset of this progress.
-    // throw exception is the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
             }
         }
+    }
 
+    // modify the partition offset of this progress.
+    // the specified partitions should be validated by checkPartitions() beforehand.
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
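
The split above can be modeled in isolation (a minimal stand-alone sketch, with
java.util's SimpleEntry standing in for Doris's Pair and IllegalArgumentException
for DdlException): validation is separated from mutation, so a failed check
leaves the progress map untouched.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProgressSketch {
    final Map<Integer, Long> partitionIdToOffset = new HashMap<>();

    // Validation only: fail fast without touching any state.
    void checkPartitions(List<SimpleEntry<Integer, Long>> offsets) {
        for (SimpleEntry<Integer, Long> pair : offsets) {
            if (!partitionIdToOffset.containsKey(pair.getKey())) {
                throw new IllegalArgumentException("The specified partition "
                        + pair.getKey() + " is not in the consumed partitions");
            }
        }
    }

    // Mutation only: callers are expected to have validated first.
    void modifyOffset(List<SimpleEntry<Integer, Long>> offsets) {
        for (SimpleEntry<Integer, Long> pair : offsets) {
            partitionIdToOffset.put(pair.getKey(), pair.getValue());
        }
    }

    public static void main(String[] args) {
        ProgressSketch p = new ProgressSketch();
        p.partitionIdToOffset.put(0, 5L);
        p.checkPartitions(List.of(new SimpleEntry<>(0, 7L))); // passes: partition 0 is consumed
        p.modifyOffset(List.of(new SimpleEntry<>(0, 7L)));
        System.out.println(p.partitionIdToOffset); // {0=7}
    }
}
```

Because the check runs before any state is replaced, an invalid ALTER leaves
both the old progress and the old topic in place.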
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 201412027ab..22de3ef3574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -692,22 +692,32 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before resetting progress to keep the modify operation atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // It is necessary to reset the Kafka progress cache if the topic changes,
+            // and the cache should be reset before modifying partition offsets.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
+            }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+            }
+
+            // modify broker list
+            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+                this.brokerList = dataSourceProperties.getBrokerList();
             }
         }
         if (!jobProperties.isEmpty()) {
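
The reordered modify path above can be condensed into a sketch (hypothetical
class and method names; the real logic lives in the hunk shown here): validate
first, then reset the progress cache if the topic changed, then apply the new
offsets against the fresh cache.

```java
import java.util.HashMap;
import java.util.Map;

public class ModifyOrderSketch {
    String topic = "test_topic_before";
    Map<Integer, Long> progress = new HashMap<>(); // stand-in for KafkaProgress

    // Hypothetical condensed version of the reordered modify path.
    void modifyDataSource(String newTopic, Map<Integer, Long> newOffsets) {
        // (the real code runs convertCustomProperties and checkPartitions
        // here, before any state is touched, to keep the operation atomic)

        // a topic change invalidates the cached progress, so reset it
        // before applying any explicitly requested offsets
        if (newTopic != null && !newTopic.isEmpty()) {
            topic = newTopic;
            progress = new HashMap<>(); // fresh KafkaProgress
        }
        // apply the requested offsets against the (possibly fresh) cache
        progress.putAll(newOffsets);
    }

    public static void main(String[] args) {
        ModifyOrderSketch job = new ModifyOrderSketch();
        job.progress.put(0, 5L); // stale progress from test_topic_before
        job.modifyDataSource("test_topic_after", Map.of());
        System.out.println(job.progress); // {}: no stale offset survives the switch
    }
}
```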
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
new file mode 100644
index 00000000000..1f534d0a082
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_change --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+2      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+3      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+4      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+5      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+
+-- !sql_topic_change1 --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+2      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+3      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+4      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+5      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+6      eab     2023-07-15      def     2023-07-20T05:48:31     "ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
new file mode 100644
index 00000000000..de1727d2d81
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
@@ -0,0 +1 @@
+6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
new file mode 100644
index 00000000000..f1a48b1e411
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
@@ -0,0 +1,5 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
new file mode 100644
index 00000000000..25bf9933d11
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_topic_change","p0") {
+    // send data to Kafka
+    def kafkaCsvTpoics = [
+                  "test_topic_before",
+                  "test_topic_after",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka producer properties
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // test create routine load job with enclose and escape
+    def tableName = "test_routine_load_topic_change"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date  NULL,
+            `v2` string  NULL,
+            `v3` datetime  NULL,
+            `v4` string  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_topic_change"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load data is not visible after a long wait")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change "select * from ${tableName} order by k1"
+
+            sql "pause routine load for ${jobName}"
+            def res = sql "show routine load for ${jobName}"
+            log.info("routine load job properties: ${res[0][11].toString()}".toString())
+            sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\" = \"${kafkaCsvTpoics[1]}\", \"property.kafka_default_offsets\" = \"OFFSET_BEGINNING\");"
+            sql "resume routine load for ${jobName}"
+            count = 0
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 5) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load data is not visible after a long wait")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change1 "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
