This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a0113bbac06d fix: Fix ConcurrentModificationException in RocksDBDAO
when accessed by Timeline Service (#17717)
a0113bbac06d is described below
commit a0113bbac06dbe9fcda20864250ae0fe734654c3
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Dec 26 04:01:42 2025 -0800
fix: Fix ConcurrentModificationException in RocksDBDAO when accessed by
Timeline Service (#17717)
* fix: Fix ConcurrentModificationException in RocksDBDAO when accessed by
Timeline Service
* Address comment
* Fix checkstyle
---
.../hudi/common/util/collection/RocksDBDAO.java | 10 ++--
.../common/util/collection/RocksDbDiskMap.java | 6 +-
.../common/util/collection/TestRocksDBDAO.java | 69 ++++++++++++++++++++++
3 files changed, 77 insertions(+), 8 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
index e42840f0f9a1..b66caedf3ba8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
@@ -49,11 +49,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -73,14 +71,16 @@ public class RocksDBDAO {
private transient RocksDB rocksDB;
private boolean closed = false;
private final String rocksDBBasePath;
-  private final transient Map<String, CustomSerializer<?>> columnFamilySerializers;
+  private final transient ConcurrentHashMap<String, CustomSerializer<?>> columnFamilySerializers;
private long totalBytesWritten;
public RocksDBDAO(String basePath, String rocksDBBasePath) {
- this(basePath, rocksDBBasePath, new HashMap<>());
+ this(basePath, rocksDBBasePath, new ConcurrentHashMap<>());
}
-  public RocksDBDAO(String basePath, String rocksDBBasePath, Map<String, CustomSerializer<?>> columnFamilySerializers) {
+  public RocksDBDAO(String basePath,
+                    String rocksDBBasePath,
+                    ConcurrentHashMap<String, CustomSerializer<?>> columnFamilySerializers) {
     this.rocksDBBasePath = String.format("%s/%s/%s", rocksDBBasePath,
         URI.create(basePath).getPath().replace(":","").replace("/", "_"), UUID.randomUUID());
this.columnFamilySerializers = columnFamilySerializers;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index a75ede6e6dd1..9c6d00d4284c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -29,12 +29,12 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Spliterators;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -178,7 +178,7 @@ public final class RocksDbDiskMap<T extends Serializable, R> extends DiskMap<T,
if (null == rocksDb) {
synchronized (this) {
if (null == rocksDb) {
-          Map<String, CustomSerializer<?>> serializerMap = new HashMap<>(4);
+          ConcurrentHashMap<String, CustomSerializer<?>> serializerMap = new ConcurrentHashMap<>(4);
           serializerMap.put(ROCKSDB_COL_FAMILY, valueSerializer);
           rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath, serializerMap);
rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
@@ -188,4 +188,4 @@ public final class RocksDbDiskMap<T extends Serializable, R> extends DiskMap<T,
return rocksDb;
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
index a9de2bf26fed..765607ff86c7 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
@@ -35,6 +35,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -239,6 +244,70 @@ public class TestRocksDBDAO {
assertFalse(new File(rocksDBBasePath).exists());
}
+  /**
+   * Test that concurrent access to RocksDBDAO does not cause ConcurrentModificationException.
+   * This test verifies the thread-safety of the columnFamilySerializers map which is accessed
+   * via getSerializerForColumnFamily() during get/put operations.
+   */
+ @Test
+ public void testConcurrentAccess() throws InterruptedException {
+ int numThreads = 10;
+ int numOperationsPerThread = 100;
+ int numColumnFamilies = 5;
+
+ List<String> columnFamilies = new ArrayList<>();
+ for (int i = 0; i < numColumnFamilies; i++) {
+ String family = "concurrent_family_" + i;
+ columnFamilies.add(family);
+ dbManager.addColumnFamily(family);
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ try {
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+ // Spawn threads that concurrently access different column families
+ for (int t = 0; t < numThreads; t++) {
+ final int threadId = t;
+ executor.submit(() -> {
+ try {
+ // Wait for all threads to be ready
+ startLatch.await();
+
+ for (int i = 0; i < numOperationsPerThread; i++) {
+ // Each thread accesses different column families to trigger
+ // concurrent calls to getSerializerForColumnFamily()
+            String family = columnFamilies.get((threadId + i) % numColumnFamilies);
+ String key = "key_" + threadId + "_" + i;
+ String value = "value_" + threadId + "_" + i;
+
+ dbManager.put(family, key, value);
+ String retrieved = dbManager.get(family, key);
+ assertEquals(value, retrieved, "Value mismatch for key: " + key);
+ }
+ } catch (Throwable t1) {
+ error.compareAndSet(null, t1);
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+
+ // Wait for all threads to complete
+ boolean completed = doneLatch.await(60, TimeUnit.SECONDS);
+
+      assertTrue(completed, "Test timed out - threads did not complete in time");
+      assertNull(error.get(), "Concurrent access caused an exception: "
+          + (error.get() != null ? error.get().getMessage() : ""));
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
/**
* Payload key object.
*/