This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bd2b7c5599 [Hotfix][Core] Fix concurrency exceptions when MultiTableSink#PrepareCommit (#7686)
bd2b7c5599 is described below
commit bd2b7c5599eb5088f513614969f2d45da227e9af
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 23 09:31:21 2024 +0800
[Hotfix][Core] Fix concurrency exceptions when MultiTableSink#PrepareCommit (#7686)
---
.../sink/multitablesink/MultiTableCommitInfo.java | 4 +-
.../sink/multitablesink/MultiTableSinkWriter.java | 14 ++-
.../multitablesink/MultiTableSinkWriterTest.java | 111 +++++++++++++++++++++
3 files changed, 124 insertions(+), 5 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
index 8b12fa07c5..d541c891fd 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
@@ -21,10 +21,10 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import java.io.Serializable;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
@Getter
@AllArgsConstructor
public class MultiTableCommitInfo implements Serializable {
- private Map<SinkIdentifier, Object> commitInfo;
+ private ConcurrentMap<SinkIdentifier, Object> commitInfo;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 5a2f51fd16..f01c3d65dc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -35,6 +35,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -49,7 +51,8 @@ public class MultiTableSinkWriter
private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkWriters;
private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
private final Map<String, Optional<Integer>> sinkPrimaryKeys = new
HashMap<>();
- private final List<Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>>
sinkWritersWithIndex;
+ private final List<ConcurrentMap<SinkIdentifier, SinkWriter<SeaTunnelRow,
?, ?>>>
+ sinkWritersWithIndex;
private final List<MultiTableWriterRunnable> runnable = new ArrayList<>();
private final Random random = new Random();
private final List<BlockingQueue<SeaTunnelRow>> blockingQueues = new
ArrayList<>();
@@ -84,7 +87,8 @@ public class MultiTableSinkWriter
for (int i = 0; i < queueSize; i++) {
BlockingQueue<SeaTunnelRow> queue = new
LinkedBlockingQueue<>(1024);
Map<String, SinkWriter<SeaTunnelRow, ?, ?>> tableIdWriterMap = new
HashMap<>();
- Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkIdentifierMap = new HashMap<>();
+ ConcurrentMap<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkIdentifierMap =
+ new ConcurrentHashMap<>();
int queueIndex = i;
sinkWriters.entrySet().stream()
.filter(entry -> entry.getKey().getIndex() % queueSize ==
queueIndex)
@@ -218,7 +222,8 @@ public class MultiTableSinkWriter
public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
checkQueueRemain();
subSinkErrorCheck();
- MultiTableCommitInfo multiTableCommitInfo = new
MultiTableCommitInfo(new HashMap<>());
+ MultiTableCommitInfo multiTableCommitInfo =
+ new MultiTableCommitInfo(new ConcurrentHashMap<>());
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
int subWriterIndex = i;
@@ -253,6 +258,9 @@ public class MultiTableSinkWriter
throw new RuntimeException(e);
}
}
+ if (multiTableCommitInfo.getCommitInfo().isEmpty()) {
+ return Optional.empty();
+ }
return Optional.of(multiTableCommitInfo);
}
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
new file mode 100644
index 0000000000..66e0ff0d4e
--- /dev/null
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink.multitablesink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.DefaultEventProcessor;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Test;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class MultiTableSinkWriterTest {
+
+ @Test
+ public void testPrepareCommitState() throws IOException {
+ int threads = 50;
+ Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters = new
HashMap<>();
+ Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new
HashMap<>();
+ for (int i = 0; i < threads; i++) {
+ sinkWriters.put(
+ SinkIdentifier.of(TablePath.DEFAULT.toString(), i), new
TestSinkWriter());
+ sinkWritersContext.put(
+ SinkIdentifier.of(TablePath.DEFAULT.toString(), i),
+ new TestSinkWriterContext());
+ }
+ MultiTableSinkWriter multiTableSinkWriter =
+ new MultiTableSinkWriter(sinkWriters, threads,
sinkWritersContext);
+ DefaultSerializer<Serializable> defaultSerializer = new
DefaultSerializer<>();
+
+ for (int i = 0; i < 100; i++) {
+ byte[] bytes =
defaultSerializer.serialize(multiTableSinkWriter.prepareCommit().get());
+ defaultSerializer.deserialize(bytes);
+ }
+ }
+
+ static class TestSinkWriter
+ implements SinkWriter<SeaTunnelRow, TestSinkState, Object>,
+ SupportMultiTableSinkWriter {
+ @Override
+ public void write(SeaTunnelRow seaTunnelRow) {}
+
+ @Override
+ public Optional<TestSinkState> prepareCommit() throws IOException {
+ return Optional.of(new TestSinkState("test"));
+ }
+
+ @Override
+ public List<Object> snapshotState(long checkpointId) throws
IOException {
+ return SinkWriter.super.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void abortPrepare() {}
+
+ @Override
+ public void close() throws IOException {}
+ }
+
+ static class TestSinkWriterContext implements SinkWriter.Context {
+
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return new DefaultEventProcessor();
+ }
+ }
+
+ @Data
+ @AllArgsConstructor
+ static class TestSinkState implements Serializable {
+ private String state;
+ }
+}