This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3e73933857d1173a11c4bc0114bf495afb3cb537
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 13:16:03 2024 +0800

    [fix](routineload) check offset when schedule tasks (#30136)
---
 .../load/routineload/KafkaRoutineLoadJob.java      | 23 ++++++++---
 .../doris/load/routineload/KafkaTaskInfo.java      |  2 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |  2 +-
 .../load/routineload/RoutineLoadTaskScheduler.java | 16 ++++----
 .../routine_load/test_routine_load_error.groovy    | 47 ++++++++++++++++++++++
 5 files changed, 74 insertions(+), 16 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index d7a090c23e5..faad0a0248a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -704,7 +704,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     // check if given partitions has more data to consume.
     // 'partitionIdToOffset' to the offset to be consumed.
-    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> 
partitionIdToOffset) {
+    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> 
partitionIdToOffset) throws UserException {
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
                     && entry.getValue() < 
cachedPartitionWithLatestOffsets.get(entry.getKey())) {
@@ -734,11 +734,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
         // check again
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
-            if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
-                    && entry.getValue() < 
cachedPartitionWithLatestOffsets.get(entry.getKey())) {
-                LOG.debug("has more data to consume. offsets to be consumed: 
{}, latest offsets: {}, task {}, job {}",
-                        partitionIdToOffset, cachedPartitionWithLatestOffsets, 
taskId, id);
-                return true;
+            Integer partitionId = entry.getKey();
+            if (cachedPartitionWithLatestOffsets.containsKey(partitionId)) {
+                long partitionLatestOffset = 
cachedPartitionWithLatestOffsets.get(partitionId);
+                long recordPartitionOffset = entry.getValue();
+                if (recordPartitionOffset < partitionLatestOffset) {
+                    LOG.debug("has more data to consume. offsets to be 
consumed: {},"
+                            + " latest offsets: {}, task {}, job {}",
+                            partitionIdToOffset, 
cachedPartitionWithLatestOffsets, taskId, id);
+                    return true;
+                } else if (recordPartitionOffset > partitionLatestOffset) {
+                    String msg = "offset set in job: " + recordPartitionOffset
+                                + " is greater than kafka latest offset: "
+                                + partitionLatestOffset + " partition id: "
+                                + partitionId;
+                    throw new UserException(msg);
+                }
             }
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 2075e5548e5..a8d387a2f6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -119,7 +119,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     }
 
     @Override
-    boolean hasMoreDataToConsume() {
+    boolean hasMoreDataToConsume() throws UserException {
         KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) 
routineLoadManager.getJob(jobId);
         return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index fd8e8182321..9c28dbfc6a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -209,7 +209,7 @@ public abstract class RoutineLoadTaskInfo {
 
     abstract String getTaskDataSourceProperties();
 
-    abstract boolean hasMoreDataToConsume();
+    abstract boolean hasMoreDataToConsume() throws UserException;
 
     @Override
     public boolean equals(Object obj) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 0f594a2d508..a6cc796027a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -128,15 +128,15 @@ public class RoutineLoadTaskScheduler extends 
MasterDaemon {
             return;
         }
 
-        // check if topic has more data to consume
-        if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
-            needScheduleTasksQueue.put(routineLoadTaskInfo);
-            return;
-        }
-
-        // allocate BE slot for this task.
-        // this should be done before txn begin, or the txn may be begun 
successfully but failed to be allocated.
         try {
+            // check if topic has more data to consume
+            if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
+                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
+            }
+
+            // allocate BE slot for this task.
+            // this should be done before txn begin, or the txn may be begun 
successfully but failed to be allocated.
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next 
scheduling
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index b04610740d9..c4180e35f4d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -91,4 +91,51 @@ suite("test_routine_load_error","p0") {
             sql "stop routine load for testTableNoExist"
         }
     }
+
+    // test out of range
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "testOutOfRange"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_partitions" = "0",
+                    "kafka_topic" = "multi_table_load_invalid_table",
+                    "kafka_offsets" = "100"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                log.info("routine load state: 
${res[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${res[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                if (state != "PAUSED") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    } 
+                    continue;
+                }
+                log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                assertTrue(res[0][17].toString().contains("is greater than 
kafka latest offset"))
+                break;
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to