This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a6f4710bd59 [fix](routine load) reset Kafka progress cache when 
routine load job topic change (#38474) (#39528)
a6f4710bd59 is described below

commit a6f4710bd5985a859188bda0a36b85643970ef3e
Author: hui lai <[email protected]>
AuthorDate: Mon Aug 26 21:18:13 2024 +0800

    [fix](routine load) reset Kafka progress cache when routine load job topic 
change (#38474) (#39528)
    
    pick (#38474)
    
    When changing a routine load job's topic from test_topic_before to
    test_topic_after with
    ```
    ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = 
"test_topic_after");
    ```
    (test_topic_before has 5 rows and test_topic_after has 1 row)
    
    An exception occurred and the job could not consume any data:
    ```
    2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
    ```
    
    The Kafka progress cache must be reset whenever a routine load job's
    topic changes.
---
 .../doris/load/routineload/KafkaProgress.java      |  8 ++++---
 .../load/routineload/KafkaRoutineLoadJob.java      | 28 +++++++++++++++-------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 3b219f48323..8bf2a957eb6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -118,15 +118,17 @@ public class KafkaProgress extends RoutineLoadProgress {
         }
     }
 
-    // modify the partition offset of this progress.
-    // throw exception is the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) 
throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> 
kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                 throw new DdlException("The specified partition " + pair.first 
+ " is not in the consumed partitions");
             }
         }
+    }
 
+    // modify the partition offset of this progress.
+    // throw exception is the specified partition does not exist in progress.
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 692d79d445e..8fedd791b1e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -673,22 +673,32 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 customKafkaProperties = 
dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before reset 
progress to make modify operation atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) 
progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // It is necessary to reset the Kafka progress cache if topic 
change,
+            // and should reset cache before modifying partition offset.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
+            }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+            }
+
+            // modify broker list
+            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+                this.brokerList = dataSourceProperties.getBrokerList();
             }
         }
         if (!jobProperties.isEmpty()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to