This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0113bbac06d fix: Fix ConcurrentModificationException in RocksDBDAO 
when accessed by Timeline Service (#17717)
a0113bbac06d is described below

commit a0113bbac06dbe9fcda20864250ae0fe734654c3
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Dec 26 04:01:42 2025 -0800

    fix: Fix ConcurrentModificationException in RocksDBDAO when accessed by 
Timeline Service (#17717)
    
    * fix: Fix ConcurrentModificationException in RocksDBDAO when accessed by 
Timeline Service
    
    * Address comment
    
    * Fix checkstyle
---
 .../hudi/common/util/collection/RocksDBDAO.java    | 10 ++--
 .../common/util/collection/RocksDbDiskMap.java     |  6 +-
 .../common/util/collection/TestRocksDBDAO.java     | 69 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 8 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
index e42840f0f9a1..b66caedf3ba8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
@@ -49,11 +49,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -73,14 +71,16 @@ public class RocksDBDAO {
   private transient RocksDB rocksDB;
   private boolean closed = false;
   private final String rocksDBBasePath;
-  private final transient Map<String, CustomSerializer<?>> 
columnFamilySerializers;
+  private final transient ConcurrentHashMap<String, CustomSerializer<?>> 
columnFamilySerializers;
   private long totalBytesWritten;
 
   public RocksDBDAO(String basePath, String rocksDBBasePath) {
-    this(basePath, rocksDBBasePath, new HashMap<>());
+    this(basePath, rocksDBBasePath, new ConcurrentHashMap<>());
   }
 
-  public RocksDBDAO(String basePath, String rocksDBBasePath, Map<String, 
CustomSerializer<?>> columnFamilySerializers) {
+  public RocksDBDAO(String basePath,
+                    String rocksDBBasePath,
+                    ConcurrentHashMap<String, CustomSerializer<?>> 
columnFamilySerializers) {
     this.rocksDBBasePath =
         String.format("%s/%s/%s", rocksDBBasePath, 
URI.create(basePath).getPath().replace(":","").replace("/", "_"), 
UUID.randomUUID());
     this.columnFamilySerializers = columnFamilySerializers;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index a75ede6e6dd1..9c6d00d4284c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -29,12 +29,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.AbstractMap;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Spliterators;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -178,7 +178,7 @@ public final class RocksDbDiskMap<T extends Serializable, 
R> extends DiskMap<T,
     if (null == rocksDb) {
       synchronized (this) {
         if (null == rocksDb) {
-          Map<String, CustomSerializer<?>> serializerMap = new HashMap<>(4);
+          ConcurrentHashMap<String, CustomSerializer<?>> serializerMap = new 
ConcurrentHashMap<>(4);
           serializerMap.put(ROCKSDB_COL_FAMILY, valueSerializer);
           rocksDb = new RocksDBDAO(ROCKSDB_COL_FAMILY, diskMapPath, 
serializerMap);
           rocksDb.addColumnFamily(ROCKSDB_COL_FAMILY);
@@ -188,4 +188,4 @@ public final class RocksDbDiskMap<T extends Serializable, 
R> extends DiskMap<T,
     return rocksDb;
   }
 
-}
\ No newline at end of file
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
index a9de2bf26fed..765607ff86c7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java
@@ -35,6 +35,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -239,6 +244,70 @@ public class TestRocksDBDAO {
     assertFalse(new File(rocksDBBasePath).exists());
   }
 
+  /**
+   * Test that concurrent access to RocksDBDAO does not cause 
ConcurrentModificationException.
+   * This test verifies the thread-safety of the columnFamilySerializers map 
which is accessed
+   * via getSerializerForColumnFamily() during get/put operations.
+   */
+  @Test
+  public void testConcurrentAccess() throws InterruptedException {
+    int numThreads = 10;
+    int numOperationsPerThread = 100;
+    int numColumnFamilies = 5;
+
+    List<String> columnFamilies = new ArrayList<>();
+    for (int i = 0; i < numColumnFamilies; i++) {
+      String family = "concurrent_family_" + i;
+      columnFamilies.add(family);
+      dbManager.addColumnFamily(family);
+    }
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    try {
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch doneLatch = new CountDownLatch(numThreads);
+      AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+      // Spawn threads that concurrently access different column families
+      for (int t = 0; t < numThreads; t++) {
+        final int threadId = t;
+        executor.submit(() -> {
+          try {
+            // Wait for all threads to be ready
+            startLatch.await();
+
+            for (int i = 0; i < numOperationsPerThread; i++) {
+              // Each thread accesses different column families to trigger
+              // concurrent calls to getSerializerForColumnFamily()
+              String family = columnFamilies.get((threadId + i) % 
numColumnFamilies);
+              String key = "key_" + threadId + "_" + i;
+              String value = "value_" + threadId + "_" + i;
+
+              dbManager.put(family, key, value);
+              String retrieved = dbManager.get(family, key);
+              assertEquals(value, retrieved, "Value mismatch for key: " + key);
+            }
+          } catch (Throwable t1) {
+            error.compareAndSet(null, t1);
+          } finally {
+            doneLatch.countDown();
+          }
+        });
+      }
+
+      startLatch.countDown();
+
+      // Wait for all threads to complete
+      boolean completed = doneLatch.await(60, TimeUnit.SECONDS);
+
+      assertTrue(completed, "Test timed out - threads did not complete in 
time");
+      assertNull(error.get(), "Concurrent access caused an exception: "
+          + (error.get() != null ? error.get().getMessage() : ""));
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
   /**
    * Payload key object.
    */

Reply via email to the mailing list.