This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2086b0e8a6 [Improve] Improve MultiTableSinkWriter prepare commit performance (#6495)
2086b0e8a6 is described below
commit 2086b0e8a6b4b6c04089cb0507a38d357c33bd20
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 27 19:23:19 2024 +0800
[Improve] Improve MultiTableSinkWriter prepare commit performance (#6495)
* [Improve] Improve MultiTableSinkWriter prepare commit performance
* update
* update
---
.github/workflows/backend.yml | 14 +++++
.../multitablesink/MultiTableSinkWriter.java | 69 ++++++++++++++++++----
.../seatunnel/jdbc/internal/JdbcOutputFormat.java | 5 ++
3 files changed, 75 insertions(+), 13 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 34a173cd98..5c8e75897d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -304,6 +304,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-1)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
@@ -333,6 +335,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-2)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
@@ -393,6 +397,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-4)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
@@ -421,6 +427,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-5)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
@@ -449,6 +457,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-6)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
@@ -477,6 +487,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-7)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
@@ -506,6 +518,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-8)
if: needs.changes.outputs.api == 'false' &&
needs.changes.outputs.it-modules != ''
run: |
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
index c296d387f0..12163676d7 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import lombok.extern.slf4j.Slf4j;
@@ -34,6 +35,7 @@ import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,7 +60,9 @@ public class MultiTableSinkWriter
AtomicInteger cnt = new AtomicInteger(0);
executorService =
Executors.newFixedThreadPool(
- queueSize,
+ // we use it in `MultiTableWriterRunnable` and `prepare commit task`, so it
+ // should be double.
+ queueSize * 2,
runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
@@ -71,9 +75,9 @@ public class MultiTableSinkWriter
BlockingQueue<SeaTunnelRow> queue = new
LinkedBlockingQueue<>(1024);
Map<String, SinkWriter<SeaTunnelRow, ?, ?>> tableIdWriterMap = new
HashMap<>();
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkIdentifierMap = new HashMap<>();
- int finalI = i;
+ int queueIndex = i;
sinkWriters.entrySet().stream()
- .filter(entry -> entry.getKey().getIndex() % queueSize ==
finalI)
+ .filter(entry -> entry.getKey().getIndex() % queueSize ==
queueIndex)
.forEach(
entry -> {
tableIdWriterMap.put(
@@ -119,6 +123,24 @@ public class MultiTableSinkWriter
}
}
+ @Override
+ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
+ subSinkErrorCheck();
+ for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
+ for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkWriterEntry :
+ sinkWritersWithIndex.get(i).entrySet()) {
+ if (sinkWriterEntry
+ .getKey()
+ .getTableIdentifier()
+ .equals(event.tablePath().getFullName())) {
+ synchronized (runnable.get(i)) {
+ sinkWriterEntry.getValue().applySchemaChange(event);
+ }
+ }
+ }
+ }
+ }
+
@Override
public void write(SeaTunnelRow element) throws IOException {
if (!submitted) {
@@ -178,17 +200,38 @@ public class MultiTableSinkWriter
checkQueueRemain();
subSinkErrorCheck();
MultiTableCommitInfo multiTableCommitInfo = new
MultiTableCommitInfo(new HashMap<>());
+ List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
- for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkWriterEntry :
- sinkWritersWithIndex.get(i).entrySet()) {
- synchronized (runnable.get(i)) {
- Optional<?> commit =
sinkWriterEntry.getValue().prepareCommit();
- commit.ifPresent(
- o ->
- multiTableCommitInfo
- .getCommitInfo()
- .put(sinkWriterEntry.getKey(), o));
- }
+ int subWriterIndex = i;
+ futures.add(
+ executorService.submit(
+ () -> {
+ synchronized (runnable.get(subWriterIndex)) {
+ for (Map.Entry<SinkIdentifier,
SinkWriter<SeaTunnelRow, ?, ?>>
+ sinkWriterEntry :
+ sinkWritersWithIndex
+
.get(subWriterIndex)
+ .entrySet()) {
+ Optional<?> commit;
+ try {
+ commit =
sinkWriterEntry.getValue().prepareCommit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ commit.ifPresent(
+ o ->
+ multiTableCommitInfo
+
.getCommitInfo()
+
.put(sinkWriterEntry.getKey(), o));
+ }
+ }
+ }));
+ }
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
return Optional.of(multiTableCommitInfo);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index dafe5f9caf..32dee1786b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -125,6 +125,11 @@ public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>> implem
ExceptionUtils.getMessage(flushException)));
return;
}
+ if (batchCount == 0) {
+ LOG.debug("No data to flush.");
+ return;
+ }
+
final int sleepMs = 1000;
for (int i = 0; i <= jdbcConnectionConfig.getMaxRetries(); i++) {
try {