This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 44165480b2 [Fix-16978][Master] Fix AbstractDelayEvent compare method is incorrect (#16980)
44165480b2 is described below

commit 44165480b2b97ca569741b995d68940f29ae12a6
Author: lile <[email protected]>
AuthorDate: Wed Jan 29 10:13:06 2025 +0800

    [Fix-16978][Master] Fix AbstractDelayEvent compare method is incorrect (#16980)
---
 .../eventbus/AbstractDelayEvent.java               |  7 ++-
 .../engine/graph/IWorkflowExecutionGraph.java      |  5 ++
 .../engine/graph/WorkflowExecutionGraph.java       | 12 +++-
 .../task/statemachine/TaskFailureStateAction.java  | 31 +++++++----
 .../cases/WorkflowInstancePauseTestCase.java       | 50 +++++++++++++++++
 .../cases/WorkflowInstanceStopTestCase.java        | 50 +++++++++++++++++
 .../workflow_with_fake_task_failed_retrying.yaml   | 64 ++++++++++++++++++++++
 .../workflow_with_fake_task_failed_retrying.yaml   | 64 ++++++++++++++++++++++
 8 files changed, 271 insertions(+), 12 deletions(-)

diff --git 
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
 
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
index c4c61b9055..c013171377 100644
--- 
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
+++ 
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
@@ -39,6 +39,10 @@ public abstract class AbstractDelayEvent implements IEvent, 
Delayed {
     @Builder.Default
     protected long createTimeInNano = System.nanoTime();
 
+    // set create time as default if the inheritor didn't call super()
+    @Builder.Default
+    protected long expiredTimeInNano = System.nanoTime();
+
     public AbstractDelayEvent() {
         this(DEFAULT_DELAY_TIME);
     }
@@ -50,6 +54,7 @@ public abstract class AbstractDelayEvent implements IEvent, 
Delayed {
     public AbstractDelayEvent(final long delayTime, final long 
createTimeInNano) {
         this.delayTime = delayTime;
         this.createTimeInNano = createTimeInNano;
+        this.expiredTimeInNano = this.delayTime * 1_000_000 + 
this.createTimeInNano;
     }
 
     @Override
@@ -60,7 +65,7 @@ public abstract class AbstractDelayEvent implements IEvent, 
Delayed {
 
     @Override
     public int compareTo(Delayed other) {
-        return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) 
other).createTimeInNano);
+        return Long.compare(this.expiredTimeInNano, ((AbstractDelayEvent) 
other).expiredTimeInNano);
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
index 76b04951b7..3f168cce6e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
@@ -191,6 +191,11 @@ public interface IWorkflowExecutionGraph {
      */
     boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable 
taskExecutionRunnable);
 
+    /**
+     * Whether the given task's execution is failure and waiting for retry.
+     */
+    boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable 
taskExecutionRunnable);
+
     /**
      * Whether all predecessors task is skipped.
      * <p> Once all predecessors are marked as skipped, then the task will be 
marked as skipped, and will trigger its successors.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
index 20a65d45a2..dd89debe6c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
@@ -140,7 +140,7 @@ public class WorkflowExecutionGraph implements 
IWorkflowExecutionGraph {
 
     @Override
     public boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable 
taskExecutionRunnable) {
-        return 
activeTaskExecutionRunnable.add(taskExecutionRunnable.getName());
+        return 
activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
     }
 
     @Override
@@ -256,6 +256,16 @@ public class WorkflowExecutionGraph implements 
IWorkflowExecutionGraph {
         return (taskExecutionRunnable.getTaskDefinition().getFlag() == 
Flag.NO);
     }
 
+    @Override
+    public boolean isTaskExecutionRunnableRetrying(final 
ITaskExecutionRunnable taskExecutionRunnable) {
+        if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
+            return false;
+        }
+        final TaskInstance taskInstance = 
taskExecutionRunnable.getTaskInstance();
+        return taskInstance.getState() == TaskExecutionStatus.FAILURE && 
taskExecutionRunnable.isTaskInstanceCanRetry()
+                && isTaskExecutionRunnableActive(taskExecutionRunnable);
+    }
+
     /**
      * Whether all predecessors are skipped.
      * <p> Only when all predecessors are skipped, will return true. If the 
given task doesn't have any predecessors, will return false.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
index 005a0003ca..b0b9139b8b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java
@@ -104,6 +104,12 @@ public class TaskFailureStateAction extends 
AbstractTaskStateAction {
                                  final ITaskExecutionRunnable 
taskExecutionRunnable,
                                  final TaskPauseLifecycleEvent taskPauseEvent) 
{
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
+        // When the failed task is awaiting retry, we can mark it as 'paused' 
to ignore the retry event.
+        if (isTaskRetrying(taskExecutionRunnable)) {
+            super.pausedEventAction(workflowExecutionRunnable, 
taskExecutionRunnable,
+                    TaskPausedLifecycleEvent.of(taskExecutionRunnable));
+            return;
+        }
         logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
     }
 
@@ -112,14 +118,11 @@ public class TaskFailureStateAction extends 
AbstractTaskStateAction {
                                   final ITaskExecutionRunnable 
taskExecutionRunnable,
                                   final TaskPausedLifecycleEvent 
taskPausedEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
-        final IWorkflowExecutionGraph workflowExecutionGraph = 
taskExecutionRunnable.getWorkflowExecutionGraph();
         // This case happen when the task is failure but the task is in delay 
retry queue.
         // We don't remove the event in GlobalWorkflowDelayEventCoordinator 
the event should be dropped when the task is
         // killed.
-        if (taskExecutionRunnable.isTaskInstanceCanRetry()
-                && 
workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
-            
workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable);
-            
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        if (isTaskRetrying(taskExecutionRunnable)) {
+            super.pausedEventAction(workflowExecutionRunnable, 
taskExecutionRunnable, taskPausedEvent);
             return;
         }
         logWarningIfCannotDoAction(taskExecutionRunnable, taskPausedEvent);
@@ -130,6 +133,12 @@ public class TaskFailureStateAction extends 
AbstractTaskStateAction {
                                 final ITaskExecutionRunnable 
taskExecutionRunnable,
                                 final TaskKillLifecycleEvent taskKillEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
+        // When the failed task is awaiting retry, we can mark it as 'killed' 
to ignore the retry event.
+        if (isTaskRetrying(taskExecutionRunnable)) {
+            super.killedEventAction(workflowExecutionRunnable, 
taskExecutionRunnable,
+                    TaskKilledLifecycleEvent.of(taskExecutionRunnable));
+            return;
+        }
         logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
     }
 
@@ -138,14 +147,11 @@ public class TaskFailureStateAction extends 
AbstractTaskStateAction {
                                   final ITaskExecutionRunnable 
taskExecutionRunnable,
                                   final TaskKilledLifecycleEvent 
taskKilledEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
-        final IWorkflowExecutionGraph workflowExecutionGraph = 
taskExecutionRunnable.getWorkflowExecutionGraph();
         // This case happen when the task is failure but the task is in delay 
retry queue.
         // We don't remove the event in GlobalWorkflowDelayEventCoordinator 
the event should be dropped when the task is
         // killed.
-        if (taskExecutionRunnable.isTaskInstanceCanRetry()
-                && 
workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
-            
workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable);
-            
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+        if (isTaskRetrying(taskExecutionRunnable)) {
+            super.killedEventAction(workflowExecutionRunnable, 
taskExecutionRunnable, taskKilledEvent);
             return;
         }
         logWarningIfCannotDoAction(taskExecutionRunnable, taskKilledEvent);
@@ -179,4 +185,9 @@ public class TaskFailureStateAction extends 
AbstractTaskStateAction {
     public TaskExecutionStatus matchState() {
         return TaskExecutionStatus.FAILURE;
     }
+
+    private boolean isTaskRetrying(final ITaskExecutionRunnable 
taskExecutionRunnable) {
+        final IWorkflowExecutionGraph workflowExecutionGraph = 
taskExecutionRunnable.getWorkflowExecutionGraph();
+        return 
workflowExecutionGraph.isTaskExecutionRunnableRetrying(taskExecutionRunnable);
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
index fd9eee60f4..6594e170de 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java
@@ -294,4 +294,54 @@ public class WorkflowInstancePauseTestCase extends 
AbstractMasterIntegrationTest
         masterContainer.assertAllResourceReleased();
     }
 
+    @Test
+    @DisplayName("Test pause a workflow with failed retrying task")
+    public void testPauseWorkflow_with_failedRetryingTask() {
+        final String yaml = "/it/pause/workflow_with_fake_task_failed_retrying.yaml";
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .pollInterval(Duration.ofMillis(100))
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+                            .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+
+                    assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .satisfiesExactly(
+                                    taskInstance -> {
+                                        assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+                                        assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                                    });
+                });
+        // assertThat(boolean) checks nothing by itself; .isTrue() makes the pause request's success a real check.
+        assertThat(workflowOperator.pauseWorkflowInstance(workflowInstanceId).isSuccess()).isTrue();
+
+        await()
+                .pollInterval(Duration.ofMillis(100))
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .satisfies(
+                                    workflowInstance -> {
+                                        assertThat(workflowInstance.getState())
+                                                .isEqualTo(WorkflowExecutionStatus.PAUSE);
+                                    });
+
+                    assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .satisfiesExactly(
+                                    taskInstance -> {
+                                        assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+                                        assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.PAUSE);
+                                    });
+                });
+        masterContainer.assertAllResourceReleased();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
index 0b80a4b40a..5174a2dc59 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java
@@ -246,4 +246,54 @@ public class WorkflowInstanceStopTestCase extends 
AbstractMasterIntegrationTestC
 
         masterContainer.assertAllResourceReleased();
     }
+
+    @Test
+    @DisplayName("Test stop a workflow with failed retrying task")
+    public void testStopWorkflow_with_failedRetryingTask() {
+        final String yaml = "/it/stop/workflow_with_fake_task_failed_retrying.yaml";
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(workflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .pollInterval(Duration.ofMillis(100))
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
+                            .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+
+                    assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .satisfiesExactly(
+                                    taskInstance -> {
+                                        assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+                                        assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                                    });
+                });
+        // assertThat(boolean) checks nothing by itself; .isTrue() makes the stop request's success a real check.
+        assertThat(workflowOperator.stopWorkflowInstance(workflowInstanceId).isSuccess()).isTrue();
+
+        await()
+                .pollInterval(Duration.ofMillis(100))
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .satisfies(
+                                    workflowInstance -> {
+                                        assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
+                                    });
+
+                    assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .satisfiesExactly(
+                                    taskInstance -> {
+                                        assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
+                                        assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+                                    });
+                });
+        masterContainer.assertAllResourceReleased();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml
 
b/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml
new file mode 100644
index 0000000000..a86397370e
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/pause/workflow_with_fake_task_failed_retrying.yaml
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_one_fake_task_failed
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: FAILED-RETRY
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+    failRetryTimes: 10
+    failRetryInterval: 10
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+
diff --git 
a/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml
 
b/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml
new file mode 100644
index 0000000000..a86397370e
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/stop/workflow_with_fake_task_failed_retrying.yaml
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_one_fake_task_failed
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: FAILED-RETRY
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+    failRetryTimes: 10
+    failRetryInterval: 10
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+

Reply via email to