This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 79412ed2a63 [HUDI-6186] Fix lock identity in InProcessLockProvider
(#8658)
79412ed2a63 is described below
commit 79412ed2a634fb188da402370694c2baef7d1dfc
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun May 7 20:53:06 2023 -0700
[HUDI-6186] Fix lock identity in InProcessLockProvider (#8658)
This commit fixes a bug introduced by #6847. #6847 extends the
InProcessLockProvider to support multiple tables in the same process, by having
an in-memory static final map storing the mapping of the table base path to the
read-write reentrant lock, so that the writer uses the corresponding lock based
on the base path. When closing the lock provider, close() removes the lock
entry. Since close() is called when closing the write client, the lock is
removed and subsequent concurrent wri [...]
---
.../transaction/lock/InProcessLockProvider.java | 1 -
.../transaction/TestInProcessLockProvider.java | 117 +++++++++++++++++++++
2 files changed, 117 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
index 9eab061ddfa..8e57190d1a9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -124,7 +124,6 @@ public class InProcessLockProvider implements
LockProvider<ReentrantReadWriteLoc
lock.writeLock().unlock();
}
LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
- LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
}
private String getLogMessage(LockState state) {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
index 8d39b8b5f2d..d1d43d7f3ae 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -24,12 +24,15 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
+import junit.framework.AssertionFailedError;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +55,120 @@ public class TestInProcessLockProvider {
lockConfiguration2 = new LockConfiguration(properties);
}
+ @Test
+ public void testLockIdentity() throws InterruptedException {
+ // The lifecycle of an InProcessLockProvider should not affect the
singleton lock
+ // for a single table, i.e., all three writers should hold the same
underlying lock instance
+ // on the same table.
+ // Writer 1: lock |----------------| unlock and close
+ // Writer 2: try lock | ... lock |------| unlock and close
+ // Writer 3: try lock | ... lock |------|
unlock and close
+ List<InProcessLockProvider> lockProviderList = new ArrayList<>();
+ InProcessLockProvider lockProvider1 = new
InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ lockProviderList.add(lockProvider1);
+ AtomicBoolean writer1Completed = new AtomicBoolean(false);
+ AtomicBoolean writer2TryLock = new AtomicBoolean(false);
+ AtomicBoolean writer2Locked = new AtomicBoolean(false);
+ AtomicBoolean writer2Completed = new AtomicBoolean(false);
+ AtomicBoolean writer3TryLock = new AtomicBoolean(false);
+ AtomicBoolean writer3Completed = new AtomicBoolean(false);
+
+ // Writer 1
+ assertDoesNotThrow(() -> {
+ LOG.info("Writer 1 tries to acquire the lock.");
+ lockProvider1.lock();
+ LOG.info("Writer 1 acquires the lock.");
+ });
+ // Writer 2 thread in parallel, should block
+ // and later acquire the lock once it is released
+ Thread writer2 = new Thread(() -> {
+ InProcessLockProvider lockProvider2 = new
InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ lockProviderList.add(lockProvider2);
+ assertDoesNotThrow(() -> {
+ LOG.info("Writer 2 tries to acquire the lock.");
+ writer2TryLock.set(true);
+ lockProvider2.lock();
+ LOG.info("Writer 2 acquires the lock.");
+ });
+ writer2Locked.set(true);
+
+ while (!writer3TryLock.get()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ assertDoesNotThrow(() -> {
+ lockProvider2.unlock();
+ LOG.info("Writer 2 releases the lock.");
+ });
+ lockProvider2.close();
+ LOG.info("Writer 2 closes the lock provider.");
+ writer2Completed.set(true);
+ });
+
+ Thread writer3 = new Thread(() -> {
+ while (!writer2Locked.get() || !writer1Completed.get()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ // Lock instance of Writer 3 should be held by Writer 2
+ InProcessLockProvider lockProvider3 = new
InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ lockProviderList.add(lockProvider3);
+ boolean isLocked = lockProvider3.getLock().isWriteLocked();
+ if (!isLocked) {
+ writer3TryLock.set(true);
+ throw new AssertionFailedError("The lock instance in Writer 3 should
be held by Writer 2: "
+ + lockProvider3.getLock());
+ }
+ assertDoesNotThrow(() -> {
+ LOG.info("Writer 3 tries to acquire the lock.");
+ writer3TryLock.set(true);
+ lockProvider3.lock();
+ LOG.info("Writer 3 acquires the lock.");
+ });
+
+ assertDoesNotThrow(() -> {
+ lockProvider3.unlock();
+ LOG.info("Writer 3 releases the lock.");
+ });
+ lockProvider3.close();
+ LOG.info("Writer 3 closes the lock provider.");
+ writer3Completed.set(true);
+ });
+
+ writer2.start();
+ writer3.start();
+
+ while (!writer2TryLock.get()) {
+ Thread.sleep(100);
+ }
+
+ assertDoesNotThrow(() -> {
+ lockProvider1.unlock();
+ LOG.info("Writer 1 releases the lock.");
+ lockProvider1.close();
+ LOG.info("Writer 1 closes the lock provider.");
+ writer1Completed.set(true);
+ });
+
+ try {
+ writer2.join();
+ writer3.join();
+ } catch (InterruptedException e) {
+ // Ignore any exception
+ }
+ Assertions.assertTrue(writer2Completed.get());
+ Assertions.assertTrue(writer3Completed.get());
+ Assertions.assertEquals(lockProviderList.get(0).getLock(),
lockProviderList.get(1).getLock());
+ Assertions.assertEquals(lockProviderList.get(1).getLock(),
lockProviderList.get(2).getLock());
+ }
+
@Test
public void testLockAcquisition() {
InProcessLockProvider inProcessLockProvider = new
InProcessLockProvider(lockConfiguration1, hadoopConfiguration);