This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a0ee6bfa3c [Fix][Zeta] Fix apply resource again for another pipeline 
when restore one pipeline (#7965)
a0ee6bfa3c is described below

commit a0ee6bfa3c7c7dff8b08d588a4b44f09ee5a29a0
Author: Jast <[email protected]>
AuthorDate: Mon Nov 4 10:24:51 2024 +0800

    [Fix][Zeta] Fix apply resource again for another pipeline when restore one 
pipeline (#7965)
---
 .../apache/seatunnel/engine/e2e/JobRestoreIT.java  | 48 ++++++++++++
 .../restore-job/restore_job_apply_resources.conf   | 84 +++++++++++++++++++++
 .../seatunnel_job_restore_apply_resources.yaml     | 34 +++++++++
 .../engine/server/dag/physical/SubPlan.java        |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  | 87 +++++++++++++++-------
 5 files changed, 227 insertions(+), 28 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java
new file mode 100644
index 0000000000..f251d9019f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobRestoreIT.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+
+public class JobRestoreIT extends SeaTunnelContainer {
+
+    @Override
+    @BeforeAll
+    public void startUp() throws Exception {
+        this.server =
+                createSeaTunnelContainerWithFakeSourceAndInMemorySink(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml");
+    }
+
+    /** Checks that restoring a job does not fail with NoEnoughResourceException. */
+    @Test
+    public void testJobRestoreApplyResources() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult =
+                executeJob(server, 
"/restore-job/restore_job_apply_resources.conf");
+        Assertions.assertEquals(1, execResult.getExitCode());
+        
Assertions.assertFalse(server.getLogs().contains("NoEnoughResourceException"));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/restore-job/restore_job_apply_resources.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/restore-job/restore_job_apply_resources.conf
new file mode 100644
index 0000000000..7f89d63f2a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/restore-job/restore_job_apply_resources.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file defines a batch job used to test pipeline restore
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+  job.retry.times = 1
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake1"
+      row.num = 10
+      split.num = 1
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+  FakeSource {
+    result_table_name = "fake2"
+    row.num = 10
+    split.num = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 1
+  }
+  FakeSource {
+    result_table_name = "fake3"
+    row.num = 10
+    split.num = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 1
+  }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    source_table_name="fake1"
+    throw_exception=true
+  }
+  InMemory {
+    source_table_name="fake2"
+    throw_exception=true
+  }
+  InMemory {
+    source_table_name="fake3"
+    throw_exception=true
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml
new file mode 100644
index 0000000000..9dbb47fff7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+  engine:
+    history-job-expire-minutes: 1
+    backup-count: 2
+    queue-type: blockingqueue
+    print-execution-info-interval: 10
+    slot-service:
+      dynamic-slot: false
+      slot-num: 9
+    checkpoint:
+      interval: 300000
+      timeout: 100000
+      storage:
+        type: localfile
+        max-retained: 3
+        plugin-config:
+          namespace: /tmp/seatunnel/checkpoint_snapshot/
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 6ee28d5a5c..2d74b4c928 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -673,7 +673,7 @@ public class SubPlan {
             case CANCELED:
                 if (checkNeedRestore(state) && prepareRestorePipeline()) {
                     jobMaster.releasePipelineResource(this);
-                    jobMaster.preApplyResources();
+                    jobMaster.preApplyResources(this);
                     restorePipeline();
                     return;
                 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2f69e43d80..04812e078f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -91,6 +91,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -351,37 +352,30 @@ public class JobMaster {
     }
 
     /**
-     * Apply for resources
+     * Apply for all resources
      *
      * @return true if apply resources successfully, otherwise false
      */
     public boolean preApplyResources() {
+        return preApplyResources(null);
+    }
+
+    /**
+     * Apply for resources for the given pipeline, or for all pipelines if {@code subPlan} is null
+     *
+     * @return true if apply resources successfully, otherwise false
+     */
+    public boolean preApplyResources(SubPlan subPlan) {
+
         Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
preApplyResourceFutures =
                 new HashMap<>();
-        for (SubPlan subPlan : physicalPlan.getPipelineList()) {
-            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
coordinatorFutures =
-                    new HashMap<>();
-            subPlan.getCoordinatorVertexList()
-                    .forEach(
-                            coordinator ->
-                                    coordinatorFutures.put(
-                                            coordinator.getTaskGroupLocation(),
-                                            ResourceUtils.applyResourceForTask(
-                                                    resourceManager,
-                                                    coordinator,
-                                                    subPlan.getTags())));
-
-            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures 
= new HashMap<>();
-            subPlan.getPhysicalVertexList()
-                    .forEach(
-                            task ->
-                                    taskFutures.put(
-                                            task.getTaskGroupLocation(),
-                                            ResourceUtils.applyResourceForTask(
-                                                    resourceManager, task, 
subPlan.getTags())));
-
-            preApplyResourceFutures.putAll(coordinatorFutures);
-            preApplyResourceFutures.putAll(taskFutures);
+
+        boolean isSubPlan = Objects.nonNull(subPlan);
+
+        if (isSubPlan) {
+            preApplyResourcesForSubPlan(subPlan, preApplyResourceFutures);
+        } else {
+            preApplyResourcesForAll(preApplyResourceFutures);
         }
 
         boolean enoughResource =
@@ -400,8 +394,14 @@ public class JobMaster {
                         == preApplyResourceFutures.size();
 
         if (enoughResource) {
-            // Adequate resources, pass on resources to the plan
-            physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
+            if (isSubPlan) {
+                // SubPlan applies for resources separately and needs to be 
merged into the entire
+                // job's resources
+                
physicalPlan.getPreApplyResourceFutures().putAll(preApplyResourceFutures);
+            } else {
+                // Adequate resources, pass on resources to the plan
+                
physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
+            }
         } else {
             // Release the resource that has been applied
             try {
@@ -442,6 +442,39 @@ public class JobMaster {
         return enoughResource;
     }
 
+    private Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
preApplyResourcesForAll(
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
preApplyResourceFutures) {
+        for (SubPlan subPlan : physicalPlan.getPipelineList()) {
+            preApplyResourcesForSubPlan(subPlan, preApplyResourceFutures);
+        }
+        return preApplyResourceFutures;
+    }
+
+    private void preApplyResourcesForSubPlan(
+            SubPlan subPlan,
+            Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
preApplyResourceFutures) {
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
coordinatorFutures = new HashMap<>();
+        subPlan.getCoordinatorVertexList()
+                .forEach(
+                        coordinator ->
+                                coordinatorFutures.put(
+                                        coordinator.getTaskGroupLocation(),
+                                        ResourceUtils.applyResourceForTask(
+                                                resourceManager, coordinator, 
subPlan.getTags())));
+
+        Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures = 
new HashMap<>();
+        subPlan.getPhysicalVertexList()
+                .forEach(
+                        task ->
+                                taskFutures.put(
+                                        task.getTaskGroupLocation(),
+                                        ResourceUtils.applyResourceForTask(
+                                                resourceManager, task, 
subPlan.getTags())));
+
+        preApplyResourceFutures.putAll(coordinatorFutures);
+        preApplyResourceFutures.putAll(taskFutures);
+    }
+
     public void run() {
         try {
             physicalPlan.startJob();

Reply via email to