This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cfc23595c [BUG][Zeta] Multiple sink actions of the same type have the 
same name (#5499)
3cfc23595c is described below

commit 3cfc23595c0c7aa312a63d97b8ec11d2521dfae3
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Mon Sep 18 16:44:13 2023 +0800

    [BUG][Zeta] Multiple sink actions of the same type have the same name 
(#5499)
---
 .../client/MultipleTableJobConfigParserTest.java   | 16 ++++
 .../resources/batch_fakesource_to_two_file.conf    | 89 ++++++++++++++++++++++
 .../engine/core/parse/JobConfigParser.java         |  7 +-
 .../core/parse/MultipleTableJobConfigParser.java   |  2 +-
 4 files changed, 112 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index e5faaea25c..d869713550 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -87,4 +87,20 @@ public class MultipleTableJobConfigParserTest {
         Assertions.assertEquals(3, 
actions.get(0).getUpstream().get(1).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getParallelism());
     }
+
+    @Test
+    public void testMultipleSinkName() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("/batch_fakesource_to_two_file.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setJobContext(new JobContext());
+        MultipleTableJobConfigParser jobConfigParser =
+                new MultipleTableJobConfigParser(filePath, new IdGenerator(), 
jobConfig);
+        ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+        List<Action> actions = parse.getLeft();
+        Assertions.assertEquals(2, actions.size());
+
+        Assertions.assertEquals("Sink[0]-LocalFile-default-identifier", 
actions.get(0).getName());
+        Assertions.assertEquals("Sink[1]-LocalFile-default-identifier", 
actions.get(1).getName());
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
new file mode 100644
index 0000000000..7ff3c21f78
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 3
+  }
+
+  FakeSource {
+    result_table_name = "fake2"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 3
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/hive/warehouse/test2"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    sink_columns = ["name", "age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    save_mode = "error",
+    source_table_name = ["fake", "fake2"]
+  }
+
+  LocalFile {
+    path = "/tmp/hive/warehouse/test2"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    sink_columns = ["name", "age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    save_mode = "error",
+    source_table_name = ["fake"]
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 09bae74f5a..46c28b6397 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -130,6 +130,7 @@ public class JobConfigParser {
     }
 
     public List<SinkAction<?, ?, ?, ?>> parseSinks(
+            int configIndex,
             List<List<Tuple2<CatalogTable, Action>>> inputVertices,
             Config sinkConfig,
             JobConfig jobConfig) {
@@ -145,6 +146,7 @@ public class JobConfigParser {
             checkProducedTypeEquals(inputActions);
             SinkAction<?, ?, ?, ?> sinkAction =
                     parseSink(
+                            configIndex,
                             sinkConfig,
                             jobConfig,
                             spareParallelism,
@@ -164,6 +166,7 @@ public class JobConfigParser {
                 int parallelism = inputAction.getParallelism();
                 SinkAction<?, ?, ?, ?> sinkAction =
                         parseSink(
+                                configIndex,
                                 sinkConfig,
                                 jobConfig,
                                 parallelism,
@@ -176,6 +179,7 @@ public class JobConfigParser {
     }
 
     private SinkAction<?, ?, ?, ?> parseSink(
+            int configIndex,
             Config config,
             JobConfig jobConfig,
             int parallelism,
@@ -198,7 +202,8 @@ public class JobConfigParser {
             handleSaveMode(sink);
         }
         final String actionName =
-                createSinkActionName(0, tuple.getLeft().getPluginName(), 
getTableName(config));
+                createSinkActionName(
+                        configIndex, tuple.getLeft().getPluginName(), 
getTableName(config));
         final SinkAction action =
                 new SinkAction<>(
                         idGenerator.getNextId(),
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index ee2505286f..c83ceade12 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -531,7 +531,7 @@ public class MultipleTableJobConfigParser {
                         factoryId,
                         (factory) -> factory.createSink(null));
         if (fallback) {
-            return fallbackParser.parseSinks(inputVertices, sinkConfig, 
jobConfig);
+            return fallbackParser.parseSinks(configIndex, inputVertices, 
sinkConfig, jobConfig);
         }
 
         Map<TablePath, CatalogTable> tableMap =

Reply via email to