This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 1fa60c6056e [fix](routineload) check offset when schedule tasks (#30211)
1fa60c6056e is described below

commit 1fa60c6056e50eef26606e071b7dde2d6930ed01
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 21:54:31 2024 +0800

    [fix](routineload) check offset when schedule tasks (#30211)
---
 .../load/routineload/KafkaRoutineLoadJob.java      | 23 ++++++++++++++++------
 .../doris/load/routineload/KafkaTaskInfo.java      |  2 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |  2 +-
 .../load/routineload/RoutineLoadTaskScheduler.java | 16 +++++++--------
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 5d3ae1f2f66..adbba831cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -704,7 +704,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     // check if given partitions has more data to consume.
     // 'partitionIdToOffset' to the offset to be consumed.
-    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) {
+    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) throws UserException {
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
                     && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
@@ -734,11 +734,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
         // check again
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
-            if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
-                    && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
-                LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}",
-                        partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
-                return true;
+            Integer partitionId = entry.getKey();
+            if (cachedPartitionWithLatestOffsets.containsKey(partitionId)) {
+                long partitionLatestOffset = cachedPartitionWithLatestOffsets.get(partitionId);
+                long recordPartitionOffset = entry.getValue();
+                if (recordPartitionOffset < partitionLatestOffset) {
+                    LOG.debug("has more data to consume. offsets to be consumed: {},"
+                            + " latest offsets: {}, task {}, job {}",
+                            partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
+                    return true;
+                } else if (recordPartitionOffset > partitionLatestOffset) {
+                    String msg = "offset set in job: " + recordPartitionOffset
+                                + " is greater than kafka latest offset: "
+                                + partitionLatestOffset + " partition id: "
+                                + partitionId;
+                    throw new UserException(msg);
+                }
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index cd05345ffe8..d6f0a287057 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -118,7 +118,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     }
 
     @Override
-    boolean hasMoreDataToConsume() {
+    boolean hasMoreDataToConsume() throws UserException {
         KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
         return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index f80e377dac7..7a5312b2c8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -215,7 +215,7 @@ public abstract class RoutineLoadTaskInfo {
 
     abstract String getTaskDataSourceProperties();
 
-    abstract boolean hasMoreDataToConsume();
+    abstract boolean hasMoreDataToConsume() throws UserException;
 
     @Override
     public boolean equals(Object obj) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index f8b4ed9119e..5c01375e7dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -128,15 +128,15 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             return;
         }
 
-        // check if topic has more data to consume
-        if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
-            needScheduleTasksQueue.put(routineLoadTaskInfo);
-            return;
-        }
-
-        // allocate BE slot for this task.
-        // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
         try {
+            // check if topic has more data to consume
+            if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
+                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
+            }
+
+            // allocate BE slot for this task.
+            // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
                 needScheduleTasksQueue.put(routineLoadTaskInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to