This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b2239c672e8 [branch-2.0](routine-load) self-adaption backoff timeout (#32591)
b2239c672e8 is described below

commit b2239c672e83b35b55435ea62b870a1f610b31e6
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Mar 21 21:35:38 2024 +0800

    [branch-2.0](routine-load) self-adaption backoff timeout (#32591)
---
 .../org/apache/doris/common/InternalErrorCode.java |  3 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  2 +-
 .../doris/load/routineload/KafkaTaskInfo.java      | 18 ++++++++--
 .../doris/load/routineload/RoutineLoadJob.java     | 13 +++++++
 .../load/routineload/RoutineLoadTaskInfo.java      | 41 +++++++++++++++++++---
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  2 +-
 .../routineload/RoutineLoadTaskSchedulerTest.java  |  2 +-
 .../transaction/GlobalTransactionMgrTest.java      |  4 +--
 8 files changed, 72 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index b871fd198cb..214f74a38f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -35,7 +35,8 @@ public enum InternalErrorCode {
     TOO_MANY_FAILURE_ROWS_ERR(102),
     CREATE_TASKS_ERR(103),
     TASKS_ABORT_ERR(104),
-    CANNOT_RESUME_ERR(105);
+    CANNOT_RESUME_ERR(105),
+    TIMEOUT_TOO_MUCH(106);
 
     private long errCode;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 6e6dba068e9..bc1f1428a91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                                 ((KafkaProgress) 
progress).getOffsetByPartition(kafkaPartition));
                     }
                     KafkaTaskInfo kafkaTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), id, clusterName,
-                            maxBatchIntervalS * 2 * 1000, taskKafkaProgress, 
isMultiTable());
+                            maxBatchIntervalS * 2 * 1000, 0, 
taskKafkaProgress, isMultiTable());
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d6f0a287057..de1cf5096d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -47,14 +47,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId, String clusterName,
-                         long timeoutMs, Map<Integer, Long> 
partitionIdToOffset, boolean isMultiTable) {
-        super(id, jobId, clusterName, timeoutMs, isMultiTable);
+                        long timeoutMs, int timeoutBackOffCount,
+                        Map<Integer, Long> partitionIdToOffset, boolean 
isMultiTable) {
+        super(id, jobId, clusterName, timeoutMs, timeoutBackOffCount, 
isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> 
partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), 
kafkaTaskInfo.getClusterName(),
-                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), 
isMultiTable);
+                kafkaTaskInfo.getTimeoutMs(), 
kafkaTaskInfo.getTimeoutBackOffCount(),
+                kafkaTaskInfo.getBeId(), isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
@@ -129,6 +131,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = 
routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        // the plan's timeouts must be updated from this task's timeout so that timeout backoff takes effect
+        long timeoutS = this.getTimeoutMs() / 1000;
+        
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
         return tExecPlanFragmentParams;
     }
 
@@ -138,6 +145,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = 
routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        // the plan's timeouts must be updated from this task's timeout so that timeout backoff takes effect
+        long timeoutS = this.getTimeoutMs() / 1000;
+        
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
         return tExecPlanFragmentParams;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a55f0a02124..15f3e8c1270 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -709,6 +709,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                     // and after renew, the previous task is removed from 
routineLoadTaskInfoList,
                     // so task can no longer be committed successfully.
                     // the already committed task will not be handled here.
+                    int timeoutBackOffCount = 
routineLoadTaskInfo.getTimeoutBackOffCount();
+                    if (timeoutBackOffCount > 
RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) {
+                        try {
+                            updateState(JobState.PAUSED, new 
ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH,
+                                        "task " + routineLoadTaskInfo.getId() 
+ " timeout too much"), false);
+                        } catch (UserException e) {
+                            LOG.warn("update job state to pause failed", e);
+                        }
+                        return;
+                    }
+                    
routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1);
+                    
routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1));
                     RoutineLoadTaskInfo newTask = 
unprotectRenewTask(routineLoadTaskInfo);
                     
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
                 }
