Repository: curator
Updated Branches:
  refs/heads/CURATOR-462 [created] 3ce294890


Fix CURATOR-462 -- return lease created in 
org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2.internalAcquire1Lease(Builder<Lease>,
 long, boolean, long) when the wait for it to become active is interrupted; 
test for the fix


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f9128a87
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f9128a87
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f9128a87

Branch: refs/heads/CURATOR-462
Commit: f9128a87c837fd4a687c3d54c73ea779e46085fc
Parents: eb6ad40
Author: krajcsovszkig-ms <[email protected]>
Authored: Tue Apr 10 14:08:45 2018 +0200
Committer: krajcsovszkig-ms <[email protected]>
Committed: Tue Apr 10 14:08:45 2018 +0200

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  | 18 +++++--
 .../locks/TestInterProcessSemaphore.java        | 49 ++++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f9128a87/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 7bc98f5..03e1088 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -331,6 +331,7 @@ public class InterProcessSemaphoreV2
 
     static volatile CountDownLatch debugAcquireLatch = null;
     static volatile CountDownLatch debugFailedGetChildrenLatch = null;
+    volatile CountDownLatch debugWaitLatch = null;
 
     private InternalAcquireResult 
internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, 
boolean hasWait, long waitMs) throws Exception
     {
@@ -353,6 +354,7 @@ public class InterProcessSemaphoreV2
         }
 
         Lease lease = null;
+        boolean success = false;
 
         try
         {
@@ -383,13 +385,11 @@ public class InterProcessSemaphoreV2
                             {
                                 debugFailedGetChildrenLatch.countDown();
                             }
-                            returnLease(lease); // otherwise the just created 
ZNode will be orphaned causing a dead lock
                             throw e;
                         }
                         if ( !children.contains(nodeName) )
                         {
                             log.error("Sequential path not found: " + path);
-                            returnLease(lease);
                             return 
InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                         }
     
@@ -402,20 +402,32 @@ public class InterProcessSemaphoreV2
                             long thisWaitMs = getThisWaitMs(startMs, waitMs);
                             if ( thisWaitMs <= 0 )
                             {
-                                returnLease(lease);
                                 return InternalAcquireResult.RETURN_NULL;
                             }
+                            if ( debugWaitLatch != null )
+                            {
+                                debugWaitLatch.countDown();
+                            }
                             wait(thisWaitMs);
                         }
                         else
                         {
+                            if ( debugWaitLatch != null )
+                            {
+                                debugWaitLatch.countDown();
+                            }
                             wait();
                         }
                     }
+                    success = true;
                 }
             }
             finally
             {
+                if ( !success )
+                {
+                    returnLease(lease);
+                }
                 client.removeWatchers();
             }
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/f9128a87/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 73c76e8..50f6bce 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -778,4 +778,53 @@ public class TestInterProcessSemaphore extends 
BaseClassForTests
             TestCleanState.closeAndTestClean(client);
         }
     }
+    
+    @Test
+    public void testInterruptAcquire() throws Exception
+    {
+        // CURATOR-462
+        final Timing timing = new Timing();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final InterProcessSemaphoreV2 s1 = new 
InterProcessSemaphoreV2(client, "/test", 1);
+            final InterProcessSemaphoreV2 s2 = new 
InterProcessSemaphoreV2(client, "/test", 1);
+            final InterProcessSemaphoreV2 s3 = new 
InterProcessSemaphoreV2(client, "/test", 1);
+            
+            final CountDownLatch debugWaitLatch = s2.debugWaitLatch = new 
CountDownLatch(1);
+            
+            // Acquire exclusive semaphore
+            Lease lease = s1.acquire(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            Assert.assertNotNull(lease);
+            
+            // Queue up another semaphore on the same path
+            Future<Object> handle = 
Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
+                
+                @Override
+                public Object call() throws Exception {
+                    s2.acquire();
+                    return null;
+                }
+            });
+
+            // Wait until second lease is created and the wait is started for 
it to become active
+            Assert.assertTrue(timing.awaitLatch(debugWaitLatch));
+            
+            // Interrupt the wait
+            handle.cancel(true);
+            
+            // Assert that the second lease is gone
+            timing.sleepABit();
+            
Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 1);
+            
+            // Assert that after closing the first (current) semaphore, we can 
acquire a new one
+            s1.returnLease(lease);
+            Assert.assertNotNull(s3.acquire(timing.forWaiting().seconds(), 
TimeUnit.SECONDS));
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
 }

Reply via email to