This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new ddadeea CURATOR-607: InterProcessReadWriteLock should expose
getLockPath (#394)
ddadeea is described below
commit ddadeeab9d8e1f42dac13ad65494c84e37f7f3d6
Author: faucct <[email protected]>
AuthorDate: Sun Oct 3 22:11:49 2021 +0300
CURATOR-607: InterProcessReadWriteLock should expose getLockPath
(#394)
Co-authored-by: Nikita Sokolov <[email protected]>
---
.../recipes/locks/InterProcessReadWriteLock.java | 182 +++++++++++----------
.../locks/TestInterProcessReadWriteLock.java | 105 ++++++++++++
2 files changed, 200 insertions(+), 87 deletions(-)
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
index 57af212..a1ea94d 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
@@ -55,8 +55,8 @@ import java.util.List;
*/
public class InterProcessReadWriteLock
{
- private final InterProcessMutex readMutex;
- private final InterProcessMutex writeMutex;
+ private final ReadLock readMutex;
+ private final WriteLock writeMutex;
// must be the same length. LockInternals depends on it
private static final String READ_LOCK_NAME = "__READ__";
@@ -82,33 +82,100 @@ public class InterProcessReadWriteLock
{
super(client, path, lockName, maxLeases, driver);
this.lockName = lockName;
- this.lockData = lockData;
+ this.lockData = (lockData == null) ? null :
Arrays.copyOf(lockData, lockData.length);
}
@Override
- public Collection<String> getParticipantNodes() throws Exception
+ final public Collection<String> getParticipantNodes() throws Exception
{
- Collection<String> nodes = super.getParticipantNodes();
- Iterable<String> filtered = Iterables.filter
- (
- nodes,
- new Predicate<String>()
- {
- @Override
- public boolean apply(String node)
- {
- return node.contains(lockName);
- }
+ return
ImmutableList.copyOf(Iterables.filter(super.getParticipantNodes(), new
Predicate<String>() {
+ @Override
+ public boolean apply(String node) {
+ return node.contains(lockName);
}
- );
- return ImmutableList.copyOf(filtered);
+ }));
}
@Override
- protected byte[] getLockNodeBytes()
+ final protected byte[] getLockNodeBytes()
{
return lockData;
}
+
+ @Override
+ protected String getLockPath()
+ {
+ return super.getLockPath();
+ }
+ }
+
+ public static class WriteLock extends InternalInterProcessMutex
+ {
+ public WriteLock(CuratorFramework client, String basePath, byte[]
lockData)
+ {
+ super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new
SortingLockInternalsDriver() {
+ @Override
+ public PredicateResults getsTheLock(
+ CuratorFramework client,
+ List<String> children,
+ String sequenceNodeName,
+ int maxLeases
+ ) throws Exception {
+ return super.getsTheLock(client, children,
sequenceNodeName, maxLeases);
+ }
+ });
+ }
+
+ @Override
+ protected String getLockPath()
+ {
+ return super.getLockPath();
+ }
+ }
+
+ public static class ReadLock extends InternalInterProcessMutex {
+ public ReadLock(CuratorFramework client, String basePath, byte[]
lockData, WriteLock writeLock)
+ {
+ super(client, basePath, READ_LOCK_NAME, lockData,
Integer.MAX_VALUE, new SortingLockInternalsDriver() {
+ @Override
+ public PredicateResults getsTheLock(
+ CuratorFramework client,
+ List<String> children,
+ String sequenceNodeName,
+ int maxLeases
+ ) throws Exception {
+ if (writeLock.isOwnedByCurrentThread()) {
+ return new PredicateResults(null, true);
+ }
+
+ int index = 0;
+ int firstWriteIndex = Integer.MAX_VALUE;
+ int ourIndex = -1;
+ for (String node : children) {
+ if (node.contains(WRITE_LOCK_NAME)) {
+ firstWriteIndex = Math.min(index, firstWriteIndex);
+ } else if (node.startsWith(sequenceNodeName)) {
+ ourIndex = index;
+ break;
+ }
+
+ ++index;
+ }
+
+ validateOurIndex(sequenceNodeName, ourIndex);
+
+ boolean getsTheLock = (ourIndex < firstWriteIndex);
+ String pathToWatch = getsTheLock ? null :
children.get(firstWriteIndex);
+ return new PredicateResults(pathToWatch, getsTheLock);
+ }
+ });
+ }
+
+ @Override
+ protected String getLockPath()
+ {
+ return super.getLockPath();
+ }
}
/**
@@ -127,41 +194,14 @@ public class InterProcessReadWriteLock
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath,
byte[] lockData)
{
- lockData = (lockData == null) ? null : Arrays.copyOf(lockData,
lockData.length);
-
- writeMutex = new InternalInterProcessMutex
- (
- client,
- basePath,
- WRITE_LOCK_NAME,
- lockData,
- 1,
- new SortingLockInternalsDriver()
- {
- @Override
- public PredicateResults getsTheLock(CuratorFramework client,
List<String> children, String sequenceNodeName, int maxLeases) throws Exception
- {
- return super.getsTheLock(client, children,
sequenceNodeName, maxLeases);
- }
- }
- );
-
- readMutex = new InternalInterProcessMutex
- (
- client,
- basePath,
- READ_LOCK_NAME,
- lockData,
- Integer.MAX_VALUE,
- new SortingLockInternalsDriver()
- {
- @Override
- public PredicateResults getsTheLock(CuratorFramework client,
List<String> children, String sequenceNodeName, int maxLeases) throws Exception
- {
- return readLockPredicate(children, sequenceNodeName);
- }
- }
- );
+ this.writeMutex = new WriteLock(client, basePath, lockData);
+ this.readMutex = new ReadLock(client, basePath, lockData, writeMutex);
+ }
+
+ protected InterProcessReadWriteLock(WriteLock writeLock, ReadLock readLock)
+ {
+ this.writeMutex = writeLock;
+ this.readMutex = readLock;
}
/**
@@ -169,7 +209,7 @@ public class InterProcessReadWriteLock
*
* @return read lock
*/
- public InterProcessMutex readLock()
+ public ReadLock readLock()
{
return readMutex;
}
@@ -179,40 +219,8 @@ public class InterProcessReadWriteLock
*
* @return write lock
*/
- public InterProcessMutex writeLock()
+ public WriteLock writeLock()
{
return writeMutex;
}
-
- private PredicateResults readLockPredicate(List<String> children, String
sequenceNodeName) throws Exception
- {
- if ( writeMutex.isOwnedByCurrentThread() )
- {
- return new PredicateResults(null, true);
- }
-
- int index = 0;
- int firstWriteIndex = Integer.MAX_VALUE;
- int ourIndex = -1;
- for ( String node : children )
- {
- if ( node.contains(WRITE_LOCK_NAME) )
- {
- firstWriteIndex = Math.min(index, firstWriteIndex);
- }
- else if ( node.startsWith(sequenceNodeName) )
- {
- ourIndex = index;
- break;
- }
-
- ++index;
- }
-
- StandardLockInternalsDriver.validateOurIndex(sequenceNodeName,
ourIndex);
-
- boolean getsTheLock = (ourIndex < firstWriteIndex);
- String pathToWatch = getsTheLock ? null :
children.get(firstWriteIndex);
- return new PredicateResults(pathToWatch, getsTheLock);
- }
}
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index a601241..b54ae50 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -24,12 +24,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.KillSession;
+import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.Test;
import java.util.Collection;
@@ -362,4 +365,106 @@ public class TestInterProcessReadWriteLock extends
BaseClassForTests
}
}
}
+
+ public static class LockPathInterProcessReadWriteLock extends
InterProcessReadWriteLock
+ {
+ private final WriteLock writeLock;
+ private final ReadLock readLock;
+
+ public LockPathInterProcessReadWriteLock(CuratorFramework client,
String basePath)
+ {
+ this(client, basePath, null);
+ }
+
+ public LockPathInterProcessReadWriteLock(CuratorFramework client,
String basePath, byte[] lockData)
+ {
+ this(client, basePath, lockData, new WriteLock(client, basePath,
lockData));
+ }
+
+ private LockPathInterProcessReadWriteLock(
+ CuratorFramework client,
+ String basePath,
+ byte[] lockData,
+ WriteLock writeLock
+ )
+ {
+ this(writeLock, new ReadLock(client, basePath, lockData,
writeLock));
+ }
+
+ private LockPathInterProcessReadWriteLock(WriteLock writeLock,
ReadLock readLock)
+ {
+ super(writeLock, readLock);
+ this.writeLock = writeLock;
+ this.readLock = readLock;
+ }
+
+ @Override
+ public WriteLock writeLock()
+ {
+ return writeLock;
+ }
+
+ @Override
+ public ReadLock readLock()
+ {
+ return readLock;
+ }
+
+ public static class WriteLock extends
InterProcessReadWriteLock.WriteLock
+ {
+ private WriteLock(CuratorFramework client, String basePath, byte[]
lockData)
+ {
+ super(client, basePath, lockData);
+ }
+
+ @Override
+ public String getLockPath()
+ {
+ return super.getLockPath();
+ }
+ }
+
+ public static class ReadLock extends InterProcessReadWriteLock.ReadLock
+ {
+ private ReadLock(CuratorFramework client, String basePath, byte[]
lockData, WriteLock writeLock)
+ {
+ super(client, basePath, lockData, writeLock);
+ }
+
+ @Override
+ public String getLockPath()
+ {
+ return super.getLockPath();
+ }
+ }
+ }
+
+ @Test
+ public void testLockPath() throws Exception
+ {
+ CuratorFramework client1 =
CuratorFrameworkFactory.newClient(server.getConnectString(), new
RetryOneTime(1));
+ CuratorFramework client2 =
CuratorFrameworkFactory.newClient(server.getConnectString(), new
RetryOneTime(1));
+ try
+ {
+ client1.start();
+ client2.start();
+ LockPathInterProcessReadWriteLock lock1 = new
LockPathInterProcessReadWriteLock(client1, "/lock");
+ LockPathInterProcessReadWriteLock lock2 = new
LockPathInterProcessReadWriteLock(client2, "/lock");
+ lock1.writeLock().acquire();
+ KillSession.kill(client1.getZookeeperClient().getZooKeeper());
+ lock2.readLock().acquire();
+ try {
+ client1.getData().forPath(lock1.writeLock().getLockPath());
+ fail("expected not to find node");
+ } catch (KeeperException.NoNodeException ignored) {
+ }
+ lock2.readLock().release();
+ lock1.writeLock().release();
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client2);
+ TestCleanState.closeAndTestClean(client1);
+ }
+ }
}