This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 49e09d9e78271b3dac3114dcb810b291a63e8355 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Thu Jan 6 13:04:10 2022 -0500 [HUDI-3165] Enabling InProcessLockProvider for all multi-writer tests instead of FileSystemBasedLockProviderTestClass (#4427) --- .../transaction/lock/InProcessLockProvider.java | 5 ++-- .../hudi/client/TestHoodieClientMultiWriter.java | 26 ++++++++--------- .../functional/TestHoodieBackedMetadata.java | 19 ++++++------ .../TestHoodieDeltaStreamerWithMultiWriter.java | 34 +++++++++------------- 4 files changed, 37 insertions(+), 47 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java index cab9d95..426d1cf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java @@ -30,6 +30,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.jetbrains.annotations.NotNull; +import java.io.Serializable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -42,9 +43,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * HoodieLockException. Threads other than the current lock owner, will * block on lock() and return false on tryLock(). */ -public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock> { +public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock>, Serializable { - private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class); + private static final Logger LOG = LogManager.getLogger(InProcessLockProvider.class); private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); private final long maxWaitTimeMillis; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index a676729..93f35ff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -18,7 +18,8 @@ package org.apache.hudi.client; -import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -95,10 +96,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { } Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); HoodieWriteConfig writeConfig = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) @@ -106,7 +104,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { .withEmbeddedTimelineServerEnabled(false) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) .build()).withAutoCommit(false).withProperties(properties).build(); // Create the first commit @@ -187,8 +185,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20"); HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -198,7 +197,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { .build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) .withLockConfig(HoodieLockConfig.newBuilder() - .withLockProvider(FileSystemBasedLockProviderTestClass.class) + .withLockProvider(InProcessLockProvider.class) .build()) .withAutoCommit(false) // Timeline-server-based markers are not used for multi-writer tests @@ -245,10 +244,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { } Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); // Disabling embedded timeline server, it doesn't work with multiwriter HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false) @@ -261,7 +258,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType( FileSystemViewStorageType.MEMORY).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) .build()).withAutoCommit(false).withProperties(properties); Set<String> validInstants = new HashSet<>(); // Create the first commit with inserts @@ -367,13 +364,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { } Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) .withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) .build()).withAutoCommit(false).withProperties(properties); HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieWriteConfig cfg2 = writeConfigBuilder.build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 392d687..2051696 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -22,7 +22,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.SerializableConfiguration; @@ -891,13 +891,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()) .withProperties(properties) .build(); @@ -955,14 +954,14 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); + HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build()) .withAutoCommit(false) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()) .withProperties(properties) .build(); @@ -1286,12 +1285,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()) .withProperties(properties) .build(); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { @@ -1321,7 +1320,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()) .withProperties(properties) .build(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 7315a84..ce48211 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -82,17 +82,13 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona @ParameterizedTest @EnumSource(HoodieTableType.class) void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception { - // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts + // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); // enable carrying forward latest checkpoint TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -125,22 +121,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona @ParameterizedTest @EnumSource(HoodieTableType.class) void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception { - // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts + // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict props = prepareMultiWriterProps(fs(), basePath, propsFilePath); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); props.setProperty("hoodie.test.source.generate.inserts", "true"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT, @@ -176,12 +168,12 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona } private void testCheckpointCarryOver(HoodieTableType tableType) throws Exception { - // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts + // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -202,8 +194,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona // run the backfill job props = prepareMultiWriterProps(fs(), basePath, propsFilePath); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); // get current checkpoint after preparing base dataset with some commits
