This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 1fa60c6056e [fix](routineload) check offset when schedule tasks (#30211)
1fa60c6056e is described below
commit 1fa60c6056e50eef26606e071b7dde2d6930ed01
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 21:54:31 2024 +0800
[fix](routineload) check offset when schedule tasks (#30211)
---
.../load/routineload/KafkaRoutineLoadJob.java | 23 ++++++++++++++++------
.../doris/load/routineload/KafkaTaskInfo.java | 2 +-
.../load/routineload/RoutineLoadTaskInfo.java | 2 +-
.../load/routineload/RoutineLoadTaskScheduler.java | 16 +++++++--------
4 files changed, 27 insertions(+), 16 deletions(-)
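
In short: before a task is scheduled, KafkaRoutineLoadJob.hasMoreDataToConsume() now compares the offset recorded in the job for each Kafka partition against the latest offset reported by Kafka. A recorded offset behind the latest offset still means there is data to consume; a recorded offset ahead of the latest offset now raises a UserException instead of letting the task be scheduled. A simplified per-partition sketch of the new check (condensed from the first hunk below; the real method also refreshes cachedPartitionWithLatestOffsets and loops over all partitions):

    // Sketch only: the core comparison added in hasMoreDataToConsume().
    long latestOffset = cachedPartitionWithLatestOffsets.get(partitionId);  // latest offset Kafka reports
    long jobOffset = partitionIdToOffset.get(partitionId);                  // offset the job plans to consume next
    if (jobOffset < latestOffset) {
        return true;  // this partition still has data to consume
    } else if (jobOffset > latestOffset) {
        // the job's offset has run past Kafka's latest offset: fail fast with a clear error
        throw new UserException("offset set in job: " + jobOffset
                + " is greater than kafka latest offset: " + latestOffset
                + " partition id: " + partitionId);
    }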
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 5d3ae1f2f66..adbba831cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -704,7 +704,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     // check if given partitions has more data to consume.
     // 'partitionIdToOffset' to the offset to be consumed.
-    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) {
+    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) throws UserException {
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
                     && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
@@ -734,11 +734,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         // check again
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
-            if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
-                    && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
-                LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}",
-                        partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
-                return true;
+            Integer partitionId = entry.getKey();
+            if (cachedPartitionWithLatestOffsets.containsKey(partitionId)) {
+                long partitionLatestOffset = cachedPartitionWithLatestOffsets.get(partitionId);
+                long recordPartitionOffset = entry.getValue();
+                if (recordPartitionOffset < partitionLatestOffset) {
+                    LOG.debug("has more data to consume. offsets to be consumed: {},"
+                            + " latest offsets: {}, task {}, job {}",
+                            partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
+                    return true;
+                } else if (recordPartitionOffset > partitionLatestOffset) {
+                    String msg = "offset set in job: " + recordPartitionOffset
+                            + " is greater than kafka latest offset: "
+                            + partitionLatestOffset + " partition id: "
+                            + partitionId;
+                    throw new UserException(msg);
+                }
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index cd05345ffe8..d6f0a287057 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -118,7 +118,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     }
     @Override
-    boolean hasMoreDataToConsume() {
+    boolean hasMoreDataToConsume() throws UserException {
         KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
         return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index f80e377dac7..7a5312b2c8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -215,7 +215,7 @@ public abstract class RoutineLoadTaskInfo {
     abstract String getTaskDataSourceProperties();
-    abstract boolean hasMoreDataToConsume();
+    abstract boolean hasMoreDataToConsume() throws UserException;
     @Override
     public boolean equals(Object obj) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index f8b4ed9119e..5c01375e7dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -128,15 +128,15 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             return;
         }
-        // check if topic has more data to consume
-        if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
-            needScheduleTasksQueue.put(routineLoadTaskInfo);
-            return;
-        }
-
-        // allocate BE slot for this task.
-        // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
         try {
+            // check if topic has more data to consume
+            if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
+                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
+            }
+
+            // allocate BE slot for this task.
+            // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
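
Because hasMoreDataToConsume() can now throw UserException, RoutineLoadTaskScheduler moves the "more data" check inside its existing try block, so an invalid offset goes through the same error handling that already covers BE slot allocation and transaction begin instead of escaping the scheduling loop. Roughly, the control flow becomes (sketch only; the catch side is not part of this hunk, and the handler below is a hypothetical illustration, not the actual code):

    try {
        if (!routineLoadTaskInfo.hasMoreDataToConsume()) {    // may now throw UserException
            needScheduleTasksQueue.put(routineLoadTaskInfo);  // nothing new to read: re-queue and retry later
            return;
        }
        // ... allocate a BE slot, begin the txn, submit the task ...
    } catch (UserException e) {
        // hypothetical handler: the scheduler's existing catch records the error for the job
        // rather than endlessly re-queueing a task whose offset is ahead of Kafka
    }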
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]