This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c413040a6e [Fix] Fix MultiTableSink return committer but sink do not
support (#5710)
c413040a6e is described below
commit c413040a6ed0e7d85e37ce37e33fcacb44514cf0
Author: Jia Fan <[email protected]>
AuthorDate: Fri Oct 27 09:31:59 2023 +0800
[Fix] Fix MultiTableSink return committer but sink do not support (#5710)
* [Fix] Fix MultiTableSink return committer but sink do not support
* update
* update
* update
* update
---
.github/workflows/backend.yml | 42 ++++++++++++++++++----
.../common/multitablesink/MultiTableSink.java | 6 ++++
.../client/MultipleTableJobConfigParserTest.java | 8 ++++-
3 files changed, 49 insertions(+), 7 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 5338e252e7..0fc5ffb77e 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -340,7 +340,7 @@ jobs:
- name: run updated modules integration test (part-2)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 7 1`
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 1`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
else
@@ -369,7 +369,7 @@ jobs:
- name: run updated modules integration test (part-3)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 7 2`
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 2`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
else
@@ -398,7 +398,7 @@ jobs:
- name: run updated modules integration test (part-4)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 7 3`
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 3`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
else
@@ -426,7 +426,7 @@ jobs:
- name: run updated modules integration test (part-5)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 7 4`
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 4`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
else
@@ -454,7 +454,7 @@ jobs:
- name: run updated modules integration test (part-6)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 7 5`
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 5`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
else
@@ -482,7 +482,7 @@ jobs:
- name: run updated modules integration test (part-7)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 7 6`
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 6`
if [ ! -z $sub_modules ]; then
./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
else
@@ -490,6 +490,36 @@ jobs:
fi
env:
MAVEN_OPTS: -Xmx2048m
+
+ updated-modules-integration-test-part-8:
+ needs: [ changes, sanity-check ]
+ if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
+ runs-on: ${{ matrix.os }}
+ strategy:
+ matrix:
+ java: [ '8', '11' ]
+ os: [ 'ubuntu-latest' ]
+ timeout-minutes: 90
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v3
+ with:
+ java-version: ${{ matrix.java }}
+ distribution: 'temurin'
+ cache: 'maven'
+ - name: run updated modules integration test (part-8)
+ if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
+ run: |
+ sub_modules=`python
tools/update_modules_check/update_modules_check.py sub_update_it_module
${{needs.changes.outputs.it-modules}} 8 7`
+ if [ ! -z $sub_modules ]; then
+ ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am
-Pci
+ else
+ echo "sub modules is empty, skipping"
+ fi
+ env:
+ MAVEN_OPTS: -Xmx2048m
+
engine-v2-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
index 2caf405ee0..21f25f4728 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
@@ -143,6 +143,9 @@ public class MultiTableSink
committer ->
committers.put(tableIdentifier,
(SinkCommitter<?>) committer));
}
+ if (committers.isEmpty()) {
+ return Optional.empty();
+ }
return Optional.of(new MultiTableSinkCommitter(committers));
}
@@ -162,6 +165,9 @@ public class MultiTableSink
sinkAggregatedCommitter ->
aggCommitters.put(tableIdentifier,
sinkAggregatedCommitter));
}
+ if (aggCommitters.isEmpty()) {
+ return Optional.empty();
+ }
return Optional.of(new
MultiTableSinkAggregatedCommitter(aggCommitters));
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 2248ed5d8e..b44e90b469 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -33,6 +34,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -109,7 +111,7 @@ public class MultipleTableJobConfigParserTest {
}
@Test
- public void testMultipleTableSourceWithMultiTableSinkParse() {
+ public void testMultipleTableSourceWithMultiTableSinkParse() throws
IOException {
Common.setDeployMode(DeployMode.CLIENT);
String filePath =
TestUtils.getResource("/batch_fake_to_console_multi_table.conf");
JobConfig jobConfig = new JobConfig();
@@ -121,5 +123,9 @@ public class MultipleTableJobConfigParserTest {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
Assertions.assertEquals("MultiTableSink-Console",
actions.get(0).getName());
+ Assertions.assertFalse(
+ ((SinkAction)
actions.get(0)).getSink().createCommitter().isPresent());
+ Assertions.assertFalse(
+ ((SinkAction)
actions.get(0)).getSink().createAggregatedCommitter().isPresent());
}
}