Repository: oozie
Updated Branches:
  refs/heads/master bdd4ef5c1 -> bb9a6dae5


OOZIE-1923 ZKLocksService locks are not re-entrant like MemoryLocks


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/bb9a6dae
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/bb9a6dae
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/bb9a6dae

Branch: refs/heads/master
Commit: bb9a6dae5f8e538afab5cbcd06f273fa6591b71c
Parents: bdd4ef5
Author: Purshotam Shah <[email protected]>
Authored: Thu Sep 4 13:11:49 2014 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Thu Sep 4 13:11:49 2014 -0700

----------------------------------------------------------------------
 .../apache/oozie/service/ZKLocksService.java    |  85 ++++++++++------
 .../oozie/service/TestZKLocksService.java       | 100 ++++++++++++++++++-
 release-log.txt                                 |   1 +
 3 files changed, 154 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/bb9a6dae/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 44b9378..36c00b9 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -18,40 +18,35 @@
 
 package org.apache.oozie.service;
 
-import java.io.IOException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.curator.framework.recipes.locks.ChildReaper;
-import org.apache.curator.framework.recipes.locks.Reaper;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
-import org.apache.curator.utils.ThreadUtils;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
-
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.curator.framework.recipes.locks.ChildReaper;
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.utils.ThreadUtils;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Service that provides distributed locks via ZooKeeper.  Requires that a ZooKeeper ensemble is available.  The locks will be
  * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}).  For example, with default settings, if the
  * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
- * <p>
- * ChildReaper is used for deleting unused locks. Only one childreaper will be active in cluster.
- * ZK Path /oozie.zookeeper.namespace/services/locksChildReaperLeaderPath is used for leader selection.
  */
-
 public class ZKLocksService extends MemoryLocksService implements Service, 
Instrumentable {
 
     private ZKUtils zk;
     private static XLog LOG = XLog.getLog(ZKLocksService.class);
     public static final String LOCKS_NODE = "/locks";
-    private final AtomicLong lockCount = new AtomicLong();
+
+    final private HashMap<String, InterProcessReadWriteLock> zkLocks = new 
HashMap<String, InterProcessReadWriteLock>();
 
     private static final String REAPING_LEADER_PATH = 
ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
     public static final int DEFAULT_REAPING_THRESHOLD = 300; // In sec
@@ -76,7 +71,6 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
         catch (Exception ex) {
             throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
         }
-        lockCount.set(0);
     }
 
     /**
@@ -108,10 +102,10 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
     @Override
     public void instrument(Instrumentation instr) {
         // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has
-        instr.addVariable(INSTRUMENTATION_GROUP, "locks", new 
Instrumentation.Variable<Long>() {
+        instr.addVariable(INSTRUMENTATION_GROUP, "locks", new 
Instrumentation.Variable<Integer>() {
             @Override
-            public Long getValue() {
-                return lockCount.get();
+            public Integer getValue() {
+                return zkLocks.size();
             }
         });
     }
@@ -126,9 +120,18 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
      */
     @Override
     public LockToken getReadLock(String resource, long wait) throws 
InterruptedException {
-        InterProcessReadWriteLock lock = new 
InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
-        InterProcessMutex readLock = lock.readLock();
-        return acquireLock(wait, readLock);
+        InterProcessReadWriteLock lockEntry;
+        synchronized (zkLocks) {
+            if (zkLocks.containsKey(resource)) {
+                lockEntry = zkLocks.get(resource);
+            }
+            else {
+                lockEntry = new InterProcessReadWriteLock(zk.getClient(), 
LOCKS_NODE + "/" + resource);
+                zkLocks.put(resource, lockEntry);
+            }
+        }
+        InterProcessMutex readLock = lockEntry.readLock();
+        return acquireLock(wait, readLock, resource);
     }
 
     /**
@@ -141,20 +144,29 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
      */
     @Override
     public LockToken getWriteLock(String resource, long wait) throws 
InterruptedException {
-        InterProcessReadWriteLock lock = new 
InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
-        InterProcessMutex writeLock = lock.writeLock();
-        return acquireLock(wait, writeLock);
+        InterProcessReadWriteLock lockEntry;
+        synchronized (zkLocks) {
+            if (zkLocks.containsKey(resource)) {
+                lockEntry = zkLocks.get(resource);
+            }
+            else {
+                lockEntry = new InterProcessReadWriteLock(zk.getClient(), 
LOCKS_NODE + "/" + resource);
+                zkLocks.put(resource, lockEntry);
+            }
+        }
+        InterProcessMutex writeLock = lockEntry.writeLock();
+        return acquireLock(wait, writeLock, resource);
     }
 
