This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cecd79e947b [HUDI-6652] Implement basePath-level synchronization in runHoodieMetaSync (#9374)
cecd79e947b is described below
commit cecd79e947b3a274a26e72a1f9c730323393f8f9
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Aug 6 11:34:46 2023 +0530
[HUDI-6652] Implement basePath-level synchronization in runHoodieMetaSync (#9374)
---
.../hudi/sync/common/util/SyncUtilHelpers.java | 65 +++++++++++++++-------
.../hudi/sync/common/util/TestSyncUtilHelpers.java | 64 +++++++++++++++++++++
2 files changed, 108 insertions(+), 21 deletions(-)
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index 466c0c71ffa..440baa86c0c 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetaSyncException;
-import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hadoop.conf.Configuration;
@@ -33,36 +32,56 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
/**
* Helper class for syncing Hudi commit data with external metastores.
*/
public class SyncUtilHelpers {
private static final Logger LOG =
LoggerFactory.getLogger(SyncUtilHelpers.class);
+
+ // Locks for each table (base path) to avoid concurrent modification of the
same underneath meta storage.
+ // Meta store such as Hive may encounter {@code
ConcurrentModificationException} for #alter_table.
+ private static final ConcurrentHashMap<String, Lock> TABLE_LOCKS = new
ConcurrentHashMap<>();
+
/**
* Create an instance of an implementation of {@link HoodieSyncTool} that
will sync all the relevant meta information
* with an external metastore such as Hive etc. to ensure Hoodie tables can
be queried or read via external systems.
*
- * <p>IMPORTANT: make this method class level thread safe to avoid
concurrent modification of the same underneath meta storage.
- * Meta store such as Hive may encounter {@code
ConcurrentModificationException} for #alter_table.
- *
- * @param syncToolClassName Class name of the {@link HoodieSyncTool}
implementation.
- * @param props property map.
- * @param hadoopConfig Hadoop confs.
- * @param fs Filesystem used.
- * @param targetBasePath The target base path that contains the hoodie
table.
- * @param baseFileFormat The file format used by the hoodie table
(defaults to PARQUET).
+ * <p>Calls that share a {@code targetBasePath} are serialized on a per-path {@link ReentrantLock};
+ * calls for different base paths acquire different locks.
+ *
+ * @param syncToolClassName Class name of the {@link HoodieSyncTool}
implementation.
+ * @param props property map.
+ * @param hadoopConfig Hadoop confs.
+ * @param fs Filesystem used.
+ * @param targetBasePath The target base path that contains the hoodie
table.
+ * @param baseFileFormat The file format used by the hoodie table
(defaults to PARQUET).
+ * @throws IllegalArgumentException if {@code targetBasePath} is null.
+ * @throws HoodieMetaSyncException if instantiating, running or closing the sync tool throws.
*/
- public static synchronized void runHoodieMetaSync(String syncToolClassName,
+ public static void runHoodieMetaSync(String syncToolClassName,
TypedProperties props,
Configuration hadoopConfig,
FileSystem fs,
String targetBasePath,
String baseFileFormat) {
- try (HoodieSyncTool syncTool = instantiateMetaSyncTool(syncToolClassName,
props, hadoopConfig, fs, targetBasePath, baseFileFormat)) {
- syncTool.syncHoodieTable();
- } catch (Throwable e) {
- throw new HoodieMetaSyncException("Could not sync using the meta sync
class " + syncToolClassName, e);
+ if (targetBasePath == null) {
+ throw new IllegalArgumentException("Target base path must not be null");
+ }
+
+ // Get or create a lock for the specific table
+ // NOTE(review): nothing visible in this change removes entries from TABLE_LOCKS, so the map keeps one
+ // lock per distinct base path ever synced - confirm this is acceptable for long-running processes.
+ Lock tableLock = TABLE_LOCKS.computeIfAbsent(targetBasePath, k -> new
ReentrantLock());
+ tableLock.lock();
+ // The inner try-with-resources closes the sync tool; the outer finally releases the lock even when
+ // the sync fails.
+ try {
+ try (HoodieSyncTool syncTool =
instantiateMetaSyncTool(syncToolClassName, props, hadoopConfig, fs,
targetBasePath, baseFileFormat)) {
+ syncTool.syncHoodieTable();
+ } catch (Throwable e) {
+ throw new HoodieMetaSyncException("Could not sync using the meta sync
class " + syncToolClassName, e);
+ }
+ } finally {
+ tableLock.unlock();
}
}
@@ -74,10 +93,10 @@ public class SyncUtilHelpers {
String baseFileFormat) {
TypedProperties properties = new TypedProperties();
properties.putAll(props);
- properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), targetBasePath);
- properties.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(),
baseFileFormat);
- if (properties.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key())) {
- String tableName =
properties.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key());
+ properties.put(META_SYNC_BASE_PATH.key(), targetBasePath);
+ properties.put(META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
+ if (properties.containsKey(META_SYNC_TABLE_NAME.key())) {
+ String tableName = properties.getString(META_SYNC_TABLE_NAME.key());
if (!tableName.equals(tableName.toLowerCase())) {
LOG.warn(
"Table name \"" + tableName + "\" contains capital letters. Your
metastore may automatically convert this to lower case and can cause table not
found errors during subsequent syncs.");
@@ -110,7 +129,7 @@ public class SyncUtilHelpers {
}
}
- public static HoodieException
getHoodieMetaSyncException(Map<String,HoodieException> failedMetaSyncs) {
+ public static HoodieException getHoodieMetaSyncException(Map<String,
HoodieException> failedMetaSyncs) {
if (failedMetaSyncs.size() == 1) {
return failedMetaSyncs.values().stream().findFirst().get();
}
@@ -122,6 +141,10 @@ public class SyncUtilHelpers {
sb.append(failedMetaSyncs.get(impl).getMessage());
sb.append("\n");
}
- return new HoodieMetaSyncException(sb.toString(),failedMetaSyncs);
+ return new HoodieMetaSyncException(sb.toString(), failedMetaSyncs);
+ }
+
+ /**
+ * Returns the current number of entries in {@code TABLE_LOCKS}, i.e. how many distinct target base
+ * paths have had a lock created by {@code runHoodieMetaSync}. Package-private; the test added in this
+ * change reads it to check how many locks exist.
+ */
+ static int getNumberOfLocks() {
+ return TABLE_LOCKS.size();
}
}
diff --git
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
index 155c96f8560..2e730493bb4 100644
---
a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
+++
b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
@@ -31,10 +31,17 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
public class TestSyncUtilHelpers {
private static final String BASE_PATH = "/tmp/test";
@@ -100,6 +107,63 @@ public class TestSyncUtilHelpers {
assertEquals(expectedMessage, t.getMessage());
}
+  /**
+   * Submits four meta-sync tasks to a thread pool, two per base path, to exercise the per-base-path
+   * locking in {@code SyncUtilHelpers.runHoodieMetaSync}.
+   */
+  @Test
+  void testMetaSyncConcurrency() throws Exception {
+    String syncToolClassName = DummySyncTool1.class.getName();
+    String baseFileFormat = "PARQUET";
+    String tableName1 = "table1";
+    String tableName2 = "table2";
+    String targetBasePath1 = "path/to/target1";
+    String targetBasePath2 = "path/to/target2";
+
+    TypedProperties props1 = new TypedProperties();
+    props1.setProperty(META_SYNC_TABLE_NAME.key(), tableName1);
+
+    // Each base path gets its own table name; previously props1 was overwritten here and props2 stayed empty.
+    TypedProperties props2 = new TypedProperties();
+    props2.setProperty(META_SYNC_TABLE_NAME.key(), tableName2);
+
+    // Simulate processing time here
+    // NOTE(review): this mock is never passed to runHoodieMetaSync, which instantiates the tool from
+    // syncToolClassName, so the simulated delay does not appear to take effect - confirm intent.
+    HoodieSyncTool syncToolMock = mock(HoodieSyncTool.class);
+    doAnswer(invocation -> {
+      Thread.sleep(1000);
+      return null;
+    }).when(syncToolMock).syncHoodieTable();
+
+    AtomicBoolean targetBasePath1Running = new AtomicBoolean(false);
+    AtomicBoolean targetBasePath2Running = new AtomicBoolean(false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(4);
+
+    // NOTE(review): the assertEquals calls below run on pool threads and the returned Futures are
+    // discarded, so a failing assertion there would not fail this test; they also depend on task
+    // ordering and on TABLE_LOCKS being empty when the test starts - verify.
+
+    // Submitting tasks with targetBasePath1
+    executor.submit(() -> {
+      targetBasePath1Running.set(true);
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props1, hadoopConf, fileSystem, targetBasePath1, baseFileFormat);
+      targetBasePath1Running.set(false);
+    });
+    executor.submit(() -> {
+      assertEquals(1, SyncUtilHelpers.getNumberOfLocks()); // Only one lock should exist for this base path
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props1, hadoopConf, fileSystem, targetBasePath1, baseFileFormat);
+    });
+
+    // Submitting tasks with targetBasePath2
+    executor.submit(() -> {
+      targetBasePath2Running.set(true);
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props2, hadoopConf, fileSystem, targetBasePath2, baseFileFormat);
+      targetBasePath2Running.set(false);
+    });
+    executor.submit(() -> {
+      assertEquals(2, SyncUtilHelpers.getNumberOfLocks()); // Two locks should exist for both base paths
+      SyncUtilHelpers.runHoodieMetaSync(syncToolClassName, props2, hadoopConf, fileSystem, targetBasePath2, baseFileFormat);
+    });
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+    // Check if there was a time when both paths were running in parallel
+    // NOTE(review): both flags are reset to false when their sync call returns normally, so reading them
+    // after awaitTermination cannot observe an overlap; this only holds if the sync calls threw - confirm.
+    assertTrue(targetBasePath1Running.get() && targetBasePath2Running.get(),
+        "The methods did not run concurrently for different base paths");
+  }
+
+
public static class DummySyncTool1 extends HoodieSyncTool {
public DummySyncTool1(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);