This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 54a5e2b52 [Bugfix][DAG] Fix the incorrect setting of transform 
parallelism (#4814)
54a5e2b52 is described below

commit 54a5e2b5264b7bc6ad8c23ba84a019611f4dd8f8
Author: ic4y <[email protected]>
AuthorDate: Tue May 30 15:57:38 2023 +0800

    [Bugfix][DAG] Fix the incorrect setting of transform parallelism (#4814)
---
 .../seatunnel/engine/core/parse/MultipleTableJobConfigParser.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 8fd9d90bd..5fd4892cd 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -409,9 +409,9 @@ public class MultipleTableJobConfigParser {
         SeaTunnelDataType<?> expectedType = 
getProducedType(inputs.get(0)._2());
         checkProducedTypeEquals(inputActions);
         int spareParallelism = inputs.get(0)._2().getParallelism();
+        int parallelism =
+                
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
         if (fallback) {
-            int parallelism =
-                    
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
             Tuple2<CatalogTable, Action> tuple =
                     fallbackParser.parseTransform(
                             config,
@@ -437,6 +437,7 @@ public class MultipleTableJobConfigParser {
         TransformAction transformAction =
                 new TransformAction(
                         id, actionName, new ArrayList<>(inputActions), 
transform, factoryUrls);
+        transformAction.setParallelism(parallelism);
         tableWithActionMap.put(
                 tableId,
                 Collections.singletonList(

Reply via email to