This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bd2b7c5599 [Hotfix][Core] Fix concurrency exceptions when MultiTableSink#PrepareCommit (#7686)
bd2b7c5599 is described below

commit bd2b7c5599eb5088f513614969f2d45da227e9af
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 23 09:31:21 2024 +0800

    [Hotfix][Core] Fix concurrency exceptions when MultiTableSink#PrepareCommit (#7686)
---
 .../sink/multitablesink/MultiTableCommitInfo.java  |   4 +-
 .../sink/multitablesink/MultiTableSinkWriter.java  |  14 ++-
 .../multitablesink/MultiTableSinkWriterTest.java   | 111 +++++++++++++++++++++
 3 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
index 8b12fa07c5..d541c891fd 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
@@ -21,10 +21,10 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 import java.io.Serializable;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 @Getter
 @AllArgsConstructor
 public class MultiTableCommitInfo implements Serializable {
-    private Map<SinkIdentifier, Object> commitInfo;
+    private ConcurrentMap<SinkIdentifier, Object> commitInfo;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 5a2f51fd16..f01c3d65dc 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -35,6 +35,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -49,7 +51,8 @@ public class MultiTableSinkWriter
     private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters;
     private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
     private final Map<String, Optional<Integer>> sinkPrimaryKeys = new HashMap<>();
-    private final List<Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>> sinkWritersWithIndex;
+    private final List<ConcurrentMap<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>>
+            sinkWritersWithIndex;
     private final List<MultiTableWriterRunnable> runnable = new ArrayList<>();
     private final Random random = new Random();
     private final List<BlockingQueue<SeaTunnelRow>> blockingQueues = new ArrayList<>();
@@ -84,7 +87,8 @@ public class MultiTableSinkWriter
         for (int i = 0; i < queueSize; i++) {
             BlockingQueue<SeaTunnelRow> queue = new LinkedBlockingQueue<>(1024);
             Map<String, SinkWriter<SeaTunnelRow, ?, ?>> tableIdWriterMap = new HashMap<>();
-            Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkIdentifierMap = new HashMap<>();
+            ConcurrentMap<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkIdentifierMap =
+                    new ConcurrentHashMap<>();
             int queueIndex = i;
             sinkWriters.entrySet().stream()
                     .filter(entry -> entry.getKey().getIndex() % queueSize == queueIndex)
@@ -218,7 +222,8 @@ public class MultiTableSinkWriter
     public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
         checkQueueRemain();
         subSinkErrorCheck();
-        MultiTableCommitInfo multiTableCommitInfo = new MultiTableCommitInfo(new HashMap<>());
+        MultiTableCommitInfo multiTableCommitInfo =
+                new MultiTableCommitInfo(new ConcurrentHashMap<>());
         List<Future<?>> futures = new ArrayList<>();
         for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
             int subWriterIndex = i;
@@ -253,6 +258,9 @@ public class MultiTableSinkWriter
                 throw new RuntimeException(e);
             }
         }
+        if (multiTableCommitInfo.getCommitInfo().isEmpty()) {
+            return Optional.empty();
+        }
         return Optional.of(multiTableCommitInfo);
     }
 
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
new file mode 100644
index 0000000000..66e0ff0d4e
--- /dev/null
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriterTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink.multitablesink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.DefaultEventProcessor;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Test;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class MultiTableSinkWriterTest {
+
+    @Test
+    public void testPrepareCommitState() throws IOException {
+        int threads = 50;
+        Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters = new HashMap<>();
+        Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
+        for (int i = 0; i < threads; i++) {
+            sinkWriters.put(
+                    SinkIdentifier.of(TablePath.DEFAULT.toString(), i), new TestSinkWriter());
+            sinkWritersContext.put(
+                    SinkIdentifier.of(TablePath.DEFAULT.toString(), i),
+                    new TestSinkWriterContext());
+        }
+        MultiTableSinkWriter multiTableSinkWriter =
+                new MultiTableSinkWriter(sinkWriters, threads, sinkWritersContext);
+        DefaultSerializer<Serializable> defaultSerializer = new DefaultSerializer<>();
+
+        for (int i = 0; i < 100; i++) {
+            byte[] bytes = defaultSerializer.serialize(multiTableSinkWriter.prepareCommit().get());
+            defaultSerializer.deserialize(bytes);
+        }
+    }
+
+    static class TestSinkWriter
+            implements SinkWriter<SeaTunnelRow, TestSinkState, Object>,
+                    SupportMultiTableSinkWriter {
+        @Override
+        public void write(SeaTunnelRow seaTunnelRow) {}
+
+        @Override
+        public Optional<TestSinkState> prepareCommit() throws IOException {
+            return Optional.of(new TestSinkState("test"));
+        }
+
+        @Override
+        public List<Object> snapshotState(long checkpointId) throws IOException {
+            return SinkWriter.super.snapshotState(checkpointId);
+        }
+
+        @Override
+        public void abortPrepare() {}
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    static class TestSinkWriterContext implements SinkWriter.Context {
+
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
+        @Override
+        public MetricsContext getMetricsContext() {
+            return null;
+        }
+
+        @Override
+        public EventListener getEventListener() {
+            return new DefaultEventProcessor();
+        }
+    }
+
+    @Data
+    @AllArgsConstructor
+    static class TestSinkState implements Serializable {
+        private String state;
+    }
+}

Reply via email to