This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 57c80f2af5 [Fix-16382] Fix the bug of async master task casthread pool
invocations ramp-up
57c80f2af5 is described below
commit 57c80f2af5d1f29417fdbea09b27221f54987655
Author: Dyqer <[email protected]>
AuthorDate: Thu Aug 15 20:46:51 2024 +0800
[Fix-16382] Fix the bug of async master task casthread pool invocations
ramp-up
---
.../runner/execute/AsyncTaskExecutionContext.java | 19 ++++---------------
1 file changed, 4 insertions(+), 15 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskExecutionContext.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskExecutionContext.java
index ac254d210e..14ec091886 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskExecutionContext.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskExecutionContext.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@@ -38,7 +37,6 @@ public class AsyncTaskExecutionContext implements Delayed {
private long currentStartTime;
private int executeTimes;
private final long executeInterval;
- private long timeout;
public AsyncTaskExecutionContext(@NonNull TaskExecutionContext
taskExecutionContext,
@NonNull AsyncTaskExecuteFunction
asyncTaskExecuteFunction,
@@ -48,29 +46,20 @@ public class AsyncTaskExecutionContext implements Delayed {
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction;
this.currentStartTime = 0;
this.executeTimes = 0;
- if
(TaskTimeoutStrategy.FAILED.equals(taskExecutionContext.getTaskTimeoutStrategy())
- ||
TaskTimeoutStrategy.WARNFAILED.equals(taskExecutionContext.getTaskTimeoutStrategy()))
{
- // will timeout
- this.timeout = taskExecutionContext.getStartTime()
- +
TimeUnit.SECONDS.toMillis(taskExecutionContext.getTaskTimeout());
- } else {
- this.timeout = TimeUnit.SECONDS.toMillis(Integer.MAX_VALUE);
- }
this.executeInterval =
Math.max(asyncTaskExecuteFunction.getAsyncTaskStateCheckInterval().toMillis(),
1000L);
}
- public void refreshStartTime() {
- if (executeTimes == 0) {
+ public synchronized void refreshStartTime() {
+ if (executeTimes != 0) {
// The first time doesn't have delay
- executeTimes++;
- } else {
currentStartTime = System.currentTimeMillis();
}
+ executeTimes++;
}
@Override
public long getDelay(TimeUnit unit) {
- long nextExecuteTimeDelay = Math.min(currentStartTime +
executeInterval, timeout) - System.currentTimeMillis();
+ long nextExecuteTimeDelay = currentStartTime + executeInterval -
System.currentTimeMillis();
return unit.convert(nextExecuteTimeDelay, TimeUnit.MILLISECONDS);
}