This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 44165480b2 [Fix-16978][Master] Fix AbstractDelayEvent compare method
is incorrect (#16980)
44165480b2 is described below
commit 44165480b2b97ca569741b995d68940f29ae12a6
Author: lile <[email protected]>
AuthorDate: Wed Jan 29 10:13:06 2025 +0800
[Fix-16978][Master] Fix AbstractDelayEvent compare method is incorrect
(#16980)
---
.../eventbus/AbstractDelayEvent.java | 7 ++-
.../engine/graph/IWorkflowExecutionGraph.java | 5 ++
.../engine/graph/WorkflowExecutionGraph.java | 12 +++-
.../task/statemachine/TaskFailureStateAction.java | 31 +++++++----
.../cases/WorkflowInstancePauseTestCase.java | 50 +++++++++++++++++
.../cases/WorkflowInstanceStopTestCase.java | 50 +++++++++++++++++
.../workflow_with_fake_task_failed_retrying.yaml | 64 ++++++++++++++++++++++
.../workflow_with_fake_task_failed_retrying.yaml | 64 ++++++++++++++++++++++
8 files changed, 271 insertions(+), 12 deletions(-)
diff --git
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
index c4c61b9055..c013171377 100644
---
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
+++
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
@@ -39,6 +39,10 @@ public abstract class AbstractDelayEvent implements IEvent,
Delayed {
@Builder.Default
protected long createTimeInNano = System.nanoTime();
+ // set create time as default if the inheritor didn't call super()
+ @Builder.Default
+ protected long expiredTimeInNano = System.nanoTime();
+
public AbstractDelayEvent() {
this(DEFAULT_DELAY_TIME);
}
@@ -50,6 +54,7 @@ public abstract class AbstractDelayEvent implements IEvent,
Delayed {
public AbstractDelayEvent(final long delayTime, final long
createTimeInNano) {
this.delayTime = delayTime;
this.createTimeInNano = createTimeInNano;
+ this.expiredTimeInNano = this.delayTime * 1_000_000 +
this.createTimeInNano;
}
@Override
@@ -60,7 +65,7 @@ public abstract class AbstractDelayEvent implements IEvent,
Delayed {
@Override
public int compareTo(Delayed other) {
- return Long.compare(this.createTimeInNano, ((AbstractDelayEvent)
other).createTimeInNano);
+ return Long.compare(this.expiredTimeInNano, ((AbstractDelayEvent)
other).expiredTimeInNano);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
index 76b04951b7..3f168cce6e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
@@ -191,6 +191,11 @@ public interface IWorkflowExecutionGraph {
*/
boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable
taskExecutionRunnable);
+ /**
+ * Whether the given task's execution has failed and is waiting for retry.
+ */
+ boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable
taskExecutionRunnable);
+
/**
* Whether all predecessors task is skipped.
* <p> Once all predecessors are marked as skipped, then the task will be
marked as skipped, and will trigger its successors.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
index 20a65d45a2..dd89debe6c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
@@ -140,7 +140,7 @@ public class WorkflowExecutionGraph implements
IWorkflowExecutionGraph {
@Override
public boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable
taskExecutionRunnable) {
- return
activeTaskExecutionRunnable.add(taskExecutionRunnable.getName());
+ return
activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
}
@Override
@@ -256,6 +256,16 @@ public class WorkflowExecutionGraph implements
IWorkflowExecutionGraph {
return (taskExecutionRunnable.getTaskDefinition().getFlag() ==
Flag.NO);
}
+ @Override
+ public boolean isTaskExecutionRunnableRetrying(final
ITaskExecutionRunnable taskExecutionRunnable) {
+ if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
+ return false;
+ }
+ final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
+ return taskInstance.getState() == TaskExecutionStatus.FAILURE &&
taskExecutionRunnable.isTaskInstanceCanRetry()
+ && isTaskExecutionRunnableActive(taskExecutionRunnable);
+ }
+
/**
* Whether all predecessors are skipped.
* <p> Only when all predecessors are skipped, will return true. If the
given task doesn't have any predecessors, will return false.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
index 005a0003ca..b0b9139b8b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
@@ -104,6 +104,12 @@ public class TaskFailureStateAction extends
AbstractTaskStateAction {
final ITaskExecutionRunnable
taskExecutionRunnable,
final TaskPauseLifecycleEvent taskPauseEvent)
{
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
+ // When the failed task is awaiting retry, we can mark it as 'paused'
to ignore the retry event.
+ if (isTaskRetrying(taskExecutionRunnable)) {
+ super.pausedEventAction(workflowExecutionRunnable,
taskExecutionRunnable,
+ TaskPausedLifecycleEvent.of(taskExecutionRunnable));
+ return;
+ }
logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
}
@@ -112,14 +118,11 @@ public class TaskFailureStateAction extends
AbstractTaskStateAction {
final ITaskExecutionRunnable
taskExecutionRunnable,
final TaskPausedLifecycleEvent
taskPausedEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
- final IWorkflowExecutionGraph workflowExecutionGraph =
taskExecutionRunnable.getWorkflowExecutionGraph();
// This case happen when the task is failure but the task is in delay
retry queue.
// We don't remove the event in GlobalWorkflowDelayEventCoordinator
the event should be dropped when the task is
// killed.
- if (taskExecutionRunnable.isTaskInstanceCanRetry()
- &&
workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
-
workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+ if (isTaskRetrying(taskExecutionRunnable)) {
+ super.pausedEventAction(workflowExecutionRunnable,
taskExecutionRunnable, taskPausedEvent);
return;
}
logWarningIfCannotDoAction(taskExecutionRunnable, taskPausedEvent);
@@ -130,6 +133,12 @@ public class TaskFailureStateAction extends
AbstractTaskStateAction {
final ITaskExecutionRunnable
taskExecutionRunnable,
final TaskKillLifecycleEvent taskKillEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
+ // When the failed task is awaiting retry, we can mark it as 'killed'
to ignore the retry event.
+ if (isTaskRetrying(taskExecutionRunnable)) {
+ super.killedEventAction(workflowExecutionRunnable,
taskExecutionRunnable,
+ TaskKilledLifecycleEvent.of(taskExecutionRunnable));
+ return;
+ }
logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
}
@@ -138,14 +147,11 @@ public class TaskFailureStateAction extends
AbstractTaskStateAction {
final ITaskExecutionRunnable
taskExecutionRunnable,
final TaskKilledLifecycleEvent
taskKilledEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
- final IWorkflowExecutionGraph workflowExecutionGraph =
taskExecutionRunnable.getWorkflowExecutionGraph();
// This case happen when the task is failure but the task is in delay
retry queue.
// We don't remove the event in GlobalWorkflowDelayEventCoordinator
the event should be dropped when the task is
// killed.
- if (taskExecutionRunnable.isTaskInstanceCanRetry()
- &&
workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
-
workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable);
-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+ if (isTaskRetrying(taskExecutionRunnable)) {
+ super.killedEventAction(workflowExecutionRunnable,
taskExecutionRunnable, taskKilledEvent);
return;
}
logWarningIfCannotDoAction(taskExecutionRunnable, taskKilledEvent);
@@ -179,4 +185,9 @@ public class TaskFailureStateAction extends
AbstractTaskStateAction {
public TaskExecutionStatus matchState() {
return TaskExecutionStatus.FAILURE;
}
+
+ private boolean isTaskRetrying(final ITaskExecutionRunnable
taskExecutionRunnable) {
+ final IWorkflowExecutionGraph workflowExecutionGraph =
taskExecutionRunnable.getWorkflowExecutionGraph();
+ return
workflowExecutionGraph.isTaskExecutionRunnableRetrying(taskExecutionRunnable);
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
index fd9eee60f4..6594e170de 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
@@ -294,4 +294,54 @@ public class WorkflowInstancePauseTestCase extends
AbstractMasterIntegrationTest
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test pause a workflow with failed retrying task")
+ public void testPauseWorkflow_with_failedRetryingTask() {
+ final String yaml =
"/it/pause/workflow_with_fake_task_failed_retrying.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .pollInterval(Duration.ofMillis(100))
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+
+
assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .satisfiesExactly(
+ taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+ });
+
+
assertThat(workflowOperator.pauseWorkflowInstance(workflowInstanceId).isSuccess());
+
+ await()
+ .pollInterval(Duration.ofMillis(100))
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .satisfies(
+ workflowInstance -> {
+ assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.PAUSE);
+ });
+
+
assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .satisfiesExactly(
+ taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.PAUSE);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
index 0b80a4b40a..5174a2dc59 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
@@ -246,4 +246,54 @@ public class WorkflowInstanceStopTestCase extends
AbstractMasterIntegrationTestC
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test stop a workflow with failed retrying task")
+ public void testStopWorkflow_with_failedRetryingTask() {
+ final String yaml =
"/it/stop/workflow_with_fake_task_failed_retrying.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+ final Integer workflowInstanceId =
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .pollInterval(Duration.ofMillis(100))
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+
+
assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .satisfiesExactly(
+ taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+ });
+
+
assertThat(workflowOperator.stopWorkflowInstance(workflowInstanceId).isSuccess());
+
+ await()
+ .pollInterval(Duration.ofMillis(100))
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+
assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+ .satisfies(
+ workflowInstance -> {
+
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
+ });
+
+
assertThat(repository.queryTaskInstance(workflowInstanceId))
+ .satisfiesExactly(
+ taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml
b/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml
new file mode 100644
index 0000000000..a86397370e
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_failed
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: FAILED-RETRY
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ failRetryTimes: 10
+ failRetryInterval: 10
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
diff --git
a/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml
b/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml
new file mode 100644
index 0000000000..a86397370e
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_failed
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: FAILED-RETRY
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ failRetryTimes: 10
+ failRetryInterval: 10
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+