This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8514fabe16c Revert "[fix](routine-load) fix timeout backoff can not work (#32661)" (#32709)
8514fabe16c is described below

commit 8514fabe16ca1de9e56ce30b02a33af7dc312d63
Author: Dongyang Li <[email protected]>
AuthorDate: Fri Mar 22 22:27:37 2024 +0800

    Revert "[fix](routine-load) fix timeout backoff can not work (#32661)" (#32709)
    
    This reverts commit 0d0f787d3e9901192a403d5eb61ea58c8ea17a8e.
    
    Co-authored-by: stephen <[email protected]>
---
 .../doris/load/routineload/KafkaRoutineLoadJob.java    |  2 +-
 .../apache/doris/load/routineload/KafkaTaskInfo.java   | 18 +++---------------
 .../doris/load/routineload/RoutineLoadTaskInfo.java    | 14 ++++++--------
 .../load/routineload/KafkaRoutineLoadJobTest.java      |  2 +-
 .../load/routineload/RoutineLoadTaskSchedulerTest.java |  2 +-
 .../doris/transaction/GlobalTransactionMgrTest.java    |  4 ++--
 6 files changed, 14 insertions(+), 28 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bdcfb9e4a27..24929520ecf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                                 ((KafkaProgress) 
progress).getOffsetByPartition(kafkaPartition));
                     }
                     KafkaTaskInfo kafkaTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), id,
-                            maxBatchIntervalS * 2 * 1000, 0, 
taskKafkaProgress, isMultiTable());
+                            maxBatchIntervalS * 2 * 1000, taskKafkaProgress, 
isMultiTable());
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 86a084764ea..d8b79d9bdce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -48,16 +48,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId,
-                        long timeoutMs, int timeoutBackOffCount,
-                        Map<Integer, Long> partitionIdToOffset, boolean 
isMultiTable) {
-        super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
+                         long timeoutMs, Map<Integer, Long> 
partitionIdToOffset, boolean isMultiTable) {
+        super(id, jobId, timeoutMs, isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> 
partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
-                kafkaTaskInfo.getTimeoutMs(), 
kafkaTaskInfo.getTimeoutBackOffCount(),
-                kafkaTaskInfo.getBeId(), isMultiTable);
+                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), 
isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
@@ -133,11 +131,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = 
routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
-        // it needs update timeout to make task timeout backoff work
-        long timeoutS = this.getTimeoutMs() / 1000;
-        
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
@@ -160,11 +153,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = 
routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
-        // it needs update timeout to make task timeout backoff work
-        long timeoutS = this.getTimeoutMs() / 1000;
-        
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) 
timeoutS);
-        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) 
timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 3c6779769c0..93ec573717c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -76,19 +76,17 @@ public abstract class RoutineLoadTaskInfo {
     // so that user or other logic can know the status of the corresponding 
txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs,
-                        int timeoutBackOffCount, boolean isMultiTable) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean 
isMultiTable) {
         this.id = id;
         this.jobId = jobId;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
-        this.timeoutBackOffCount = timeoutBackOffCount;
         this.isMultiTable = isMultiTable;
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int 
timeoutBackOffCount,
-                        long previousBeId, boolean isMultiTable) {
-        this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long 
previousBeId,
+                               boolean isMultiTable) {
+        this(id, jobId, timeoutMs, isMultiTable);
         this.previousBeId = previousBeId;
     }
 
@@ -151,8 +149,8 @@ public abstract class RoutineLoadTaskInfo {
         }
 
         if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > 
timeoutMs) {
-            LOG.info("task {} is timeout. start: {}, timeout: {}, 
timeoutBackOffCount: {}", DebugUtil.printId(id),
-                    executeStartTimeMs, timeoutMs, timeoutBackOffCount);
+            LOG.info("task {} is timeout. start: {}, timeout: {}", 
DebugUtil.printId(id),
+                    executeStartTimeMs, timeoutMs);
             return true;
         }
         return false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index ba3c9ee626a..79786fe8be9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -221,7 +221,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
-                maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
+                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - 
maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c5bf509464e..c1f5731329f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -68,7 +68,7 @@ public class RoutineLoadTaskSchedulerTest {
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", 
partitionIdToOffset);
 
         LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = 
new LinkedBlockingDeque<>();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000, 0,
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
                 partitionIdToOffset, false);
         routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index d9e2088e599..0203d614e00 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = 
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
+        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = 
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
+        KafkaTaskInfo routineLoadTaskInfo = new 
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to