This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new b2239c672e8 [branch-2.0](routine-load) self-adaption backoff timeout
(#32591)
b2239c672e8 is described below
commit b2239c672e83b35b55435ea62b870a1f610b31e6
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Mar 21 21:35:38 2024 +0800
[branch-2.0](routine-load) self-adaption backoff timeout (#32591)
---
.../org/apache/doris/common/InternalErrorCode.java | 3 +-
.../load/routineload/KafkaRoutineLoadJob.java | 2 +-
.../doris/load/routineload/KafkaTaskInfo.java | 18 ++++++++--
.../doris/load/routineload/RoutineLoadJob.java | 13 +++++++
.../load/routineload/RoutineLoadTaskInfo.java | 41 +++++++++++++++++++---
.../load/routineload/KafkaRoutineLoadJobTest.java | 2 +-
.../routineload/RoutineLoadTaskSchedulerTest.java | 2 +-
.../transaction/GlobalTransactionMgrTest.java | 4 +--
8 files changed, 72 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index b871fd198cb..214f74a38f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -35,7 +35,8 @@ public enum InternalErrorCode {
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
TASKS_ABORT_ERR(104),
- CANNOT_RESUME_ERR(105);
+ CANNOT_RESUME_ERR(105),
+ TIMEOUT_TOO_MUCH(106);
private long errCode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 6e6dba068e9..bc1f1428a91 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress)
progress).getOffsetByPartition(kafkaPartition));
}
KafkaTaskInfo kafkaTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), id, clusterName,
- maxBatchIntervalS * 2 * 1000, taskKafkaProgress,
isMultiTable());
+ maxBatchIntervalS * 2 * 1000, 0,
taskKafkaProgress, isMultiTable());
routineLoadTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d6f0a287057..de1cf5096d2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -47,14 +47,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private Map<Integer, Long> partitionIdToOffset;
public KafkaTaskInfo(UUID id, long jobId, String clusterName,
- long timeoutMs, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable) {
- super(id, jobId, clusterName, timeoutMs, isMultiTable);
+ long timeoutMs, int timeoutBackOffCount,
+ Map<Integer, Long> partitionIdToOffset, boolean
isMultiTable) {
+ super(id, jobId, clusterName, timeoutMs, timeoutBackOffCount,
isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}
public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable) {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
kafkaTaskInfo.getClusterName(),
- kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(),
isMultiTable);
+ kafkaTaskInfo.getTimeoutMs(),
kafkaTaskInfo.getTimeoutBackOffCount(),
+ kafkaTaskInfo.getBeId(), isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}
@@ -129,6 +131,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TExecPlanFragmentParams tExecPlanFragmentParams =
routineLoadJob.plan(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+ // the plan's timeouts must be refreshed from this task so that the task timeout backoff takes effect
+ long timeoutS = this.getTimeoutMs() / 1000;
+
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+ tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int)
timeoutS);
+ tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int)
timeoutS);
return tExecPlanFragmentParams;
}
@@ -138,6 +145,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TPipelineFragmentParams tExecPlanFragmentParams =
routineLoadJob.planForPipeline(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+ // the plan's timeouts must be refreshed from this task so that the task timeout backoff takes effect
+ long timeoutS = this.getTimeoutMs() / 1000;
+
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+ tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int)
timeoutS);
+ tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int)
timeoutS);
return tExecPlanFragmentParams;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a55f0a02124..15f3e8c1270 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -709,6 +709,18 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
// and after renew, the previous task is removed from
routineLoadTaskInfoList,
// so task can no longer be committed successfully.
// the already committed task will not be handled here.
+ int timeoutBackOffCount =
routineLoadTaskInfo.getTimeoutBackOffCount();
+ if (timeoutBackOffCount >
RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) {
+ try {
+ updateState(JobState.PAUSED, new
ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH,
+ "task " + routineLoadTaskInfo.getId()
+ " timeout too much"), false);
+ } catch (UserException e) {
+ LOG.warn("update job state to pause failed", e);
+ }
+ return;
+ }
+
routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1);
+
routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1));
RoutineLoadTaskInfo newTask =
unprotectRenewTask(routineLoadTaskInfo);
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
}
@@ -1212,6 +1224,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState,
txnStatusChangeReason)) {
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
+ routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment);
}
if (rlTaskTxnCommitAttachment != null &&
!Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 7a5312b2c8f..10d57e66d67 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -73,22 +73,28 @@ public abstract class RoutineLoadTaskInfo {
protected boolean isMultiTable = false;
+ protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
+ protected int timeoutBackOffCount = 0;
+
// this status will be set when corresponding transaction's status is
changed.
// so that user or other logic can know the status of the corresponding
txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
- public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long
timeoutMs, boolean isMultiTable) {
+
+ public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long
timeoutMs,
+ int timeoutBackOffCount, boolean isMultiTable) {
this.id = id;
this.jobId = jobId;
this.clusterName = clusterName;
this.createTimeMs = System.currentTimeMillis();
this.timeoutMs = timeoutMs;
+ this.timeoutBackOffCount = timeoutBackOffCount;
this.isMultiTable = isMultiTable;
}
- public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long
timeoutMs, long previousBeId,
- boolean isMultiTable) {
- this(id, jobId, clusterName, timeoutMs, isMultiTable);
+ public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long
timeoutMs,
+ int timeoutBackOffCount, long previousBeId, boolean
isMultiTable) {
+ this(id, jobId, clusterName, timeoutMs, timeoutBackOffCount,
isMultiTable);
this.previousBeId = previousBeId;
}
@@ -136,6 +142,10 @@ public abstract class RoutineLoadTaskInfo {
this.lastScheduledTime = lastScheduledTime;
}
+ public void setTimeoutMs(long timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ }
+
public long getTimeoutMs() {
return timeoutMs;
}
@@ -148,6 +158,14 @@ public abstract class RoutineLoadTaskInfo {
return txnStatus;
}
+ public void setTimeoutBackOffCount(int timeoutBackOffCount) {
+ this.timeoutBackOffCount = timeoutBackOffCount;
+ }
+
+ public int getTimeoutBackOffCount() {
+ return timeoutBackOffCount;
+ }
+
public boolean isTimeout() {
if (txnStatus == TransactionStatus.COMMITTED || txnStatus ==
TransactionStatus.VISIBLE) {
// the corresponding txn is already finished, this task can not be
treated as timeout.
@@ -162,6 +180,21 @@ public abstract class RoutineLoadTaskInfo {
return false;
}
+ public void selfAdaptTimeout(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) {
+ long taskExecutionTime =
rlTaskTxnCommitAttachment.getTaskExecutionTimeMs();
+ long timeoutMs = this.timeoutMs;
+
+ while (this.timeoutBackOffCount > 0) {
+ timeoutMs = timeoutMs >> 1;
+ if (timeoutMs <= taskExecutionTime) {
+ this.timeoutMs = timeoutMs << 1;
+ return;
+ }
+ this.timeoutBackOffCount--;
+ }
+ this.timeoutMs = timeoutMs;
+ }
+
abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
// begin the txn of this task
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 57ded401bd9..73213fc7ffc 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -224,7 +224,7 @@ public class KafkaRoutineLoadJobTest {
Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
partitionIdsToOffset.put(100, 0L);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
"default_cluster",
- maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
+ maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() -
maxBatchIntervalS * 2 * 1000 - 1);
routineLoadTaskInfoList.add(kafkaTaskInfo);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index e0fdd92f737..02db47538fb 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -69,7 +69,7 @@ public class RoutineLoadTaskSchedulerTest {
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset",
partitionIdToOffset);
Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue =
Queues.newLinkedBlockingQueue();
- KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1),
1L, "default_cluster", 20000,
+ KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1),
1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index a819c4f0301..414f5cf03c4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
- KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000,
+ KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
- KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000,
+ KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]