@@ -1212,6 +1224,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, 
txnStatusChangeReason)) {
             // step2: update job progress
             updateProgress(rlTaskTxnCommitAttachment);
+            routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment);
         }
 
         if (rlTaskTxnCommitAttachment != null && 
!Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 7a5312b2c8f..10d57e66d67 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -73,22 +73,28 @@ public abstract class RoutineLoadTaskInfo {
 
     protected boolean isMultiTable = false;
 
+    protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
+    protected int timeoutBackOffCount = 0;
+
     // this status will be set when corresponding transaction's status is 
changed.
     // so that user or other logic can know the status of the corresponding 
txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long 
timeoutMs, boolean isMultiTable) {
+
+    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long 
timeoutMs,
+                        int timeoutBackOffCount, boolean isMultiTable) {
         this.id = id;
         this.jobId = jobId;
         this.clusterName = clusterName;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
+        this.timeoutBackOffCount = timeoutBackOffCount;
         this.isMultiTable = isMultiTable;
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long 
timeoutMs, long previousBeId,
-                               boolean isMultiTable) {
-        this(id, jobId, clusterName, timeoutMs, isMultiTable);
+    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long 
timeoutMs,
+                        int timeoutBackOffCount, long previousBeId, boolean 
isMultiTable) {
+        this(id, jobId, clusterName, timeoutMs, timeoutBackOffCount, 
isMultiTable);
         this.previousBeId = previousBeId;
     }
 
@@ -136,6 +142,10 @@ public abstract class RoutineLoadTaskInfo {
         this.lastScheduledTime = lastScheduledTime;
     }
 
+    public void setTimeoutMs(long timeoutMs) {
+        this.timeoutMs = timeoutMs;
+    }
+
     public long getTimeoutMs() {
         return timeoutMs;
     }
@@ -148,6 +158,14 @@ public abstract class RoutineLoadTaskInfo {
         return txnStatus;
     }
 
+    public void setTimeoutBackOffCount(int timeoutBackOffCount) {
+        this.timeoutBackOffCount = timeoutBackOffCount;
+    }
+
+    public int getTimeoutBackOffCount() {
+        return timeoutBackOffCount;
+    }
+
     public boolean isTimeout() {
         if (txnStatus == TransactionStatus.COMMITTED || txnStatus == 
TransactionStatus.VISIBLE) {
             // the corresponding txn is already finished, this task can not be 
treated as timeout.
@@ -162,6 +180,21 @@ public abstract class RoutineLoadTaskInfo {
         return false;
     }
 
+    public void selfAdaptTimeout(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) { // shrinks timeoutMs again while recorded backoff steps remain
+        long taskExecutionTime = 
rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); // execution time reported by the attachment
+        long timeoutMs = this.timeoutMs; // working copy; the field is only written on exit
+
+        while (this.timeoutBackOffCount > 0) { // at most one halving per recorded backoff step
+            timeoutMs = timeoutMs >> 1; // candidate: half of the current value
+            if (timeoutMs <= taskExecutionTime) { // candidate would not exceed the observed execution time
+                this.timeoutMs = timeoutMs << 1; // keep the value from before this halving (low bit dropped)
+                return; // backoff count is left as-is for the halving that was rejected
+            }
+            this.timeoutBackOffCount--; // this halving is accepted, so one backoff step is undone
+        }
+        this.timeoutMs = timeoutMs; // all backoff steps undone (or none recorded): store the result
+    }
+
     abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
 
     // begin the txn of this task
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 57ded401bd9..73213fc7ffc 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -224,7 +224,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, 
"default_cluster",
-                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
+                maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - 
maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index e0fdd92f737..02db47538fb 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -69,7 +69,7 @@ public class RoutineLoadTaskSchedulerTest {
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", 
partitionIdToOffset);
 
         Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = 
Queues.newLinkedBlockingQueue();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, "default_cluster", 20000,
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, "default_cluster", 20000, 0,
                 partitionIdToOffset, false);
         routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index a819c4f0301..414f5cf03c4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = 
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000,
+        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000, 0,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = 
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000,
+        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000, 0,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to