This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2086b0e8a6 [Improve] Improve MultiTableSinkWriter prepare commit performance (#6495)
2086b0e8a6 is described below

commit 2086b0e8a6b4b6c04089cb0507a38d357c33bd20
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 27 19:23:19 2024 +0800

    [Improve] Improve MultiTableSinkWriter prepare commit performance (#6495)
    
    * [Improve] Improve MultiTableSinkWriter prepare commit performance
    
    * update
    
    * update
---
 .github/workflows/backend.yml                      | 14 +++++
 .../multitablesink/MultiTableSinkWriter.java       | 69 ++++++++++++++++++----
 .../seatunnel/jdbc/internal/JdbcOutputFormat.java  |  5 ++
 3 files changed, 75 insertions(+), 13 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 34a173cd98..5c8e75897d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -304,6 +304,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-1)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
@@ -333,6 +335,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-2)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
@@ -393,6 +397,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-4)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
@@ -421,6 +427,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-5)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
@@ -449,6 +457,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-6)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
@@ -477,6 +487,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-7)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
@@ -506,6 +518,8 @@ jobs:
           java-version: ${{ matrix.java }}
           distribution: 'temurin'
           cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
       - name: run updated modules integration test (part-8)
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
         run: |
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
index c296d387f0..12163676d7 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import lombok.extern.slf4j.Slf4j;
@@ -34,6 +35,7 @@ import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -58,7 +60,9 @@ public class MultiTableSinkWriter
         AtomicInteger cnt = new AtomicInteger(0);
         executorService =
                 Executors.newFixedThreadPool(
-                        queueSize,
+                        // we use it in `MultiTableWriterRunnable` and `prepare commit task`, so it
+                        // should be double.
+                        queueSize * 2,
                         runnable -> {
                             Thread thread = new Thread(runnable);
                             thread.setDaemon(true);
@@ -71,9 +75,9 @@ public class MultiTableSinkWriter
             BlockingQueue<SeaTunnelRow> queue = new LinkedBlockingQueue<>(1024);
             Map<String, SinkWriter<SeaTunnelRow, ?, ?>> tableIdWriterMap = new HashMap<>();
             Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkIdentifierMap = new HashMap<>();
-            int finalI = i;
+            int queueIndex = i;
             sinkWriters.entrySet().stream()
-                    .filter(entry -> entry.getKey().getIndex() % queueSize == finalI)
+                    .filter(entry -> entry.getKey().getIndex() % queueSize == queueIndex)
                     .forEach(
                             entry -> {
                                 tableIdWriterMap.put(
@@ -119,6 +123,24 @@ public class MultiTableSinkWriter
         }
     }
 
+    @Override
+    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
+        subSinkErrorCheck();
+        for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
+            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry :
+                    sinkWritersWithIndex.get(i).entrySet()) {
+                if (sinkWriterEntry
+                        .getKey()
+                        .getTableIdentifier()
+                        .equals(event.tablePath().getFullName())) {
+                    synchronized (runnable.get(i)) {
+                        sinkWriterEntry.getValue().applySchemaChange(event);
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     public void write(SeaTunnelRow element) throws IOException {
         if (!submitted) {
@@ -178,17 +200,38 @@ public class MultiTableSinkWriter
         checkQueueRemain();
         subSinkErrorCheck();
         MultiTableCommitInfo multiTableCommitInfo = new MultiTableCommitInfo(new HashMap<>());
+        List<Future<?>> futures = new ArrayList<>();
         for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
-            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry :
-                    sinkWritersWithIndex.get(i).entrySet()) {
-                synchronized (runnable.get(i)) {
-                    Optional<?> commit = sinkWriterEntry.getValue().prepareCommit();
-                    commit.ifPresent(
-                            o ->
-                                    multiTableCommitInfo
-                                            .getCommitInfo()
-                                            .put(sinkWriterEntry.getKey(), o));
-                }
+            int subWriterIndex = i;
+            futures.add(
+                    executorService.submit(
+                            () -> {
+                                synchronized (runnable.get(subWriterIndex)) {
+                                    for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
+                                            sinkWriterEntry :
+                                                    sinkWritersWithIndex
+                                                            .get(subWriterIndex)
+                                                            .entrySet()) {
+                                        Optional<?> commit;
+                                        try {
+                                            commit = sinkWriterEntry.getValue().prepareCommit();
+                                        } catch (IOException e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                        commit.ifPresent(
+                                                o ->
+                                                        multiTableCommitInfo
+                                                                .getCommitInfo()
+                                                                .put(sinkWriterEntry.getKey(), o));
+                                    }
+                                }
+                            }));
+        }
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
         return Optional.of(multiTableCommitInfo);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index dafe5f9caf..32dee1786b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -125,6 +125,11 @@ public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>> implem
                             ExceptionUtils.getMessage(flushException)));
             return;
         }
+        if (batchCount == 0) {
+            LOG.debug("No data to flush.");
+            return;
+        }
+
         final int sleepMs = 1000;
         for (int i = 0; i <= jdbcConnectionConfig.getMaxRetries(); i++) {
             try {

Reply via email to