-    private LockToken acquireLock(long wait, InterProcessMutex lock) {
+    private LockToken acquireLock(long wait, InterProcessMutex lock, String 
resource) {
         ZKLockToken token = null;
         try {
             if (wait == -1) {
                 lock.acquire();
-                token = new ZKLockToken(lock);
+                token = new ZKLockToken(lock, resource);
             }
             else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
-                token = new ZKLockToken(lock);
+                token = new ZKLockToken(lock, resource);
             }
         }
         catch (Exception ex) {
@@ -168,10 +180,11 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
      */
     class ZKLockToken implements LockToken {
         private final InterProcessMutex lock;
+        private final String resource;
 
-        private ZKLockToken(InterProcessMutex lock) {
+        private ZKLockToken(InterProcessMutex lock, String resource) {
             this.lock = lock;
-            lockCount.incrementAndGet();
+            this.resource = resource;
         }
 
         /**
@@ -181,14 +194,28 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
         public void release() {
             try {
                 lock.release();
-                lockCount.decrementAndGet();
+                int val = lock.getParticipantNodes().size();
+                //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock.
+                // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will
+                //create a new instance. Will fix this as part of OOZIE-1922
+                if (val == 0) {
+                    synchronized (zkLocks) {
+                        zkLocks.remove(resource);
+                    }
+                }
             }
             catch (Exception ex) {
                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
             }
+
         }
     }
 
+    @VisibleForTesting
+    public HashMap<String, InterProcessReadWriteLock> getLocks(){
+        return zkLocks;
+    }
+
     private static ScheduledExecutorService getExecutorService() {
         return 
ThreadUtils.newFixedThreadScheduledPool(Services.get().getConf().getInt(REAPING_THREADS,
 2),
                 "ZKLocksChildReaper");

http://git-wip-us.apache.org/repos/asf/oozie/blob/bb9a6dae/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
index 02f260a..02cc137 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
@@ -18,9 +18,13 @@
 
 package org.apache.oozie.service;
 
+import java.util.UUID;
+
 import org.apache.oozie.lock.LockToken;
-import org.apache.oozie.util.*;
+import org.apache.oozie.service.ZKLocksService.ZKLockToken;
 import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.ZKUtils;
 import org.apache.zookeeper.data.Stat;
 
 public class TestZKLocksService extends ZKXTestCase {
@@ -418,6 +422,97 @@ public class TestZKLocksService extends ZKXTestCase {
         assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
     }
 
+    public void testLockRelease() throws ServiceException, 
InterruptedException {
+        final String path = UUID.randomUUID().toString();
+        ZKLocksService zkls = new ZKLocksService();
+        try {
+            zkls.init(Services.get());
+            ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
+            assertTrue(zkls.getLocks().containsKey(path));
+            lock.release();
+            assertFalse(zkls.getLocks().containsKey(path));
+        }
+        finally {
+            zkls.destroy();
+        }
+    }
+
+    public void testReentrantMultipleCall() throws ServiceException, 
InterruptedException {
+        final String path = UUID.randomUUID().toString();
+        ZKLocksService zkls = new ZKLocksService();
+        try {
+            zkls.init(Services.get());
+            ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
+            lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
+            lock = (ZKLockToken) zkls.getWriteLock(path, 5000);
+            assertTrue(zkls.getLocks().containsKey(path));
+            lock.release();
+            assertTrue(zkls.getLocks().containsKey(path));
+            lock.release();
+            assertTrue(zkls.getLocks().containsKey(path));
+            lock.release();
+            assertFalse(zkls.getLocks().containsKey(path));
+        }
+        catch (Exception e) {
+            fail("Reentrant property, it should have acquired lock");
+        }
+        finally {
+            zkls.destroy();
+        }
+    }
+
+    public void testReentrantMultipleThread() throws ServiceException, 
InterruptedException {
+        final String path = UUID.randomUUID().toString();
+        final ZKLocksService zkls = new ZKLocksService();
+        final LockToken[] locks = new LockToken[2];
+
+        try {
+            zkls.init(Services.get());
+            Thread t1 = new Thread() {
+                public void run() {
+                    try {
+                        locks[0] = zkls.getWriteLock(path, 5000);
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Thread t2 = new Thread() {
+                public void run() {
+                    try {
+                        locks[1] = zkls.getWriteLock(path, 5000);
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            t1.start();
+            t2.start();
+            t1.join();
+            t2.join();
+
+            if (locks[0] != null) {
+                assertNull(locks[1]);
+            }
+            if (locks[1] != null) {
+                assertNull(locks[0]);
+            }
+
+            if (locks[0] != null) {
+                locks[0].release();
+            }
+            if (locks[1] != null) {
+                locks[1].release();
+            }
+            assertTrue(zkls.getLocks().containsKey(path));
+        }
+        finally {
+            zkls.destroy();
+        }
+    }
+
     public void testLockReaper() throws Exception {
         Services.get().getConf().set(ZKLocksService.REAPING_THRESHOLD, "1");
         ZKLocksService zkls = new ZKLocksService();
@@ -436,5 +531,4 @@ public class TestZKLocksService extends ZKXTestCase {
             zkls.destroy();
         }
     }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/bb9a6dae/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 922e4b3..bcd098c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1923 ZKLocksService locks are not re-entrant like MemoryLocks (puru)
 OOZIE-1843 Bulk update for coord last modified time for CoordMaterializeTriggerService (puru)
 OOZIE-1941 Bundle coordinator name can't be parameterized (puru)
 OOZIE-1966 Fix Headers in java code (shwethags via rkanter)

Reply via email to