This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ed07dc8596a [fix](cloud) fix routine load loss data when fe master
node restart (#46149)
ed07dc8596a is described below
commit ed07dc8596a70ab7e8aa44f0f222218b3e225663
Author: hui lai <[email protected]>
AuthorDate: Tue Jan 7 22:31:52 2025 +0800
[fix](cloud) fix routine load loss data when fe master node restart (#46149)
In cloud mode, a routine load job can lose data when the FE master node
restarts.
PR https://github.com/apache/doris/pull/39313 was introduced so that,
when progress is updated, a smaller value never overwrites a larger one.
With that PR in place, a routine load job replays its progress metadata
by first obtaining the configured default offset and then pulling the
metadata from the meta service to update the local value. If the value
pulled from the meta service is not larger than the default offset, it
is rejected, so the correct progress is never assigned in memory.
To solve this problem, pull the metadata from the meta service first on
restart, then decide, based on the pulled value, whether the default
offset still needs to be obtained from Kafka.
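The ordering problem described above can be illustrated with a minimal
sketch. It is a simplified model and not the Doris implementation: the
class OffsetReplaySketch, its methods, and the plain maps standing in
for the Kafka default offsets and the meta service are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the replay order; not Doris code.
class OffsetReplaySketch {

    // Guarded update: a smaller offset never overwrites a larger one.
    static void updateIfLarger(Map<Integer, Long> progress, int partition, long offset) {
        progress.merge(partition, offset, Long::max);
    }

    // Old order: take the default offsets first, then apply the
    // meta service values through the guarded update.
    static Map<Integer, Long> replayDefaultFirst(Map<Integer, Long> kafkaDefaults,
                                                 Map<Integer, Long> metaService) {
        Map<Integer, Long> progress = new HashMap<>(kafkaDefaults);
        metaService.forEach((partition, offset) -> updateIfLarger(progress, partition, offset));
        return progress;
    }

    // New order: take the meta service values first, then fall back to
    // the default offset only for partitions that have no progress yet.
    static Map<Integer, Long> replayMetaFirst(Map<Integer, Long> kafkaDefaults,
                                              Map<Integer, Long> metaService) {
        Map<Integer, Long> progress = new HashMap<>(metaService);
        kafkaDefaults.forEach(progress::putIfAbsent);
        return progress;
    }

    public static void main(String[] args) {
        Map<Integer, Long> kafkaDefaults = new HashMap<>();
        kafkaDefaults.put(0, 100L); // assumed default offset for partition 0
        kafkaDefaults.put(1, 5L);   // partition 1 has no saved progress
        Map<Integer, Long> metaService = new HashMap<>();
        metaService.put(0, 80L);    // progress actually committed for partition 0

        // Old order keeps 100, so offsets 80..99 would be skipped.
        System.out.println(replayDefaultFirst(kafkaDefaults, metaService).get(0)); // 100
        // New order restores the committed progress.
        System.out.println(replayMetaFirst(kafkaDefaults, metaService).get(0));    // 80
        // A partition without saved progress still gets its default offset.
        System.out.println(replayMetaFirst(kafkaDefaults, metaService).get(1));    // 5
    }
}
```

In this model the guarded update is harmless on its own; the loss only
appears when the default offset is loaded before the saved progress.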
---
.../java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java | 5 +++++
.../java/org/apache/doris/load/routineload/RoutineLoadScheduler.java | 5 -----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8cb0898eda8..0376cd3f366 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -366,6 +366,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
protected void unprotectUpdateProgress() throws UserException {
+ // For cloud mode, should update cloud progress from meta service,
+ // then update progress with default offset from Kafka if necessary.
+ if (Config.isCloudMode()) {
+ updateCloudProgress();
+ }
updateNewPartitionProgress();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 51029c3d18b..023cd239e09 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -18,7 +18,6 @@
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
@@ -79,10 +78,6 @@ public class RoutineLoadScheduler extends MasterDaemon {
RoutineLoadJob.JobState errorJobState = null;
UserException userException = null;
try {
- if (Config.isCloudMode()) {
- routineLoadJob.updateCloudProgress();
- }
-
routineLoadJob.prepare();
// judge nums of tasks more than max concurrent tasks of cluster
int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();