This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cecd79e947b [HUDI-6652] Implement basePath-level synchronization in 
runHoodieMetaSync (#9374)
cecd79e947b is described below

commit cecd79e947b3a274a26e72a1f9c730323393f8f9
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Aug 6 11:34:46 2023 +0530

    [HUDI-6652] Implement basePath-level synchronization in runHoodieMetaSync 
(#9374)
---
 .../hudi/sync/common/util/SyncUtilHelpers.java     | 65 +++++++++++++++-------
 .../hudi/sync/common/util/TestSyncUtilHelpers.java | 64 +++++++++++++++++++++
 2 files changed, 108 insertions(+), 21 deletions(-)

diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index 466c0c71ffa..440baa86c0c 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetaSyncException;
-import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncTool;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,36 +32,56 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 
 /**
  * Helper class for syncing Hudi commit data with external metastores.
  */
 public class SyncUtilHelpers {
   private static final Logger LOG = 
LoggerFactory.getLogger(SyncUtilHelpers.class);
+
+  // Locks for each table (base path) to avoid concurrent modification of the 
same underneath meta storage.
+  // Meta store such as Hive may encounter {@code 
ConcurrentModificationException} for #alter_table.
+  private static final ConcurrentHashMap<String, Lock> TABLE_LOCKS = new 
ConcurrentHashMap<>();
+
   /**
    * Create an instance of an implementation of {@link HoodieSyncTool} that 
will sync all the relevant meta information
    * with an external metastore such as Hive etc. to ensure Hoodie tables can 
be queried or read via external systems.
    *
-   * <p>IMPORTANT: make this method class level thread safe to avoid 
concurrent modification of the same underneath meta storage.
-   * Meta store such as Hive may encounter {@code 
ConcurrentModificationException} for #alter_table.
-   *
-   * @param syncToolClassName   Class name of the {@link HoodieSyncTool} 
implementation.
-   * @param props               property map.
-   * @param hadoopConfig        Hadoop confs.
-   * @param fs                  Filesystem used.
-   * @param targetBasePath      The target base path that contains the hoodie 
table.
-   * @param baseFileFormat      The file format used by the hoodie table 
(defaults to PARQUET).
+   * @param syncToolClassName Class name of the {@link HoodieSyncTool} 
implementation.
+   * @param props             property map.
+   * @param hadoopConfig      Hadoop confs.
+   * @param fs                Filesystem used.
+   * @param targetBasePath    The target base path that contains the hoodie 
table.
+   * @param baseFileFormat    The file format used by the hoodie table 
(defaults to PARQUET).
    */
-  public static synchronized void runHoodieMetaSync(String syncToolClassName,
+  public static void runHoodieMetaSync(String syncToolClassName,
                                        TypedProperties props,
                                        Configuration hadoopConfig,
                                        FileSystem fs,
                                        String targetBasePath,
                                        String baseFileFormat) {
-    try (HoodieSyncTool syncTool = instantiateMetaSyncTool(syncToolClassName, 
props, hadoopConfig, fs, targetBasePath, baseFileFormat)) {
-      syncTool.syncHoodieTable();
-    } catch (Throwable e) {
-      throw new HoodieMetaSyncException("Could not sync using the meta sync 
class " + syncToolClassName, e);
+    if (targetBasePath == null) {
+      throw new IllegalArgumentException("Target base path must not be null");
+    }
+
+    // Get or create a lock for the specific table
+    Lock tableLock = TABLE_LOCKS.computeIfAbsent(targetBasePath, k -> new 
ReentrantLock());
+    tableLock.lock();
+    try {
+      try (HoodieSyncTool syncTool = 
instantiateMetaSyncTool(syncToolClassName, props, hadoopConfig, fs, 
targetBasePath, baseFileFormat)) {
+        syncTool.syncHoodieTable();
+      } catch (Throwable e) {
+        throw new HoodieMetaSyncException("Could not sync using the meta sync 
class " + syncToolClassName, e);
+      }
+    } finally {
+      tableLock.unlock();
     }
   }
 
@@ -74,10 +93,10 @@ public class SyncUtilHelpers {
                                                 String baseFileFormat) {
     TypedProperties properties = new TypedProperties();
     properties.putAll(props);
-    properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), targetBasePath);
-    properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), 
baseFileFormat);
-    if (properties.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key())) {
-      String tableName = 
properties.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key());
+    properties.put(META_SYNC_BASE_PATH.key(), targetBasePath);
+    properties.put(META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
+    if (properties.containsKey(META_SYNC_TABLE_NAME.key())) {
+      String tableName = properties.getString(META_SYNC_TABLE_NAME.key());
       if (!tableName.equals(tableName.toLowerCase())) {
         LOG.warn(
             "Table name \"" + tableName + "\" contains capital letters. Your 
metastore may automatically convert this to lower case and can cause table not 
found errors during subsequent syncs.");
@@ -110,7 +129,7 @@ public class SyncUtilHelpers {
     }
   }
 
-  public static HoodieException 
getHoodieMetaSyncException(Map<String,HoodieException> failedMetaSyncs) {
+  public static HoodieException getHoodieMetaSyncException(Map<String, 
HoodieException> failedMetaSyncs) {
     if (failedMetaSyncs.size() == 1) {
       return failedMetaSyncs.values().stream().findFirst().get();
     }
@@ -122,6 +141,10 @@ public class SyncUtilHelpers {
       sb.append(failedMetaSyncs.get(impl).getMessage());
       sb.append("\n");
     }
-    return new HoodieMetaSyncException(sb.toString(),failedMetaSyncs);
+    return new HoodieMetaSyncException(sb.toString(), failedMetaSyncs);
+  }
+
+  static int getNumberOfLocks() {
+    return TABLE_LOCKS.size();
   }
 }
diff --git 
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
 
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
index 155c96f8560..2e730493bb4 100644
--- 
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
+++ 
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
@@ -31,10 +31,17 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 public class TestSyncUtilHelpers {
   private static final String BASE_PATH = "/tmp/test";
@@ -100,6 +107,63 @@ public class TestSyncUtilHelpers {
     assertEquals(expectedMessage, t.getMessage());
   }
 
+  @Test
+  void testMetaSyncConcurrency() throws Exception {
+    String syncToolClassName = DummySyncTool1.class.getName();
+    String baseFileFormat = "PARQUET";
+    String tableName1 = "table1";
+    String tableName2 = "table2";
+    String targetBasePath1 = "path/to/target1";
+    String targetBasePath2 = "path/to/target2";
+
+    TypedProperties props1 = new TypedProperties();
+    props1.setProperty(META_SYNC_TABLE_NAME.key(), tableName1);
+
+    TypedProperties props2 = new TypedProperties();
+    props2.setProperty(META_SYNC_TABLE_NAME.key(), tableName2);
+
+    // Simulate processing time here
+    HoodieSyncTool syncToolMock = mock(HoodieSyncTool.class);
+    doAnswer(invocation -> {
+      Thread.sleep(1000);
+      return null;
+    }).when(syncToolMock).syncHoodieTable();
+
+    AtomicBoolean targetBasePath1Running = new AtomicBoolean(false);
+    AtomicBoolean targetBasePath2Running = new AtomicBoolean(false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(4);
+
+    // Submitting tasks with targetBasePath1
+    executor.submit(() -> {
+      targetBasePath1Running.set(true);
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props1, hadoopConf, 
fileSystem, targetBasePath1, baseFileFormat);
+      targetBasePath1Running.set(false);
+    });
+    executor.submit(() -> {
+      assertEquals(1, SyncUtilHelpers.getNumberOfLocks()); // Only one lock 
should exist for this base path
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props1, hadoopConf, 
fileSystem, targetBasePath1, baseFileFormat);
+    });
+
+    // Submitting tasks with targetBasePath2
+    executor.submit(() -> {
+      targetBasePath2Running.set(true);
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props2, hadoopConf, 
fileSystem, targetBasePath2, baseFileFormat);
+      targetBasePath2Running.set(false);
+    });
+    executor.submit(() -> {
+      assertEquals(2, SyncUtilHelpers.getNumberOfLocks()); // Two locks should 
exist for both base paths
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props2, hadoopConf, 
fileSystem, targetBasePath2, baseFileFormat);
+    });
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+    // Check if there was a time when both paths were running in parallel
+    assertTrue(targetBasePath1Running.get() && targetBasePath2Running.get(),
+        "The methods did not run concurrently for different base paths");
+  }
+
   public static class DummySyncTool1 extends HoodieSyncTool {
     public DummySyncTool1(Properties props, Configuration hadoopConf) {
       super(props, hadoopConf);

Reply via email to