This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5ff4395dbe [Hotfix][Zeta] Fix zeta scheduler bug (#6050)
5ff4395dbe is described below

commit 5ff4395dbe2efe0ab2d210b0e58c2a85be100d89
Author: Eric <[email protected]>
AuthorDate: Tue Dec 26 11:07:03 2023 +0800

    [Hotfix][Zeta] Fix zeta scheduler bug (#6050)
---
 .../seatunnel/command/ClientExecuteCommand.java    |   9 +-
 .../container/seatunnel/SeaTunnelContainer.java    |   2 +-
 .../seatunnel/engine/e2e/JobClientJobProxyIT.java} | 129 ++++--------------
 .../seatunnel/engine/e2e/SeaTunnelSlotIT.java      | 145 +++++++++++++++++++++
 .../src/test/resources/batch_slot_not_enough.conf  |  48 +++++++
 .../test/resources/seatunnel_fixed_slot_num.yaml   |  34 +++++
 .../engine/client/job/ClientJobProxy.java          |   6 -
 .../org/apache/seatunnel/engine/core/job/Job.java  |   1 +
 .../engine/server/dag/physical/SubPlan.java        |  16 ++-
 9 files changed, 277 insertions(+), 113 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index eae7361ec7..ad41ae983c 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -32,6 +32,8 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
@@ -182,7 +184,12 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
                         
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
                         TimeUnit.SECONDS);
                 // wait for job complete
