This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4a48f99 [HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass
tryLock fix and TestHoodieClientMultiWriter test fixes (#4384)
4a48f99 is described below
commit 4a48f99a596b5596fb654b56a6533b908bb9d8f3
Author: Manoj Govindassamy <[email protected]>
AuthorDate: Sun Dec 19 10:31:02 2021 -0800
[HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass tryLock fix and
TestHoodieClientMultiWriter test fixes (#4384)
- Made FileSystemBasedLockProviderTestClass thread safe and fixed the
tryLock retry logic.
- Made TestHoodieClientMultiWriter.testHoodieClientBasicMultiWriter
deterministic in verifying the HoodieWriteConflictException.
---
.../FileSystemBasedLockProviderTestClass.java | 77 ++++++++-------
.../hudi/client/TestHoodieClientMultiWriter.java | 106 ++++++++++++++++-----
2 files changed, 121 insertions(+), 62 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
index 2fc6ba4..97ad050 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
@@ -36,38 +36,37 @@ import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_R
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
/**
- * This lock provider is used for testing purposes only. It provides a simple
file system based lock using HDFS atomic
- * create operation. This lock does not support cleaning/expiring the lock
after a failed write hence cannot be used
- * in production environments.
+ * This lock provider is used for testing purposes only. It provides a simple
file system based lock
+ * using filesystem's atomic create operation. This lock does not support
cleaning/expiring the lock
+ * after a failed write. Must not be used in production environments.
*/
public class FileSystemBasedLockProviderTestClass implements
LockProvider<String>, Serializable {
- private static final String LOCK_NAME = "acquired";
+ private static final String LOCK = "lock";
- private String lockPath;
+ private final int retryMaxCount;
+ private final int retryWaitTimeMs;
private transient FileSystem fs;
+ private transient Path lockFile;
protected LockConfiguration lockConfiguration;
public FileSystemBasedLockProviderTestClass(final LockConfiguration
lockConfiguration, final Configuration configuration) {
this.lockConfiguration = lockConfiguration;
- this.lockPath =
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
- this.fs = FSUtils.getFs(this.lockPath, configuration);
- }
-
- public void acquireLock() {
- try {
- fs.create(new Path(lockPath + "/" + LOCK_NAME), false).close();
- } catch (IOException e) {
- throw new HoodieIOException("Failed to acquire lock", e);
- }
+ final String lockDirectory =
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
+ this.retryWaitTimeMs =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
+ this.retryMaxCount =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY);
+ this.lockFile = new Path(lockDirectory + "/" + LOCK);
+ this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
}
@Override
public void close() {
- try {
- fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
- } catch (IOException e) {
- throw new HoodieLockException("Unable to release lock", e);
+ synchronized (LOCK) {
+ try {
+ fs.delete(this.lockFile, true);
+ } catch (IOException e) {
+ throw new HoodieLockException("Unable to release lock: " + getLock(),
e);
+ }
}
}
@@ -75,39 +74,45 @@ public class FileSystemBasedLockProviderTestClass
implements LockProvider<String
public boolean tryLock(long time, TimeUnit unit) {
try {
int numRetries = 0;
- while (fs.exists(new Path(lockPath + "/" + LOCK_NAME))
- && (numRetries++ <=
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) {
-
Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY));
- }
- synchronized (LOCK_NAME) {
- if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
- return false;
+ synchronized (LOCK) {
+ while (fs.exists(this.lockFile)) {
+ LOCK.wait(retryWaitTimeMs);
+ numRetries++;
+ if (numRetries > retryMaxCount) {
+ return false;
+ }
}
acquireLock();
+ return fs.exists(this.lockFile);
}
- return true;
} catch (IOException | InterruptedException e) {
- throw new HoodieLockException("Failed to acquire lock", e);
+ throw new HoodieLockException("Failed to acquire lock: " + getLock(), e);
}
}
@Override
public void unlock() {
- try {
- if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
- fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
+ synchronized (LOCK) {
+ try {
+ if (fs.exists(this.lockFile)) {
+ fs.delete(this.lockFile, true);
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to delete lock " + getLock() + " on
disk", io);
}
- } catch (IOException io) {
- throw new HoodieIOException("Unable to delete lock on disk", io);
}
}
@Override
public String getLock() {
+ return this.lockFile.toString();
+ }
+
+ private void acquireLock() {
try {
- return fs.listStatus(new Path(lockPath))[0].getPath().toString();
- } catch (Exception e) {
- throw new HoodieLockException("Failed to retrieve lock status from lock
path " + lockPath);
+ fs.create(this.lockFile, false).close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to acquire lock: " + getLock(), e);
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 69fc401..5f8b26b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -23,6 +23,7 @@ import
org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -40,7 +41,6 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -96,7 +98,7 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"250");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
- HoodieWriteConfig cfg = getConfigBuilder()
+ HoodieWriteConfig writeConfig = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -104,41 +106,64 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build()).withAutoCommit(false).withProperties(properties).build();
+
// Create the first commit
- createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
- ExecutorService executors = Executors.newFixedThreadPool(2);
- SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
- SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
- AtomicBoolean writer1Conflict = new AtomicBoolean(false);
- AtomicBoolean writer2Conflict = new AtomicBoolean(false);
+ createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig),
"000", "001", 200);
+
+ final int threadCount = 2;
+ final ExecutorService executors = Executors.newFixedThreadPool(2);
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);
+
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+ final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+ final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
Future future1 = executors.submit(() -> {
- String newCommitTime = "004";
- int numRecords = 100;
- String commitTimeBetweenPrevAndNew = "002";
try {
- createCommitWithUpserts(cfg, client1, "002",
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
- } catch (Exception e1) {
- assertTrue(e1 instanceof HoodieWriteConflictException);
- writer1Conflict.set(true);
+ final String nextCommitTime = "002";
+ final JavaRDD<WriteStatus> writeStatusList =
startCommitForUpdate(writeConfig, client1, nextCommitTime, 100);
+
+ // Wait for the 2nd writer to start the commit
+ cyclicBarrier.await(60, TimeUnit.SECONDS);
+
+ // Commit the update before the 2nd writer
+ assertDoesNotThrow(() -> {
+ client1.commit(nextCommitTime, writeStatusList);
+ });
+
+ // Signal the 2nd writer to go ahead with its commit
+ cyclicBarrier.await(60, TimeUnit.SECONDS);
+ writer1Completed.set(true);
+ } catch (Exception e) {
+ writer1Completed.set(false);
}
});
+
Future future2 = executors.submit(() -> {
- String newCommitTime = "005";
- int numRecords = 100;
- String commitTimeBetweenPrevAndNew = "002";
try {
- createCommitWithUpserts(cfg, client2, "002",
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
- } catch (Exception e2) {
- assertTrue(e2 instanceof HoodieWriteConflictException);
- writer2Conflict.set(true);
+ final String nextCommitTime = "003";
+
+ // Wait for the 1st writer to make progress with the commit
+ cyclicBarrier.await(60, TimeUnit.SECONDS);
+ final JavaRDD<WriteStatus> writeStatusList =
startCommitForUpdate(writeConfig, client2, nextCommitTime, 100);
+
+ // Wait for the 1st writer to complete the commit
+ cyclicBarrier.await(60, TimeUnit.SECONDS);
+ assertThrows(HoodieWriteConflictException.class, () -> {
+ client2.commit(nextCommitTime, writeStatusList);
+ });
+ writer2Completed.set(true);
+ } catch (Exception e) {
+ writer2Completed.set(false);
}
});
+
future1.get();
future2.get();
- Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(),
"Either of writer1 or writer2 should have failed "
- + "with conflict");
- Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(),
"Both writer1 and writer2 should not result "
- + "in conflict");
+
+ // Both writers should have completed successfully; writer2's conflict is
already asserted above via assertThrows.
+ assertTrue(writer1Completed.get() && writer2Completed.get());
}
@Test
@@ -443,4 +468,33 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
numRecords, 200, 2);
client.commit(newCommitTime, result);
}
+
+ /**
+ * Start the commit for an update operation with given number of records
+ *
+ * @param writeConfig - Write config
+ * @param writeClient - Write client for starting the commit
+ * @param newCommitTime - Commit time for the update
+ * @param numRecords - Number of records to update
+ * @return RDD of write status from the update
+ * @throws Exception
+ */
+ private JavaRDD<WriteStatus> startCommitForUpdate(HoodieWriteConfig
writeConfig, SparkRDDWriteClient writeClient,
+ String newCommitTime, int
numRecords) throws Exception {
+ // Start the new commit
+ writeClient.startCommitWithTime(newCommitTime);
+
+ // Prepare update records
+ final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+ generateWrapRecordsFn(false, writeConfig,
dataGen::generateUniqueUpdates);
+ final List<HoodieRecord> records = recordGenFunction.apply(newCommitTime,
numRecords);
+ final JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ // Write updates
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient,
JavaRDD<HoodieRecord>, String> writeFn = SparkRDDWriteClient::upsert;
+ JavaRDD<WriteStatus> result = writeFn.apply(writeClient, writeRecords,
newCommitTime);
+ List<WriteStatus> statuses = result.collect();
+ assertNoWriteErrors(statuses);
+ return result;
+ }
}