This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8514fabe16c Revert "[fix](routine-load) fix timeout backoff can not work (#32661)" (#32709)
8514fabe16c is described below
commit 8514fabe16ca1de9e56ce30b02a33af7dc312d63
Author: Dongyang Li <[email protected]>
AuthorDate: Fri Mar 22 22:27:37 2024 +0800
Revert "[fix](routine-load) fix timeout backoff can not work (#32661)" (#32709)
This reverts commit 0d0f787d3e9901192a403d5eb61ea58c8ea17a8e.
Co-authored-by: stephen <[email protected]>
---
.../doris/load/routineload/KafkaRoutineLoadJob.java | 2 +-
.../apache/doris/load/routineload/KafkaTaskInfo.java | 18 +++---------------
.../doris/load/routineload/RoutineLoadTaskInfo.java | 14 ++++++--------
.../load/routineload/KafkaRoutineLoadJobTest.java | 2 +-
.../load/routineload/RoutineLoadTaskSchedulerTest.java | 2 +-
.../doris/transaction/GlobalTransactionMgrTest.java | 4 ++--
6 files changed, 14 insertions(+), 28 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bdcfb9e4a27..24929520ecf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -227,7 +227,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress)
progress).getOffsetByPartition(kafkaPartition));
}
KafkaTaskInfo kafkaTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), id,
- maxBatchIntervalS * 2 * 1000, 0,
taskKafkaProgress, isMultiTable());
+ maxBatchIntervalS * 2 * 1000, taskKafkaProgress,
isMultiTable());
routineLoadTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 86a084764ea..d8b79d9bdce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -48,16 +48,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private Map<Integer, Long> partitionIdToOffset;
public KafkaTaskInfo(UUID id, long jobId,
- long timeoutMs, int timeoutBackOffCount,
- Map<Integer, Long> partitionIdToOffset, boolean
isMultiTable) {
- super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
+ long timeoutMs, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable) {
+ super(id, jobId, timeoutMs, isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}
public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable) {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
- kafkaTaskInfo.getTimeoutMs(),
kafkaTaskInfo.getTimeoutBackOffCount(),
- kafkaTaskInfo.getBeId(), isMultiTable);
+ kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(),
isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}
@@ -133,11 +131,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TExecPlanFragmentParams tExecPlanFragmentParams =
routineLoadJob.plan(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
- // it needs update timeout to make task timeout backoff work
- long timeoutS = this.getTimeoutMs() / 1000;
-
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
- tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int)
timeoutS);
- tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int)
timeoutS);
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
@@ -160,11 +153,6 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
TPipelineFragmentParams tExecPlanFragmentParams =
routineLoadJob.planForPipeline(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
- // it needs update timeout to make task timeout backoff work
- long timeoutS = this.getTimeoutMs() / 1000;
-
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
- tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int)
timeoutS);
- tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int)
timeoutS);
long wgId = routineLoadJob.getWorkloadId();
List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 3c6779769c0..93ec573717c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -76,19 +76,17 @@ public abstract class RoutineLoadTaskInfo {
// so that user or other logic can know the status of the corresponding
txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
- public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs,
- int timeoutBackOffCount, boolean isMultiTable) {
+ public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean
isMultiTable) {
this.id = id;
this.jobId = jobId;
this.createTimeMs = System.currentTimeMillis();
this.timeoutMs = timeoutMs;
- this.timeoutBackOffCount = timeoutBackOffCount;
this.isMultiTable = isMultiTable;
}
- public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int
timeoutBackOffCount,
- long previousBeId, boolean isMultiTable) {
- this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
+ public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long
previousBeId,
+ boolean isMultiTable) {
+ this(id, jobId, timeoutMs, isMultiTable);
this.previousBeId = previousBeId;
}
@@ -151,8 +149,8 @@ public abstract class RoutineLoadTaskInfo {
}
if (isRunning() && System.currentTimeMillis() - executeStartTimeMs >
timeoutMs) {
- LOG.info("task {} is timeout. start: {}, timeout: {},
timeoutBackOffCount: {}", DebugUtil.printId(id),
- executeStartTimeMs, timeoutMs, timeoutBackOffCount);
+ LOG.info("task {} is timeout. start: {}, timeout: {}",
DebugUtil.printId(id),
+ executeStartTimeMs, timeoutMs);
return true;
}
return false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index ba3c9ee626a..79786fe8be9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -221,7 +221,7 @@ public class KafkaRoutineLoadJobTest {
Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
partitionIdsToOffset.put(100, 0L);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
- maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
+ maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() -
maxBatchIntervalS * 2 * 1000 - 1);
routineLoadTaskInfoList.add(kafkaTaskInfo);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c5bf509464e..c1f5731329f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -68,7 +68,7 @@ public class RoutineLoadTaskSchedulerTest {
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset",
partitionIdToOffset);
LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue =
new LinkedBlockingDeque<>();
- KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000, 0,
+ KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
partitionIdToOffset, false);
routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index d9e2088e599..0203d614e00 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -318,7 +318,7 @@ public class GlobalTransactionMgrTest {
List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
- KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
+ KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
@@ -390,7 +390,7 @@ public class GlobalTransactionMgrTest {
List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
- KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0,
+ KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]