This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c413040a6e [Fix] Fix MultiTableSink return committer but sink do not support (#5710)
c413040a6e is described below

commit c413040a6ed0e7d85e37ce37e33fcacb44514cf0
Author: Jia Fan <[email protected]>
AuthorDate: Fri Oct 27 09:31:59 2023 +0800

    [Fix] Fix MultiTableSink return committer but sink do not support (#5710)
    
    * [Fix] Fix MultiTableSink return committer but sink do not support
    
    * update
    
    * update
    
    * update
    
    * update
---
 .github/workflows/backend.yml                      | 42 ++++++++++++++++++----
 .../common/multitablesink/MultiTableSink.java      |  6 ++++
 .../client/MultipleTableJobConfigParserTest.java   |  8 ++++-
 3 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 5338e252e7..0fc5ffb77e 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -340,7 +340,7 @@ jobs:
       - name: run updated modules integration test (part-2)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
-          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 1`
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1`
           if [ ! -z $sub_modules ]; then
             ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
           else
@@ -369,7 +369,7 @@ jobs:
       - name: run updated modules integration test (part-3)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
-          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 2`
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2`
           if [ ! -z $sub_modules ]; then
             ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
           else
@@ -398,7 +398,7 @@ jobs:
       - name: run updated modules integration test (part-4)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
-          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 3`
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3`
           if [ ! -z $sub_modules ]; then
             ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
           else
@@ -426,7 +426,7 @@ jobs:
       - name: run updated modules integration test (part-5)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
-          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 4`
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4`
           if [ ! -z $sub_modules ]; then
             ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
           else
@@ -454,7 +454,7 @@ jobs:
       - name: run updated modules integration test (part-6)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
-          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 5`
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5`
           if [ ! -z $sub_modules ]; then
             ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
           else
@@ -482,7 +482,7 @@ jobs:
       - name: run updated modules integration test (part-7)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
-          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 7 6`
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6`
           if [ ! -z $sub_modules ]; then
             ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
           else
@@ -490,6 +490,36 @@ jobs:
           fi
         env:
           MAVEN_OPTS: -Xmx2048m
+
+  updated-modules-integration-test-part-8:
+    needs: [ changes, sanity-check ]
+    if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        java: [ '8', '11' ]
+        os: [ 'ubuntu-latest' ]
+    timeout-minutes: 90
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v3
+        with:
+          java-version: ${{ matrix.java }}
+          distribution: 'temurin'
+          cache: 'maven'
+      - name: run updated modules integration test (part-8)
+        if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
+        run: |
+          sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7`
+          if [ ! -z $sub_modules ]; then
+            ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
+          else
+            echo "sub modules is empty, skipping"
+          fi
+        env:
+          MAVEN_OPTS: -Xmx2048m
+
   engine-v2-it:
     needs: [ changes, sanity-check ]
     if: needs.changes.outputs.api == 'true'
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
index 2caf405ee0..21f25f4728 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
@@ -143,6 +143,9 @@ public class MultiTableSink
                             committer ->
                                     committers.put(tableIdentifier, (SinkCommitter<?>) committer));
         }
+        if (committers.isEmpty()) {
+            return Optional.empty();
+        }
         return Optional.of(new MultiTableSinkCommitter(committers));
     }
 
@@ -162,6 +165,9 @@ public class MultiTableSink
                     sinkAggregatedCommitter ->
                             aggCommitters.put(tableIdentifier, sinkAggregatedCommitter));
         }
+        if (aggCommitters.isEmpty()) {
+            return Optional.empty();
+        }
         return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters));
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 2248ed5d8e..b44e90b469 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -33,6 +34,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -109,7 +111,7 @@ public class MultipleTableJobConfigParserTest {
     }
 
     @Test
-    public void testMultipleTableSourceWithMultiTableSinkParse() {
+    public void testMultipleTableSourceWithMultiTableSinkParse() throws IOException {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/batch_fake_to_console_multi_table.conf");
         JobConfig jobConfig = new JobConfig();
@@ -121,5 +123,9 @@ public class MultipleTableJobConfigParserTest {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
         Assertions.assertEquals("MultiTableSink-Console", actions.get(0).getName());
+        Assertions.assertFalse(
+                ((SinkAction) actions.get(0)).getSink().createCommitter().isPresent());
+        Assertions.assertFalse(
+                ((SinkAction) actions.get(0)).getSink().createAggregatedCommitter().isPresent());
     }
 }

Reply via email to