This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7529f2a079 [Fix] Fix transform action return same name (#6034)
7529f2a079 is described below
commit 7529f2a07954603eaa48aa8cbf5b5f891777f501
Author: Jia Fan <[email protected]>
AuthorDate: Wed Dec 20 14:42:21 2023 +0800
[Fix] Fix transform action return same name (#6034)
---
seatunnel-engine/seatunnel-engine-client/pom.xml | 6 +++
.../client/MultipleTableJobConfigParserTest.java | 16 ++++++
..._fake_to_console_with_duplicated_transform.conf | 62 ++++++++++++++++++++++
.../core/parse/MultipleTableJobConfigParser.java | 10 ++--
.../dag/execution/ExecutionPlanGenerator.java | 9 ++--
5 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml
b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 966a4c1a3d..73bd306fe7 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -72,6 +72,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-local</artifactId>
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 083e503d8b..319e751549 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -131,4 +131,20 @@ public class MultipleTableJobConfigParserTest {
Assertions.assertFalse(
((SinkAction)
actions.get(0)).getSink().createAggregatedCommitter().isPresent());
}
+
+ @Test
+ public void testDuplicatedTransformInOnePipeline() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
+
TestUtils.getResource("/batch_fake_to_console_with_duplicated_transform.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setJobContext(new JobContext());
+ Config config = ConfigBuilder.of(Paths.get(filePath));
+ MultipleTableJobConfigParser jobConfigParser =
+ new MultipleTableJobConfigParser(config, new IdGenerator(),
jobConfig);
+ ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+ List<Action> actions = parse.getLeft();
+ Assertions.assertEquals("Transform[0]-sql",
actions.get(0).getUpstream().get(0).getName());
+ Assertions.assertEquals("Transform[1]-sql",
actions.get(1).getUpstream().get(0).getName());
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_duplicated_transform.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_duplicated_transform.conf
new file mode 100644
index 0000000000..d1d7524421
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_duplicated_transform.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates a batch job with duplicated sql
transforms in a seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ source_table_name = "fake"
+ query = "select 1 from fake"
+ result_table_name = "fake2"
+ }
+ sql {
+ source_table_name = "fake"
+ query = "select 1 from fake"
+ result_table_name = "fake3"
+ }
+}
+
+sink {
+ console {
+ source_table_name="fake2"
+ }
+ console {
+ source_table_name="fake3"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 0569117e83..96a3901f22 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -335,16 +335,18 @@ public class MultipleTableJobConfigParser {
List<? extends Config> transformConfigs,
ClassLoader classLoader,
LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
- if (CollectionUtils.isEmpty(transformConfigs) ||
transformConfigs.size() == 0) {
+ if (CollectionUtils.isEmpty(transformConfigs) ||
transformConfigs.isEmpty()) {
return;
}
Queue<Config> configList = new LinkedList<>(transformConfigs);
+ int index = 0;
while (!configList.isEmpty()) {
- parseTransform(configList, classLoader, tableWithActionMap);
+ parseTransform(index++, configList, classLoader,
tableWithActionMap);
}
}
private void parseTransform(
+ int index,
Queue<Config> transforms,
ClassLoader classLoader,
LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
@@ -418,9 +420,7 @@ public class MultipleTableJobConfigParser {
catalogTable, readonlyConfig, classLoader, factoryId);
transform.setJobContext(jobConfig.getJobContext());
long id = idGenerator.getNextId();
- // TODO If you need to support snapshot transform state, you need to
use ordered index to
- // generate unique names.
- String actionName = JobConfigParser.createTransformActionName(0,
factoryId);
+ String actionName = JobConfigParser.createTransformActionName(index,
factoryId);
TransformAction transformAction =
new TransformAction(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 45795cab5c..8f5cb3f776 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -447,7 +447,7 @@ public class ExecutionPlanGenerator {
new PipelineGenerator(executionVertices, new
ArrayList<>(executionEdges));
List<Pipeline> pipelines = pipelineGenerator.generatePipelines();
- long actionCount = 0;
+ Set<String> duplicatedActionNames = new HashSet<>();
Set<String> actionNames = new HashSet<>();
for (Pipeline pipeline : pipelines) {
Integer pipelineId = pipeline.getId();
@@ -455,12 +455,15 @@ public class ExecutionPlanGenerator {
Action action = vertex.getAction();
String actionName = String.format("pipeline-%s [%s]",
pipelineId, action.getName());
action.setName(actionName);
+ if (actionNames.contains(actionName)) {
+ duplicatedActionNames.add(actionName);
+ }
actionNames.add(actionName);
- actionCount++;
}
}
checkArgument(
- actionNames.size() == actionCount, "Action name is duplicated:
" + actionNames);
+ duplicatedActionNames.isEmpty(),
+ "Action name is duplicated: " + duplicatedActionNames);
return pipelines;
}