This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new acec23465 [core] Make ObjectsCache thread safe (#3836)
acec23465 is described below
commit acec23465337bdb547658be3d908ab9f40d960fc
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jul 29 17:10:32 2024 +0800
[core] Make ObjectsCache thread safe (#3836)
---
.../java/org/apache/paimon/utils/ObjectsCache.java | 17 +++++++++++++----
.../org/apache/paimon/utils/ObjectsCacheTest.java | 22 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 40482c2f5..f80037911 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -28,6 +28,7 @@ import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentSource;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
import java.io.EOFException;
import java.io.IOException;
@@ -36,11 +37,12 @@ import java.util.List;
import java.util.function.BiFunction;
/** Cache records to {@link SegmentsCache} by compacted serializer. */
+@ThreadSafe
public class ObjectsCache<K, V> {
private final SegmentsCache<K> cache;
private final ObjectSerializer<V> serializer;
- private final InternalRowSerializer rowSerializer;
+ private final ThreadLocal<InternalRowSerializer> threadLocalRowSerializer;
private final BiFunction<K, Long, CloseableIterator<InternalRow>> reader;
public ObjectsCache(
@@ -49,7 +51,8 @@ public class ObjectsCache<K, V> {
BiFunction<K, Long, CloseableIterator<InternalRow>> reader) {
this.cache = cache;
this.serializer = serializer;
- this.rowSerializer = new
InternalRowSerializer(serializer.fieldTypes());
+ this.threadLocalRowSerializer =
+ ThreadLocal.withInitial(() -> new
InternalRowSerializer(serializer.fieldTypes()));
this.reader = reader;
}
@@ -59,7 +62,9 @@ public class ObjectsCache<K, V> {
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
- Segments segments = cache.getSegments(key, k -> readSegments(k,
fileSize, loadFilter));
+ InternalRowSerializer rowSerializer = threadLocalRowSerializer.get();
+ Segments segments =
+ cache.getSegments(key, k -> readSegments(k, fileSize,
loadFilter, rowSerializer));
List<V> entries = new ArrayList<>();
RandomAccessInputView view =
new RandomAccessInputView(
@@ -77,7 +82,11 @@ public class ObjectsCache<K, V> {
}
}
- private Segments readSegments(K key, @Nullable Long fileSize,
Filter<InternalRow> loadFilter) {
+ private Segments readSegments(
+ K key,
+ @Nullable Long fileSize,
+ Filter<InternalRow> loadFilter,
+ InternalRowSerializer rowSerializer) {
try (CloseableIterator<InternalRow> iterator = reader.apply(key,
fileSize)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
MemorySegmentSource segmentSource =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 13271bd32..be9d2a48c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -99,6 +99,28 @@ public class ObjectsCacheTest {
r -> r.getString(0).toString().endsWith("5"),
Filter.alwaysTrue());
assertThat(values).isEmpty();
+
+ // test read concurrently
+ map.clear();
+ for (int i = 0; i < 10; i++) {
+ map.put(String.valueOf(i),
Collections.singletonList(String.valueOf(i)));
+ }
+ map.keySet().stream()
+ .parallel()
+ .forEach(
+ k -> {
+ try {
+ assertThat(
+ cache.read(
+ k,
+ null,
+ Filter.alwaysTrue(),
+ Filter.alwaysTrue()))
+ .containsExactly(k);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private static class StringSerializer extends ObjectSerializer<String> {