-                jobStatus = clientJobProxy.waitForJobComplete();
+                JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
+                jobStatus = jobResult.getStatus();
+                if (StringUtils.isNotEmpty(jobResult.getError())
+                        || jobResult.getStatus().equals(JobStatus.FAILED)) {
+                    throw new SeaTunnelEngineException(jobResult.getError());
+                }
                 // get job end time
                 endTime = LocalDateTime.now();
                 // get job statistic information when job finished
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 41b985bc9a..fe47b1988c 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -49,7 +49,7 @@ public class SeaTunnelContainer extends AbstractTestContainer 
{
     private static final String JDK_DOCKER_IMAGE = "openjdk:8";
     private static final String CLIENT_SHELL = "seatunnel.sh";
     private static final String SERVER_SHELL = "seatunnel-cluster.sh";
-    private GenericContainer<?> server;
+    protected GenericContainer<?> server;
 
     @Override
     public void startUp() throws Exception {
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
similarity index 53%
copy from 
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 41b985bc9a..ce54ba84c2 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.common.container.seatunnel;
+package org.apache.seatunnel.engine.e2e;
 
-import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
-import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -30,30 +31,20 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerLoggerFactory;
 import org.testcontainers.utility.MountableFile;
 
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
 
-@NoArgsConstructor
-@Slf4j
-@AutoService(TestContainer.class)
-public class SeaTunnelContainer extends AbstractTestContainer {
+public class JobClientJobProxyIT extends SeaTunnelContainer {
     private static final String JDK_DOCKER_IMAGE = "openjdk:8";
-    private static final String CLIENT_SHELL = "seatunnel.sh";
     private static final String SERVER_SHELL = "seatunnel-cluster.sh";
-    private GenericContainer<?> server;
 
     @Override
+    @BeforeAll
     public void startUp() throws Exception {
-        server =
+        this.server =
                 new GenericContainer<>(getDockerImage())
                         .withNetwork(NETWORK)
                         .withCommand(
@@ -74,6 +65,13 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                                 + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
                 Paths.get(SEATUNNEL_HOME, "config").toString());
 
+        // replace seatunnel.yaml in the container with seatunnel_fixed_slot_num.yaml
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"),
+                Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());
+
         server.withCopyFileToContainer(
                 MountableFile.forHostPath(
                         PROJECT_ROOT_PATH
@@ -84,90 +82,15 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         executeExtraCommands(server);
     }
 
-    @Override
-    public void tearDown() throws Exception {
-        if (server != null) {
-            server.close();
-        }
-    }
-
-    @Override
-    protected String getDockerImage() {
-        return JDK_DOCKER_IMAGE;
-    }
-
-    @Override
-    protected String getStartModuleName() {
-        return "seatunnel-starter";
-    }
-
-    @Override
-    protected String getStartShellName() {
-        return CLIENT_SHELL;
-    }
-
-    @Override
-    protected String getConnectorModulePath() {
-        return "seatunnel-connectors-v2";
-    }
-
-    @Override
-    protected String getConnectorType() {
-        return "seatunnel";
-    }
-
-    @Override
-    protected String getConnectorNamePrefix() {
-        return "connector-";
-    }
-
-    @Override
-    protected List<String> getExtraStartShellCommands() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public TestContainerId identifier() {
-        return TestContainerId.SEATUNNEL;
-    }
-
-    @Override
-    protected String getSavePointCommand() {
-        return "-s";
-    }
-
-    @Override
-    protected String getRestoreCommand() {
-        return "-r";
-    }
-
-    @Override
-    public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
-            throws IOException, InterruptedException {
-        extendedFactory.extend(server);
-    }
-
-    @Override
-    public Container.ExecResult executeJob(String confFile)
-            throws IOException, InterruptedException {
-        log.info("test in container: {}", identifier());
-        return executeJob(server, confFile);
-    }
-
-    @Override
-    public Container.ExecResult savepointJob(String jobId)
-            throws IOException, InterruptedException {
-        return savepointJob(server, jobId);
-    }
-
-    @Override
-    public Container.ExecResult restoreJob(String confFile, String jobId)
-            throws IOException, InterruptedException {
-        return restoreJob(server, confFile, jobId);
-    }
-
-    @Override
-    public String getServerLogs() {
-        return server.getLogs();
+    @Test
+    public void testJobFailedWillThrowException() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelJob("/batch_slot_not_enough.conf");
+        Assertions.assertNotEquals(0, execResult.getExitCode());
+        Assertions.assertTrue(
+                StringUtils.isNotBlank(execResult.getStderr())
+                        && execResult
+                                .getStderr()
+                                .contains(
+                                        
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException: 
can't apply resource request"));
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
new file mode 100644
index 0000000000..8f7b459c48
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SeaTunnelSlotIT {
+    @Test
+    public void testSlotNotEnough() throws Exception {
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            String testClusterName = "testSlotNotEnough";
+            SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+            
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+            // slot num is 3
+            
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+            
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(3);
+
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // client config
+            Common.setDeployMode(DeployMode.CLIENT);
+            String filePath = 
TestUtils.getResource("batch_slot_not_enough.conf");
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testClusterName);
+
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(testClusterName);
+            engineClient = new SeaTunnelClient(clientConfig);
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Thread.sleep(2000);
+                                Assertions.assertTrue(
+                                        objectCompletableFuture.isDone()
+                                                && JobStatus.FAILED.equals(
+                                                        
objectCompletableFuture.get()));
+                            });
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void testSlotEnough() throws Exception {
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            String testClusterName = "testSlotEnough";
+            SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+            
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+            // slot num is 10
+            
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+            
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(10);
+
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // client config
+            Common.setDeployMode(DeployMode.CLIENT);
+            String filePath = 
TestUtils.getResource("batch_slot_not_enough.conf");
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testClusterName);
+
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(testClusterName);
+            engineClient = new SeaTunnelClient(clientConfig);
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Thread.sleep(2000);
+                                Assertions.assertTrue(
+                                        objectCompletableFuture.isDone()
+                                                && JobStatus.FINISHED.equals(
+                                                        
objectCompletableFuture.get()));
+                            });
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf
new file mode 100644
index 0000000000..99e93d6c00
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  job.mode = "BATCH"
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 4
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
new file mode 100644
index 0000000000..91736ce34a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+  engine:
+    history-job-expire-minutes: 1
+    backup-count: 2
+    queue-type: blockingqueue
+    print-execution-info-interval: 10
+    slot-service:
+      dynamic-slot: false
+      slot-num: 3
+    checkpoint:
+      interval: 300000
+      timeout: 100000
+      storage:
+        type: localfile
+        max-retained: 3
+        plugin-config:
+          namespace: /tmp/seatunnel/checkpoint_snapshot/
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index ceec9b33dc..21802c5215 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -33,8 +33,6 @@ import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCode
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
 
-import org.apache.commons.lang3.StringUtils;
-
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -117,10 +115,6 @@ public class ClientJobProxy implements Job {
             throw new RuntimeException(e);
         }
         LOGGER.info(String.format("Job (%s) end with state %s", jobId, 
jobResult.getStatus()));
-        if (StringUtils.isNotEmpty(jobResult.getError())
-                || jobResult.getStatus().equals(JobStatus.FAILED)) {
-            throw new SeaTunnelEngineException(jobResult.getError());
-        }
         return jobResult;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 3d4ee7593b..52fba14205 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -29,6 +29,7 @@ public interface Job {
 
     JobStatus getJobStatus();
 
+    @Deprecated
     default JobStatus waitForJobComplete() {
         return waitForJobCompleteV2().getStatus();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 0f9141ed00..c1e7f975c4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -231,6 +231,15 @@ public class SubPlan {
                 errorByPhysicalVertex.compareAndSet(
                         null, checkpointCoordinatorState.getThrowableMsg());
             }
+
+            // The pipeline state must be updated by its tasks. If the pipeline cannot get enough
+            // slots, its state turns to FAILING and all tasks in this pipeline are then
+            // canceled.
+            // Because the tasks never ran, they complete with CANCELED, but the actual
+            // status of the pipeline should be FAILED.
+            if (getPipelineState().equals(PipelineStatus.FAILING)) {
+                pipelineStatus = PipelineStatus.FAILED;
+            }
         } else {
             pipelineStatus = PipelineStatus.FINISHED;
             CheckpointCoordinatorState checkpointCoordinatorState =
@@ -322,10 +331,11 @@ public class SubPlan {
             // now do the actual state transition
             // we must update runningJobStateTimestampsIMap first and then can 
update
             // runningJobStateIMap
+            PipelineStatus finalTargetState = targetState;
             RetryUtils.retryWithException(
                     () -> {
-                        updateStateTimestamps(targetState);
-                        runningJobStateIMap.set(pipelineLocation, targetState);
+                        updateStateTimestamps(finalTargetState);
+                        runningJobStateIMap.set(pipelineLocation, 
finalTargetState);
                         return null;
                     },
                     new RetryUtils.RetryMaterial(
@@ -614,11 +624,13 @@ public class SubPlan {
             case CANCELING:
                 coordinatorVertexList.forEach(
                         task -> {
+                            task.startPhysicalVertex();
                             task.cancel();
                         });
 
                 physicalVertexList.forEach(
                         task -> {
+                            task.startPhysicalVertex();
                             task.cancel();
                         });
                 break;

Reply via email to