This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5ff4395dbe [Hotfix][Zeta] Fix zeta scheduler bug (#6050)
5ff4395dbe is described below
commit 5ff4395dbe2efe0ab2d210b0e58c2a85be100d89
Author: Eric <[email protected]>
AuthorDate: Tue Dec 26 11:07:03 2023 +0800
[Hotfix][Zeta] Fix zeta scheduler bug (#6050)
---
.../seatunnel/command/ClientExecuteCommand.java | 9 +-
.../container/seatunnel/SeaTunnelContainer.java | 2 +-
.../seatunnel/engine/e2e/JobClientJobProxyIT.java} | 129 ++++--------------
.../seatunnel/engine/e2e/SeaTunnelSlotIT.java | 145 +++++++++++++++++++++
.../src/test/resources/batch_slot_not_enough.conf | 48 +++++++
.../test/resources/seatunnel_fixed_slot_num.yaml | 34 +++++
.../engine/client/job/ClientJobProxy.java | 6 -
.../org/apache/seatunnel/engine/core/job/Job.java | 1 +
.../engine/server/dag/physical/SubPlan.java | 16 ++-
9 files changed, 277 insertions(+), 113 deletions(-)
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index eae7361ec7..ad41ae983c 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -32,6 +32,8 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
@@ -182,7 +184,12 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
TimeUnit.SECONDS);
// wait for job complete
- jobStatus = clientJobProxy.waitForJobComplete();
+ JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
+ jobStatus = jobResult.getStatus();
+ if (StringUtils.isNotEmpty(jobResult.getError())
+ || jobResult.getStatus().equals(JobStatus.FAILED)) {
+ throw new SeaTunnelEngineException(jobResult.getError());
+ }
// get job end time
endTime = LocalDateTime.now();
// get job statistic information when job finished
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 41b985bc9a..fe47b1988c 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -49,7 +49,7 @@ public class SeaTunnelContainer extends AbstractTestContainer
{
private static final String JDK_DOCKER_IMAGE = "openjdk:8";
private static final String CLIENT_SHELL = "seatunnel.sh";
private static final String SERVER_SHELL = "seatunnel-cluster.sh";
- private GenericContainer<?> server;
+ protected GenericContainer<?> server;
@Override
public void startUp() throws Exception {
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
similarity index 53%
copy from
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 41b985bc9a..ce54ba84c2 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.common.container.seatunnel;
+package org.apache.seatunnel.engine.e2e;
-import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
-import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -30,30 +31,20 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
-@NoArgsConstructor
-@Slf4j
-@AutoService(TestContainer.class)
-public class SeaTunnelContainer extends AbstractTestContainer {
+public class JobClientJobProxyIT extends SeaTunnelContainer {
private static final String JDK_DOCKER_IMAGE = "openjdk:8";
- private static final String CLIENT_SHELL = "seatunnel.sh";
private static final String SERVER_SHELL = "seatunnel-cluster.sh";
- private GenericContainer<?> server;
@Override
+ @BeforeAll
public void startUp() throws Exception {
- server =
+ this.server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
.withCommand(
@@ -74,6 +65,13 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
+
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
Paths.get(SEATUNNEL_HOME, "config").toString());
+ // replace seatunnel.yaml in the container with seatunnel_fixed_slot_num.yaml
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"),
+ Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());
+
server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
@@ -84,90 +82,15 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
executeExtraCommands(server);
}
- @Override
- public void tearDown() throws Exception {
- if (server != null) {
- server.close();
- }
- }
-
- @Override
- protected String getDockerImage() {
- return JDK_DOCKER_IMAGE;
- }
-
- @Override
- protected String getStartModuleName() {
- return "seatunnel-starter";
- }
-
- @Override
- protected String getStartShellName() {
- return CLIENT_SHELL;
- }
-
- @Override
- protected String getConnectorModulePath() {
- return "seatunnel-connectors-v2";
- }
-
- @Override
- protected String getConnectorType() {
- return "seatunnel";
- }
-
- @Override
- protected String getConnectorNamePrefix() {
- return "connector-";
- }
-
- @Override
- protected List<String> getExtraStartShellCommands() {
- return Collections.emptyList();
- }
-
- @Override
- public TestContainerId identifier() {
- return TestContainerId.SEATUNNEL;
- }
-
- @Override
- protected String getSavePointCommand() {
- return "-s";
- }
-
- @Override
- protected String getRestoreCommand() {
- return "-r";
- }
-
- @Override
- public void executeExtraCommands(ContainerExtendedFactory extendedFactory)
- throws IOException, InterruptedException {
- extendedFactory.extend(server);
- }
-
- @Override
- public Container.ExecResult executeJob(String confFile)
- throws IOException, InterruptedException {
- log.info("test in container: {}", identifier());
- return executeJob(server, confFile);
- }
-
- @Override
- public Container.ExecResult savepointJob(String jobId)
- throws IOException, InterruptedException {
- return savepointJob(server, jobId);
- }
-
- @Override
- public Container.ExecResult restoreJob(String confFile, String jobId)
- throws IOException, InterruptedException {
- return restoreJob(server, confFile, jobId);
- }
-
- @Override
- public String getServerLogs() {
- return server.getLogs();
+ @Test
+ public void testJobFailedWillThrowException() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelJob("/batch_slot_not_enough.conf");
+ Assertions.assertNotEquals(0, execResult.getExitCode());
+ Assertions.assertTrue(
+ StringUtils.isNotBlank(execResult.getStderr())
+ && execResult
+ .getStderr()
+ .contains(
+
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException:
can't apply resource request"));
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
new file mode 100644
index 0000000000..8f7b459c48
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies how the Zeta engine schedules a job when slots are fixed: the job in
+ * batch_slot_not_enough.conf must end FAILED when the node offers too few slots and FINISHED
+ * when it offers enough.
+ */
+public class SeaTunnelSlotIT {
+
+    /** Maximum time to wait for the submitted job to reach a terminal state. */
+    private static final long JOB_TIMEOUT_MINUTES = 10;
+
+    /** Interval between checks of whether the submitted job has completed. */
+    private static final long POLL_INTERVAL_SECONDS = 2;
+
+    @Test
+    public void testSlotNotEnough() throws Exception {
+        // 3 fixed slots are not enough for the job, so it must fail.
+        runJobAndAssertStatus("testSlotNotEnough", 3, JobStatus.FAILED);
+    }
+
+    @Test
+    public void testSlotEnough() throws Exception {
+        // 10 fixed slots are enough for the same job, so it must finish.
+        runJobAndAssertStatus("testSlotEnough", 10, JobStatus.FINISHED);
+    }
+
+    /**
+     * Starts a single node with dynamic slots disabled and {@code slotNum} fixed slots, submits
+     * batch_slot_not_enough.conf to it and asserts the status the job ends with.
+     *
+     * @param testClusterName cluster name, also used as the job name
+     * @param slotNum number of fixed slots the node offers
+     * @param expectedStatus terminal status the job is expected to end with
+     * @throws Exception propagated from cluster/client setup, job submission or reading the result
+     */
+    private void runJobAndAssertStatus(
+            String testClusterName, int slotNum, JobStatus expectedStatus) throws Exception {
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        try {
+            SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+            seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+            seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
+            seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(slotNum);
+
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // client config
+            Common.setDeployMode(DeployMode.CLIENT);
+            String filePath = TestUtils.getResource("batch_slot_not_enough.conf");
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testClusterName);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(testClusterName);
+            engineClient = new SeaTunnelClient(clientConfig);
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            // Job#waitForJobComplete() is deprecated; read the status from the V2 result.
+            CompletableFuture<JobStatus> jobStatusFuture =
+                    CompletableFuture.supplyAsync(
+                            () -> clientJobProxy.waitForJobCompleteV2().getStatus());
+
+            // Wait for completion only and assert once afterwards: a job that ends with the
+            // wrong status fails the test right away instead of being re-polled until timeout.
+            Awaitility.await()
+                    .atMost(JOB_TIMEOUT_MINUTES, TimeUnit.MINUTES)
+                    .pollInterval(POLL_INTERVAL_SECONDS, TimeUnit.SECONDS)
+                    .until(jobStatusFuture::isDone);
+            Assertions.assertEquals(expectedStatus, jobStatusFuture.get());
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf
new file mode 100644
index 0000000000..99e93d6c00
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config, used to test job scheduling with a fixed number of slots
+######
+
+env {
+ # You can set engine configuration here
+ job.mode = "BATCH"
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 4
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
new file mode 100644
index 0000000000..91736ce34a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1
+ backup-count: 2
+ queue-type: blockingqueue
+ print-execution-info-interval: 10
+ slot-service:
+ dynamic-slot: false
+ slot-num: 3
+ checkpoint:
+ interval: 300000
+ timeout: 100000
+ storage:
+ type: localfile
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot/
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index ceec9b33dc..21802c5215 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -33,8 +33,6 @@ import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCode
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
-import org.apache.commons.lang3.StringUtils;
-
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -117,10 +115,6 @@ public class ClientJobProxy implements Job {
throw new RuntimeException(e);
}
LOGGER.info(String.format("Job (%s) end with state %s", jobId,
jobResult.getStatus()));
- if (StringUtils.isNotEmpty(jobResult.getError())
- || jobResult.getStatus().equals(JobStatus.FAILED)) {
- throw new SeaTunnelEngineException(jobResult.getError());
- }
return jobResult;
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 3d4ee7593b..52fba14205 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -29,6 +29,7 @@ public interface Job {
JobStatus getJobStatus();
+ @Deprecated
default JobStatus waitForJobComplete() {
return waitForJobCompleteV2().getStatus();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 0f9141ed00..c1e7f975c4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -231,6 +231,15 @@ public class SubPlan {
errorByPhysicalVertex.compareAndSet(
null, checkpointCoordinatorState.getThrowableMsg());
}
+
+ // The pipeline state can only be updated by its tasks. If the pipeline cannot get
+ // enough slots, its state turns to FAILING and all tasks in the pipeline are then
+ // canceled.
+ // Since those tasks never ran, they complete as CANCELED, but the actual status of
+ // the pipeline should be FAILED.
+ if (getPipelineState().equals(PipelineStatus.FAILING)) {
+ pipelineStatus = PipelineStatus.FAILED;
+ }
} else {
pipelineStatus = PipelineStatus.FINISHED;
CheckpointCoordinatorState checkpointCoordinatorState =
@@ -322,10 +331,11 @@ public class SubPlan {
// now do the actual state transition
// we must update runningJobStateTimestampsIMap first and then can
update
// runningJobStateIMap
+ PipelineStatus finalTargetState = targetState;
RetryUtils.retryWithException(
() -> {
- updateStateTimestamps(targetState);
- runningJobStateIMap.set(pipelineLocation, targetState);
+ updateStateTimestamps(finalTargetState);
+ runningJobStateIMap.set(pipelineLocation,
finalTargetState);
return null;
},
new RetryUtils.RetryMaterial(
@@ -614,11 +624,13 @@ public class SubPlan {
case CANCELING:
coordinatorVertexList.forEach(
task -> {
+ task.startPhysicalVertex();
task.cancel();
});
physicalVertexList.forEach(
task -> {
+ task.startPhysicalVertex();
task.cancel();
});
break;