This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7529f2a079 [Fix] Fix transform action return same name (#6034)
7529f2a079 is described below

commit 7529f2a07954603eaa48aa8cbf5b5f891777f501
Author: Jia Fan <[email protected]>
AuthorDate: Wed Dec 20 14:42:21 2023 +0800

    [Fix] Fix transform action return same name (#6034)
---
 seatunnel-engine/seatunnel-engine-client/pom.xml   |  6 +++
 .../client/MultipleTableJobConfigParserTest.java   | 16 ++++++
 ..._fake_to_console_with_duplicated_transform.conf | 62 ++++++++++++++++++++++
 .../core/parse/MultipleTableJobConfigParser.java   | 10 ++--
 .../dag/execution/ExecutionPlanGenerator.java      |  9 ++--
 5 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml 
b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 966a4c1a3d..73bd306fe7 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -72,6 +72,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-file-local</artifactId>
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 083e503d8b..319e751549 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -131,4 +131,20 @@ public class MultipleTableJobConfigParserTest {
         Assertions.assertFalse(
                 ((SinkAction) 
actions.get(0)).getSink().createAggregatedCommitter().isPresent());
     }
+
+    @Test
+    public void testDuplicatedTransformInOnePipeline() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath =
+                
TestUtils.getResource("/batch_fake_to_console_with_duplicated_transform.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setJobContext(new JobContext());
+        Config config = ConfigBuilder.of(Paths.get(filePath));
+        MultipleTableJobConfigParser jobConfigParser =
+                new MultipleTableJobConfigParser(config, new IdGenerator(), 
jobConfig);
+        ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+        List<Action> actions = parse.getLeft();
+        Assertions.assertEquals("Transform[0]-sql", 
actions.get(0).getUpstream().get(0).getName());
+        Assertions.assertEquals("Transform[1]-sql", 
actions.get(1).getUpstream().get(0).getName());
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_duplicated_transform.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_duplicated_transform.conf
new file mode 100644
index 0000000000..d1d7524421
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_duplicated_transform.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  sql {
+    source_table_name = "fake"
+    query = "select 1 from fake"
+    result_table_name = "fake2"
+  }
+  sql {
+    source_table_name = "fake"
+    query = "select 1 from fake"
+    result_table_name = "fake3"
+  }
+}
+
+sink {
+  console {
+    source_table_name="fake2"
+  }
+  console {
+    source_table_name="fake3"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 0569117e83..96a3901f22 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -335,16 +335,18 @@ public class MultipleTableJobConfigParser {
             List<? extends Config> transformConfigs,
             ClassLoader classLoader,
             LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap) {
-        if (CollectionUtils.isEmpty(transformConfigs) || 
transformConfigs.size() == 0) {
+        if (CollectionUtils.isEmpty(transformConfigs) || 
transformConfigs.isEmpty()) {
             return;
         }
         Queue<Config> configList = new LinkedList<>(transformConfigs);
+        int index = 0;
         while (!configList.isEmpty()) {
-            parseTransform(configList, classLoader, tableWithActionMap);
+            parseTransform(index++, configList, classLoader, 
tableWithActionMap);
         }
     }
 
     private void parseTransform(
+            int index,
             Queue<Config> transforms,
             ClassLoader classLoader,
             LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap) {
@@ -418,9 +420,7 @@ public class MultipleTableJobConfigParser {
                         catalogTable, readonlyConfig, classLoader, factoryId);
         transform.setJobContext(jobConfig.getJobContext());
         long id = idGenerator.getNextId();
-        // TODO If you need to support snapshot transform state, you need to 
use ordered index to
-        // generate unique names.
-        String actionName = JobConfigParser.createTransformActionName(0, 
factoryId);
+        String actionName = JobConfigParser.createTransformActionName(index, 
factoryId);
 
         TransformAction transformAction =
                 new TransformAction(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 45795cab5c..8f5cb3f776 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -447,7 +447,7 @@ public class ExecutionPlanGenerator {
                 new PipelineGenerator(executionVertices, new 
ArrayList<>(executionEdges));
         List<Pipeline> pipelines = pipelineGenerator.generatePipelines();
 
-        long actionCount = 0;
+        Set<String> duplicatedActionNames = new HashSet<>();
         Set<String> actionNames = new HashSet<>();
         for (Pipeline pipeline : pipelines) {
             Integer pipelineId = pipeline.getId();
@@ -455,12 +455,15 @@ public class ExecutionPlanGenerator {
                 Action action = vertex.getAction();
                 String actionName = String.format("pipeline-%s [%s]", 
pipelineId, action.getName());
                 action.setName(actionName);
+                if (actionNames.contains(actionName)) {
+                    duplicatedActionNames.add(actionName);
+                }
                 actionNames.add(actionName);
-                actionCount++;
             }
         }
         checkArgument(
-                actionNames.size() == actionCount, "Action name is duplicated: 
" + actionNames);
+                duplicatedActionNames.isEmpty(),
+                "Action name is duplicated: " + duplicatedActionNames);
 
         return pipelines;
     }

Reply via email to