yihua commented on code in PR #17717:
URL: https://github.com/apache/hudi/pull/17717#discussion_r2647443221
##########
hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBDAO.java:
##########
@@ -239,6 +244,67 @@ public void testWithSerializableKey() {
assertFalse(new File(rocksDBBasePath).exists());
}
+ /**
+ * Test that concurrent access to RocksDBDAO does not cause ConcurrentModificationException.
+ * This test verifies the thread-safety of the columnFamilySerializers map which is accessed
+ * via getSerializerForColumnFamily() during get/put operations.
+ */
+ @Test
+ public void testConcurrentAccess() throws InterruptedException {
+ int numThreads = 10;
+ int numOperationsPerThread = 100;
+ int numColumnFamilies = 5;
+
+ List<String> columnFamilies = new ArrayList<>();
+ for (int i = 0; i < numColumnFamilies; i++) {
+ String family = "concurrent_family_" + i;
+ columnFamilies.add(family);
+ dbManager.addColumnFamily(family);
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ AtomicReference<Throwable> error = new AtomicReference<>(null);
+
+ // Spawn threads that concurrently access different column families
+ for (int t = 0; t < numThreads; t++) {
+ final int threadId = t;
+ executor.submit(() -> {
+ try {
+ // Wait for all threads to be ready
+ startLatch.await();
+
+ for (int i = 0; i < numOperationsPerThread; i++) {
+ // Each thread accesses different column families to trigger
+ // concurrent calls to getSerializerForColumnFamily()
+ String family = columnFamilies.get((threadId + i) %
numColumnFamilies);
+ String key = "key_" + threadId + "_" + i;
+ String value = "value_" + threadId + "_" + i;
+
+ dbManager.put(family, key, value);
+ String retrieved = dbManager.get(family, key);
+ assertEquals(value, retrieved, "Value mismatch for key: " + key);
+ }
+ } catch (Throwable t1) {
+ error.compareAndSet(null, t1);
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+
+ // Wait for all threads to complete
+ boolean completed = doneLatch.await(60, TimeUnit.SECONDS);
+ executor.shutdown();
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]