This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0d0f787d3e9901192a403d5eb61ea58c8ea17a8e
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Mar 22 16:27:33 2024 +0800

    [fix](routine-load) fix timeout backoff can not work (#32661)
---
 .../doris/load/routineload/KafkaRoutineLoadJob.java    |  2 +-
 .../apache/doris/load/routineload/KafkaTaskInfo.java   | 18 +++++++++++++++---
 .../doris/load/routineload/RoutineLoadTaskInfo.java    | 14 ++++++++------
 .../load/routineload/KafkaRoutineLoadJobTest.java      |  2 +-
 .../load/routineload/RoutineLoadTaskSchedulerTest.java |  2 +-
 .../doris/transaction/GlobalTransactionMgrTest.java    |  4 ++--
 6 files changed, 28 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 24929520ecf..bdcfb9e4a27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                                 ((KafkaProgress) 
progress).getOffsetByPartition(kafkaPartition));
                     }
                     KafkaTaskInfo kafkaTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), id,
-                            maxBatchIntervalS * 2 * 1000, taskKafkaProgress, 
isMultiTable());
+                            maxBatchIntervalS * 2 * 1000, 0, 
taskKafkaProgress, isMultiTable());
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d8b79d9bdce..86a084764ea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -48,14 +48,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId,
-                         long timeoutMs, Map<Integer, Long> 
partitionIdToOffset, boolean isMultiTable) {
-        super(id, jobId, timeoutMs, isMultiTable);
+                        long timeoutMs, int timeoutBackOffCount,
+                        Map<Integer, Long> partitionIdToOffset, boolean 
isMultiTable) {
+        super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> 
partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
-                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), 
isMultiTable);
+                kafkaTaskInfo.getTimeoutMs(), 
kafkaTaskInfo.getTimeoutBackOffCount(),
+                kafkaTaskInfo.getBeId(), isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
@@ -131,6 +133,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = 
routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        // it needs update timeout to make task timeout backoff work
+        long timeoutS = this.getTimeoutMs() / 1000;
+        
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
@@ -153,6 +160,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = 
routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        // it needs update timeout to make task timeout backoff work
+        long timeoutS = this.getTimeoutMs() / 1000;
+        
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 93ec573717c..3c6779769c0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -76,17 +76,19 @@ public abstract class RoutineLoadTaskInfo {
     // so that user or other logic can know the status of the corresponding 
txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean 
isMultiTable) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs,
+                        int timeoutBackOffCount, boolean isMultiTable) {
         this.id = id;
         this.jobId = jobId;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
+        this.timeoutBackOffCount = timeoutBackOffCount;
         this.isMultiTable = isMultiTable;
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long 
previousBeId,
-                               boolean isMultiTable) {
-        this(id, jobId, timeoutMs, isMultiTable);
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int 
timeoutBackOffCount,
+                        long previousBeId, boolean isMultiTable) {
+        this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
         this.previousBeId = previousBeId;
     }
 
@@ -149,8 +151,8 @@ public abstract class RoutineLoadTaskInfo {
         }
 
         if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > 
timeoutMs) {
-            LOG.info("task {} is timeout. start: {}, timeout: {}", 
DebugUtil.printId(id),
-                    executeStartTimeMs, timeoutMs);
+            LOG.info("task {} is timeout. start: {}, timeout: {}, 
timeoutBackOffCount: {}", DebugUtil.printId(id),
+                    executeStartTimeMs, timeoutMs, timeoutBackOffCount);
             return true;
         }
         return false;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 79786fe8be9..ba3c9ee626a 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -221,7 +221,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
-                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
+                maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - 
maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c1f5731329f..c5bf509464e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -68,7 +68,7 @@ public class RoutineLoadTaskSchedulerTest {
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", 
partitionIdToOffset);
 
         LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = 
new LinkedBlockingDeque<>();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000, 0,
                 partitionIdToOffset, false);
         routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 0203d614e00..d9e2088e599 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = 
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
+        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = 
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
+        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to