This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new acec23465 [core] Make ObjectsCache thread safe (#3836)
acec23465 is described below

commit acec23465337bdb547658be3d908ab9f40d960fc
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jul 29 17:10:32 2024 +0800

    [core] Make ObjectsCache thread safe (#3836)
---
 .../java/org/apache/paimon/utils/ObjectsCache.java | 17 +++++++++++++----
 .../org/apache/paimon/utils/ObjectsCacheTest.java  | 22 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 40482c2f5..f80037911 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -28,6 +28,7 @@ import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentSource;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -36,11 +37,12 @@ import java.util.List;
 import java.util.function.BiFunction;
 
 /** Cache records to {@link SegmentsCache} by compacted serializer. */
+@ThreadSafe
 public class ObjectsCache<K, V> {
 
     private final SegmentsCache<K> cache;
     private final ObjectSerializer<V> serializer;
-    private final InternalRowSerializer rowSerializer;
+    private final ThreadLocal<InternalRowSerializer> threadLocalRowSerializer;
     private final BiFunction<K, Long, CloseableIterator<InternalRow>> reader;
 
     public ObjectsCache(
@@ -49,7 +51,8 @@ public class ObjectsCache<K, V> {
             BiFunction<K, Long, CloseableIterator<InternalRow>> reader) {
         this.cache = cache;
         this.serializer = serializer;
-        this.rowSerializer = new 
InternalRowSerializer(serializer.fieldTypes());
+        this.threadLocalRowSerializer =
+                ThreadLocal.withInitial(() -> new 
InternalRowSerializer(serializer.fieldTypes()));
         this.reader = reader;
     }
 
@@ -59,7 +62,9 @@ public class ObjectsCache<K, V> {
             Filter<InternalRow> loadFilter,
             Filter<InternalRow> readFilter)
             throws IOException {
-        Segments segments = cache.getSegments(key, k -> readSegments(k, 
fileSize, loadFilter));
+        InternalRowSerializer rowSerializer = threadLocalRowSerializer.get();
+        Segments segments =
+                cache.getSegments(key, k -> readSegments(k, fileSize, 
loadFilter, rowSerializer));
         List<V> entries = new ArrayList<>();
         RandomAccessInputView view =
                 new RandomAccessInputView(
@@ -77,7 +82,11 @@ public class ObjectsCache<K, V> {
         }
     }
 
-    private Segments readSegments(K key, @Nullable Long fileSize, 
Filter<InternalRow> loadFilter) {
+    private Segments readSegments(
+            K key,
+            @Nullable Long fileSize,
+            Filter<InternalRow> loadFilter,
+            InternalRowSerializer rowSerializer) {
         try (CloseableIterator<InternalRow> iterator = reader.apply(key, 
fileSize)) {
             ArrayList<MemorySegment> segments = new ArrayList<>();
             MemorySegmentSource segmentSource =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 13271bd32..be9d2a48c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -99,6 +99,28 @@ public class ObjectsCacheTest {
                         r -> r.getString(0).toString().endsWith("5"),
                         Filter.alwaysTrue());
         assertThat(values).isEmpty();
+
+        // test read concurrently
+        map.clear();
+        for (int i = 0; i < 10; i++) {
+            map.put(String.valueOf(i), 
Collections.singletonList(String.valueOf(i)));
+        }
+        map.keySet().stream()
+                .parallel()
+                .forEach(
+                        k -> {
+                            try {
+                                assertThat(
+                                                cache.read(
+                                                        k,
+                                                        null,
+                                                        Filter.alwaysTrue(),
+                                                        Filter.alwaysTrue()))
+                                        .containsExactly(k);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
     }
 
     private static class StringSerializer extends ObjectSerializer<String> {

Reply via email to