This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0d0f787d3e9901192a403d5eb61ea58c8ea17a8e
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri Mar 22 16:27:33 2024 +0800

    [fix](routine-load) fix timeout backoff can not work (#32661)
---
 .../doris/load/routineload/KafkaRoutineLoadJob.java    |  2 +-
 .../apache/doris/load/routineload/KafkaTaskInfo.java   | 18 +++++++++++++++---
 .../doris/load/routineload/RoutineLoadTaskInfo.java    | 14 ++++++++------
 .../load/routineload/KafkaRoutineLoadJobTest.java      |  2 +-
 .../load/routineload/RoutineLoadTaskSchedulerTest.java |  2 +-
 .../doris/transaction/GlobalTransactionMgrTest.java    |  4 ++--
 6 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 24929520ecf..bdcfb9e4a27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                         ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
             }
             KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id,
-                    maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable());
+                    maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable());
             routineLoadTaskInfoList.add(kafkaTaskInfo);
             result.add(kafkaTaskInfo);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d8b79d9bdce..86a084764ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -48,14 +48,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId,
-            long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
-        super(id, jobId, timeoutMs, isMultiTable);
+            long timeoutMs, int timeoutBackOffCount,
+            Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
+        super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
     public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
-                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
+                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
+                kafkaTaskInfo.getBeId(), isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
@@ -131,6 +133,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        // it needs update timeout to make task timeout backoff work
+        long timeoutS = this.getTimeoutMs() / 1000;
+        tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
@@ -153,6 +160,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        // it needs update timeout to make task timeout backoff work
+        long timeoutS = this.getTimeoutMs() / 1000;
+        tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
+        tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
 
         long wgId = routineLoadJob.getWorkloadId();
         List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 93ec573717c..3c6779769c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -76,17 +76,19 @@ public abstract class RoutineLoadTaskInfo {
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs,
+            int timeoutBackOffCount, boolean isMultiTable) {
         this.id = id;
         this.jobId = jobId;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
+        this.timeoutBackOffCount = timeoutBackOffCount;
         this.isMultiTable = isMultiTable;
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId,
-            boolean isMultiTable) {
-        this(id, jobId, timeoutMs, isMultiTable);
+    public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int timeoutBackOffCount,
+            long previousBeId, boolean isMultiTable) {
+        this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
         this.previousBeId = previousBeId;
     }
 
@@ -149,8 +151,8 @@ public abstract class RoutineLoadTaskInfo {
         }
 
         if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) {
-            LOG.info("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id),
-                    executeStartTimeMs, timeoutMs);
+            LOG.info("task {} is timeout. start: {}, timeout: {}, timeoutBackOffCount: {}", DebugUtil.printId(id),
+                    executeStartTimeMs, timeoutMs, timeoutBackOffCount);
             return true;
         }
         return false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 79786fe8be9..ba3c9ee626a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -221,7 +221,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
-                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
+                maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c1f5731329f..c5bf509464e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -68,7 +68,7 @@ public class RoutineLoadTaskSchedulerTest {
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
 
         LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>();
-        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000,
+        KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, 0,
                 partitionIdToOffset, false);
         routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 0203d614e00..d9e2088e599 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
-        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
                 partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
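Note on the mechanism: the patch threads a timeoutBackOffCount through the RoutineLoadTaskInfo/KafkaTaskInfo constructors and re-applies the current timeoutMs to the plan's load-channel, query, and execution timeouts, so a retried task actually runs with the enlarged timeout instead of the original one. The sketch below only illustrates how such a back-off count can translate into a growing timeout; the class, constants (BASE_TIMEOUT_MS, MAX_BACKOFF_COUNT), and doubling policy are hypothetical and not part of this commit.

    // Illustrative sketch only -- not code from the commit above.
    // Assumes a doubling back-off capped at MAX_BACKOFF_COUNT.
    public class TimeoutBackOffSketch {
        private static final long BASE_TIMEOUT_MS = 60_000; // hypothetical base, e.g. maxBatchIntervalS * 2 * 1000
        private static final int MAX_BACKOFF_COUNT = 3;     // hypothetical cap

        // Timeout to use for a task that has already timed out 'timeoutBackOffCount' times.
        static long backedOffTimeoutMs(int timeoutBackOffCount) {
            int capped = Math.min(timeoutBackOffCount, MAX_BACKOFF_COUNT);
            return BASE_TIMEOUT_MS << capped; // 60s, 120s, 240s, 480s
        }

        public static void main(String[] args) {
            for (int count = 0; count <= 4; count++) {
                System.out.println("timeoutBackOffCount=" + count
                        + " -> timeoutMs=" + backedOffTimeoutMs(count));
            }
        }
    }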

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
