Repository: curator
Updated Branches:
  refs/heads/CURATOR-462 [created] 3ce294890
Fix CURATOR-462 -- return lease created in org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2.internalAcquire1Lease(Builder<Lease>, long, boolean, long) when the wait for it to become active is interrupted; test for the fix Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f9128a87 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f9128a87 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f9128a87 Branch: refs/heads/CURATOR-462 Commit: f9128a87c837fd4a687c3d54c73ea779e46085fc Parents: eb6ad40 Author: krajcsovszkig-ms <[email protected]> Authored: Tue Apr 10 14:08:45 2018 +0200 Committer: krajcsovszkig-ms <[email protected]> Committed: Tue Apr 10 14:08:45 2018 +0200 ---------------------------------------------------------------------- .../recipes/locks/InterProcessSemaphoreV2.java | 18 +++++-- .../locks/TestInterProcessSemaphore.java | 49 ++++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f9128a87/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java index 7bc98f5..03e1088 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java @@ -331,6 +331,7 @@ public class InterProcessSemaphoreV2 static volatile CountDownLatch debugAcquireLatch = null; static volatile CountDownLatch debugFailedGetChildrenLatch = null; + volatile 
CountDownLatch debugWaitLatch = null; private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception { @@ -353,6 +354,7 @@ public class InterProcessSemaphoreV2 } Lease lease = null; + boolean success = false; try { @@ -383,13 +385,11 @@ public class InterProcessSemaphoreV2 { debugFailedGetChildrenLatch.countDown(); } - returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock throw e; } if ( !children.contains(nodeName) ) { log.error("Sequential path not found: " + path); - returnLease(lease); return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; } @@ -402,20 +402,32 @@ public class InterProcessSemaphoreV2 long thisWaitMs = getThisWaitMs(startMs, waitMs); if ( thisWaitMs <= 0 ) { - returnLease(lease); return InternalAcquireResult.RETURN_NULL; } + if ( debugWaitLatch != null ) + { + debugWaitLatch.countDown(); + } wait(thisWaitMs); } else { + if ( debugWaitLatch != null ) + { + debugWaitLatch.countDown(); + } wait(); } } + success = true; } } finally { + if ( !success ) + { + returnLease(lease); + } client.removeWatchers(); } } http://git-wip-us.apache.org/repos/asf/curator/blob/f9128a87/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java index 73c76e8..50f6bce 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java @@ -778,4 +778,53 @@ public class TestInterProcessSemaphore extends BaseClassForTests 
TestCleanState.closeAndTestClean(client); } } + + @Test + public void testInterruptAcquire() throws Exception + { + // CURATOR-462 + final Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + try + { + final InterProcessSemaphoreV2 s1 = new InterProcessSemaphoreV2(client, "/test", 1); + final InterProcessSemaphoreV2 s2 = new InterProcessSemaphoreV2(client, "/test", 1); + final InterProcessSemaphoreV2 s3 = new InterProcessSemaphoreV2(client, "/test", 1); + + final CountDownLatch debugWaitLatch = s2.debugWaitLatch = new CountDownLatch(1); + + // Acquire exclusive semaphore + Lease lease = s1.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS); + Assert.assertNotNull(lease); + + // Queue up another semaphore on the same path + Future<Object> handle = Executors.newSingleThreadExecutor().submit(new Callable<Object>() { + + @Override + public Object call() throws Exception { + s2.acquire(); + return null; + } + }); + + // Wait until second lease is created and the wait is started for it to become active + Assert.assertTrue(timing.awaitLatch(debugWaitLatch)); + + // Interrupt the wait + handle.cancel(true); + + // Assert that the second lease is gone + timing.sleepABit(); + Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 1); + + // Assert that after closing the first (current) semaphore, we can acquire a new one + s1.returnLease(lease); + Assert.assertNotNull(s3.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS)); + } + finally + { + TestCleanState.closeAndTestClean(client); + } + } }
