This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a0ee6bfa3c [Fix][Zeta] Fix apply resource again for another pipeline
when restore one pipeline (#7965)
a0ee6bfa3c is described below
commit a0ee6bfa3c7c7dff8b08d588a4b44f09ee5a29a0
Author: Jast <[email protected]>
AuthorDate: Mon Nov 4 10:24:51 2024 +0800
[Fix][Zeta] Fix apply resource again for another pipeline when restore one
pipeline (#7965)
---
.../apache/seatunnel/engine/e2e/JobRestoreIT.java | 48 ++++++++++++
.../restore-job/restore_job_apply_resources.conf | 84 +++++++++++++++++++++
.../seatunnel_job_restore_apply_resources.yaml | 34 +++++++++
.../engine/server/dag/physical/SubPlan.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 87 +++++++++++++++-------
5 files changed, 227 insertions(+), 28 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java
new file mode 100644
index 0000000000..f251d9019f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+
+public class JobRestoreIT extends SeaTunnelContainer {
+
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ this.server =
+ createSeaTunnelContainerWithFakeSourceAndInMemorySink(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml");
+ }
+
+ /** Verifies that restoring a job re-applies for resources without a NoEnoughResourceException. */
+ @Test
+ public void testJobRestoreApplyResources() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
+ executeJob(server,
"/restore-job/restore_job_apply_resources.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+
Assertions.assertFalse(server.getLogs().contains("NoEnoughResourceException"));
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/restore-job/restore_job_apply_resources.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/restore-job/restore_job_apply_resources.conf
new file mode 100644
index 0000000000..7f89d63f2a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/restore-job/restore_job_apply_resources.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ job.retry.times = 1
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake1"
+ row.num = 10
+ split.num = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+ FakeSource {
+ result_table_name = "fake2"
+ row.num = 10
+ split.num = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+ FakeSource {
+ result_table_name = "fake3"
+ row.num = 10
+ split.num = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ InMemory {
+ source_table_name="fake1"
+ throw_exception=true
+ }
+ InMemory {
+ source_table_name="fake2"
+ throw_exception=true
+ }
+ InMemory {
+ source_table_name="fake3"
+ throw_exception=true
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml
new file mode 100644
index 0000000000..9dbb47fff7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1
+ backup-count: 2
+ queue-type: blockingqueue
+ print-execution-info-interval: 10
+ slot-service:
+ dynamic-slot: false
+ slot-num: 9
+ checkpoint:
+ interval: 300000
+ timeout: 100000
+ storage:
+ type: localfile
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot/
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 6ee28d5a5c..2d74b4c928 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -673,7 +673,7 @@ public class SubPlan {
case CANCELED:
if (checkNeedRestore(state) && prepareRestorePipeline()) {
jobMaster.releasePipelineResource(this);
- jobMaster.preApplyResources();
+ jobMaster.preApplyResources(this);
restorePipeline();
return;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2f69e43d80..04812e078f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -91,6 +91,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -351,37 +352,30 @@ public class JobMaster {
}
/**
- * Apply for resources
+ * Apply for all resources
*
* @return true if apply resources successfully, otherwise false
*/
public boolean preApplyResources() {
+ return preApplyResources(null);
+ }
+
+ /**
+ * Apply for resources for the given pipeline, or for all pipelines when {@code subPlan} is null
+ *
+ * @return true if apply resources successfully, otherwise false
+ */
+ public boolean preApplyResources(SubPlan subPlan) {
+
Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures =
new HashMap<>();
- for (SubPlan subPlan : physicalPlan.getPipelineList()) {
- Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
coordinatorFutures =
- new HashMap<>();
- subPlan.getCoordinatorVertexList()
- .forEach(
- coordinator ->
- coordinatorFutures.put(
- coordinator.getTaskGroupLocation(),
- ResourceUtils.applyResourceForTask(
- resourceManager,
- coordinator,
- subPlan.getTags())));
-
- Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures
= new HashMap<>();
- subPlan.getPhysicalVertexList()
- .forEach(
- task ->
- taskFutures.put(
- task.getTaskGroupLocation(),
- ResourceUtils.applyResourceForTask(
- resourceManager, task,
subPlan.getTags())));
-
- preApplyResourceFutures.putAll(coordinatorFutures);
- preApplyResourceFutures.putAll(taskFutures);
+
+ boolean isSubPlan = Objects.nonNull(subPlan);
+
+ if (isSubPlan) {
+ preApplyResourcesForSubPlan(subPlan, preApplyResourceFutures);
+ } else {
+ preApplyResourcesForAll(preApplyResourceFutures);
}
boolean enoughResource =
@@ -400,8 +394,14 @@ public class JobMaster {
== preApplyResourceFutures.size();
if (enoughResource) {
- // Adequate resources, pass on resources to the plan
- physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
+ if (isSubPlan) {
+ // SubPlan applies for resources separately and needs to be
merged into the entire
+ // job's resources
+
physicalPlan.getPreApplyResourceFutures().putAll(preApplyResourceFutures);
+ } else {
+ // Adequate resources, pass on resources to the plan
+
physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
+ }
} else {
// Release the resource that has been applied
try {
@@ -442,6 +442,39 @@ public class JobMaster {
return enoughResource;
}
+ private Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourcesForAll(
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures) {
+ for (SubPlan subPlan : physicalPlan.getPipelineList()) {
+ preApplyResourcesForSubPlan(subPlan, preApplyResourceFutures);
+ }
+ return preApplyResourceFutures;
+ }
+
+ private void preApplyResourcesForSubPlan(
+ SubPlan subPlan,
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures) {
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
coordinatorFutures = new HashMap<>();
+ subPlan.getCoordinatorVertexList()
+ .forEach(
+ coordinator ->
+ coordinatorFutures.put(
+ coordinator.getTaskGroupLocation(),
+ ResourceUtils.applyResourceForTask(
+ resourceManager, coordinator,
subPlan.getTags())));
+
+ Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures =
new HashMap<>();
+ subPlan.getPhysicalVertexList()
+ .forEach(
+ task ->
+ taskFutures.put(
+ task.getTaskGroupLocation(),
+ ResourceUtils.applyResourceForTask(
+ resourceManager, task,
subPlan.getTags())));
+
+ preApplyResourceFutures.putAll(coordinatorFutures);
+ preApplyResourceFutures.putAll(taskFutures);
+ }
+
public void run() {
try {
physicalPlan.startJob();