OOZIE-1923 ZKLocksService locks are not re-entrant, unlike MemoryLocks
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/bb9a6dae Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/bb9a6dae Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/bb9a6dae Branch: refs/remotes/trunk Commit: bb9a6dae5f8e538afab5cbcd06f273fa6591b71c Parents: bdd4ef5 Author: Purshotam Shah <[email protected]> Authored: Thu Sep 4 13:11:49 2014 -0700 Committer: Purshotam Shah <[email protected]> Committed: Thu Sep 4 13:11:49 2014 -0700 ---------------------------------------------------------------------- .../apache/oozie/service/ZKLocksService.java | 85 ++++++++++------ .../oozie/service/TestZKLocksService.java | 100 ++++++++++++++++++- release-log.txt | 1 + 3 files changed, 154 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/bb9a6dae/core/src/main/java/org/apache/oozie/service/ZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java index 44b9378..36c00b9 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java @@ -18,40 +18,35 @@ package org.apache.oozie.service; -import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.HashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.curator.framework.recipes.locks.ChildReaper; -import org.apache.curator.framework.recipes.locks.Reaper; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; -import org.apache.curator.utils.ThreadUtils; import org.apache.oozie.ErrorCode; import 
org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XLog; import org.apache.oozie.util.ZKUtils; - +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.curator.framework.recipes.locks.ChildReaper; +import org.apache.curator.framework.recipes.locks.Reaper; +import org.apache.curator.utils.ThreadUtils; import com.google.common.annotations.VisibleForTesting; /** * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo. - * <p> - * ChildReaper is used for deleting unused locks. Only one childreaper will be active in cluster. - * ZK Path /oozie.zookeeper.namespace/services/locksChildReaperLeaderPath is used for leader selection. 
*/ - public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { private ZKUtils zk; private static XLog LOG = XLog.getLog(ZKLocksService.class); public static final String LOCKS_NODE = "/locks"; - private final AtomicLong lockCount = new AtomicLong(); + + final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>(); private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; public static final int DEFAULT_REAPING_THRESHOLD = 300; // In sec @@ -76,7 +71,6 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr catch (Exception ex) { throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); } - lockCount.set(0); } /** @@ -108,10 +102,10 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr @Override public void instrument(Instrumentation instr) { // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has - instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Long>() { + instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() { @Override - public Long getValue() { - return lockCount.get(); + public Integer getValue() { + return zkLocks.size(); } }); } @@ -126,9 +120,18 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public LockToken getReadLock(String resource, long wait) throws InterruptedException { - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - InterProcessMutex readLock = lock.readLock(); - return acquireLock(wait, readLock); + InterProcessReadWriteLock lockEntry; + synchronized (zkLocks) { + if (zkLocks.containsKey(resource)) { + lockEntry = zkLocks.get(resource); + } + else { + lockEntry = new 
InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); + zkLocks.put(resource, lockEntry); + } + } + InterProcessMutex readLock = lockEntry.readLock(); + return acquireLock(wait, readLock, resource); } /** @@ -141,20 +144,29 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public LockToken getWriteLock(String resource, long wait) throws InterruptedException { - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - InterProcessMutex writeLock = lock.writeLock(); - return acquireLock(wait, writeLock); + InterProcessReadWriteLock lockEntry; + synchronized (zkLocks) { + if (zkLocks.containsKey(resource)) { + lockEntry = zkLocks.get(resource); + } + else { + lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); + zkLocks.put(resource, lockEntry); + } + } + InterProcessMutex writeLock = lockEntry.writeLock(); + return acquireLock(wait, writeLock, resource); } - private LockToken acquireLock(long wait, InterProcessMutex lock) { + private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) { ZKLockToken token = null; try { if (wait == -1) { lock.acquire(); - token = new ZKLockToken(lock); + token = new ZKLockToken(lock, resource); } else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { - token = new ZKLockToken(lock); + token = new ZKLockToken(lock, resource); } } catch (Exception ex) { @@ -168,10 +180,11 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ class ZKLockToken implements LockToken { private final InterProcessMutex lock; + private final String resource; - private ZKLockToken(InterProcessMutex lock) { + private ZKLockToken(InterProcessMutex lock, String resource) { this.lock = lock; - lockCount.incrementAndGet(); + this.resource = resource; } /** @@ -181,14 +194,28 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr 
public void release() { try { lock.release(); - lockCount.decrementAndGet(); + int val = lock.getParticipantNodes().size(); + //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock. + // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will + //create a new instance. Will fix this as part of OOZIE-1922 + if (val == 0) { + synchronized (zkLocks) { + zkLocks.remove(resource); + } + } } catch (Exception ex) { LOG.warn("Could not release lock: " + ex.getMessage(), ex); } + } } + @VisibleForTesting + public HashMap<String, InterProcessReadWriteLock> getLocks(){ + return zkLocks; + } + private static ScheduledExecutorService getExecutorService() { return ThreadUtils.newFixedThreadScheduledPool(Services.get().getConf().getInt(REAPING_THREADS, 2), "ZKLocksChildReaper"); http://git-wip-us.apache.org/repos/asf/oozie/blob/bb9a6dae/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java index 02f260a..02cc137 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java @@ -18,9 +18,13 @@ package org.apache.oozie.service; +import java.util.UUID; + import org.apache.oozie.lock.LockToken; -import org.apache.oozie.util.*; +import org.apache.oozie.service.ZKLocksService.ZKLockToken; import org.apache.oozie.test.ZKXTestCase; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.ZKUtils; import org.apache.zookeeper.data.Stat; public class TestZKLocksService extends ZKXTestCase { @@ -418,6 +422,97 @@ public class TestZKLocksService extends ZKXTestCase { assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } + public void 
testLockRelease() throws ServiceException, InterruptedException { + final String path = UUID.randomUUID().toString(); + ZKLocksService zkls = new ZKLocksService(); + try { + zkls.init(Services.get()); + ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertFalse(zkls.getLocks().containsKey(path)); + } + finally { + zkls.destroy(); + } + } + + public void testReentrantMultipleCall() throws ServiceException, InterruptedException { + final String path = UUID.randomUUID().toString(); + ZKLocksService zkls = new ZKLocksService(); + try { + zkls.init(Services.get()); + ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertFalse(zkls.getLocks().containsKey(path)); + } + catch (Exception e) { + fail("Reentrant property, it should have acquired lock"); + } + finally { + zkls.destroy(); + } + } + + public void testReentrantMultipleThread() throws ServiceException, InterruptedException { + final String path = UUID.randomUUID().toString(); + final ZKLocksService zkls = new ZKLocksService(); + final LockToken[] locks = new LockToken[2]; + + try { + zkls.init(Services.get()); + Thread t1 = new Thread() { + public void run() { + try { + locks[0] = zkls.getWriteLock(path, 5000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + Thread t2 = new Thread() { + public void run() { + try { + locks[1] = zkls.getWriteLock(path, 5000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + t1.start(); + t2.start(); + t1.join(); + t2.join(); + + if (locks[0] != null) { + assertNull(locks[1]); + } + if (locks[1] != null) { + 
assertNull(locks[0]); + } + + if (locks[0] != null) { + locks[0].release(); + } + if (locks[1] != null) { + locks[1].release(); + } + assertTrue(zkls.getLocks().containsKey(path)); + } + finally { + zkls.destroy(); + } + } + public void testLockReaper() throws Exception { Services.get().getConf().set(ZKLocksService.REAPING_THRESHOLD, "1"); ZKLocksService zkls = new ZKLocksService(); @@ -436,5 +531,4 @@ public class TestZKLocksService extends ZKXTestCase { zkls.destroy(); } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/bb9a6dae/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 922e4b3..bcd098c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1923 ZKLocksService locks are not re-entrant like MemoryLocks (puru) OOZIE-1843 Bulk update for coord last modified time for CoordMaterializeTriggerService (puru) OOZIE-1941 Bundle coordinator name can't be parameterized (puru) OOZIE-1966 Fix Headers in java code (shwethags via rkanter)
