This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9a9463d5953 [fix](cloud) overwrite job statistic without data quality
check when update cloud progress (#39790)
9a9463d5953 is described below
commit 9a9463d5953fc9af3697479d6c3d733fba59b555
Author: hui lai <[email protected]>
AuthorDate: Tue Aug 27 17:36:57 2024 +0800
[fix](cloud) overwrite job statistic without data quality check when update
cloud progress (#39790)
---
.../apache/doris/load/routineload/KafkaRoutineLoadJob.java | 8 +++++++-
.../org/apache/doris/load/routineload/RoutineLoadJob.java | 12 ++++++++++++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 292f87f8a22..abd1800a19d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -281,7 +281,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
RLTaskTxnCommitAttachment commitAttach = new RLTaskTxnCommitAttachment(response.getCommitAttach());
- updateProgress(commitAttach);
+ updateCloudProgress(commitAttach);
}
@Override
@@ -346,6 +346,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
updateProgressAndOffsetsCache(attachment);
}
+ @Override
+ protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
+ super.updateCloudProgress(attachment);
+ updateProgressAndOffsetsCache(attachment);
+ }
+
@Override
protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index de1bffe1d56..2b8cbbd81ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -866,6 +866,18 @@ public abstract class RoutineLoadJob
attachment.getReceivedBytes(), false /* not replay */);
}
+ protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
+ // In the cloud mode, the reason for needing to overwrite jobStatistic is that
+ // pulling the progress of meta service is equivalent to a replay operation of edit log,
+ // but this method will be called whenever scheduled by RoutineLoadScheduler,
+ // and accumulation will result in incorrect jobStatistic information.
+ this.jobStatistic.totalRows = attachment.getTotalRows();
+ this.jobStatistic.errorRows = attachment.getFilteredRows();
+ this.jobStatistic.unselectedRows = attachment.getUnselectedRows();
+ this.jobStatistic.receivedBytes = attachment.getReceivedBytes();
+ this.jobStatistic.totalTaskExcutionTimeMs = System.currentTimeMillis() - createTimestamp;
+ }
+
private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
boolean isReplay) throws UserException {
this.jobStatistic.totalRows += numOfTotalRows;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]