Repository: oozie Updated Branches: refs/heads/branch-4.1 2f5b4235e -> 000e007d7
OOZIE-1923 ZKLocksService locks are not re-entrant like MemoryLocks Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/000e007d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/000e007d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/000e007d Branch: refs/heads/branch-4.1 Commit: 000e007d705c4889401385a8f9bc26e1d52d7cea Parents: 2f5b423 Author: Purshotam Shah <[email protected]> Authored: Thu Sep 4 13:17:01 2014 -0700 Committer: Purshotam Shah <[email protected]> Committed: Thu Sep 4 13:17:01 2014 -0700 ---------------------------------------------------------------------- .../apache/oozie/service/ZKLocksService.java | 85 ++++++++++------ .../oozie/service/TestZKLocksService.java | 100 ++++++++++++++++++- release-log.txt | 1 + 3 files changed, 154 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/000e007d/core/src/main/java/org/apache/oozie/service/ZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java index 3c642db..8478f8a 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java @@ -17,40 +17,35 @@ */ package org.apache.oozie.service; -import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.HashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.curator.framework.recipes.locks.ChildReaper; -import org.apache.curator.framework.recipes.locks.Reaper; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; -import org.apache.curator.utils.ThreadUtils; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XLog; import org.apache.oozie.util.ZKUtils; - +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.curator.framework.recipes.locks.ChildReaper; +import org.apache.curator.framework.recipes.locks.Reaper; +import org.apache.curator.utils.ThreadUtils; import com.google.common.annotations.VisibleForTesting; /** * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo. - * <p> - * ChildReaper is used for deleting unused locks. Only one childreaper will be active in cluster. - * ZK Path /oozie.zookeeper.namespace/services/locksChildReaperLeaderPath is used for leader selection. */ - public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { private ZKUtils zk; private static XLog LOG = XLog.getLog(ZKLocksService.class); public static final String LOCKS_NODE = "/locks"; - private final AtomicLong lockCount = new AtomicLong(); + + final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>(); private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; public static final int DEFAULT_REAPING_THRESHOLD = 300; // In sec @@ -75,7 +70,6 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr catch (Exception ex) { throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); } - lockCount.set(0); } /** @@ -107,10 +101,10 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr @Override public void instrument(Instrumentation instr) { // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has - instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Long>() { + instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() { @Override - public Long getValue() { - return lockCount.get(); + public Integer getValue() { + return zkLocks.size(); } }); } @@ -125,9 +119,18 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public LockToken getReadLock(String resource, long wait) throws InterruptedException { - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - InterProcessMutex readLock = lock.readLock(); - return acquireLock(wait, readLock); + InterProcessReadWriteLock lockEntry; + synchronized (zkLocks) { + if (zkLocks.containsKey(resource)) { + lockEntry = zkLocks.get(resource); + } + else { + lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); + zkLocks.put(resource, lockEntry); + } + } + InterProcessMutex readLock = lockEntry.readLock(); + return acquireLock(wait, readLock, resource); } /** @@ -140,20 +143,29 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public LockToken getWriteLock(String resource, long wait) throws InterruptedException { - InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); - InterProcessMutex writeLock = lock.writeLock(); - return acquireLock(wait, writeLock); + InterProcessReadWriteLock lockEntry; + synchronized (zkLocks) { + if (zkLocks.containsKey(resource)) { + lockEntry = zkLocks.get(resource); + } + else { + lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); + zkLocks.put(resource, lockEntry); + } + } + InterProcessMutex writeLock = lockEntry.writeLock(); + return acquireLock(wait, writeLock, resource); } - private LockToken acquireLock(long wait, InterProcessMutex lock) { + private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) { ZKLockToken token = null; try { if (wait == -1) { lock.acquire(); - token = new ZKLockToken(lock); + token = new ZKLockToken(lock, resource); } else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { - token = new ZKLockToken(lock); + token = new ZKLockToken(lock, resource); } } catch (Exception ex) { @@ -167,10 +179,11 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ class ZKLockToken implements LockToken { private final InterProcessMutex lock; + private final String resource; - private ZKLockToken(InterProcessMutex lock) { + private ZKLockToken(InterProcessMutex lock, String resource) { this.lock = lock; - lockCount.incrementAndGet(); + this.resource = resource; } /** @@ -180,14 +193,28 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr public void release() { try { lock.release(); - lockCount.decrementAndGet(); + int val = lock.getParticipantNodes().size(); + //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock. + // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will + //create a new instance. Will fix this as part of OOZIE-1922 + if (val == 0) { + synchronized (zkLocks) { + zkLocks.remove(resource); + } + } } catch (Exception ex) { LOG.warn("Could not release lock: " + ex.getMessage(), ex); } + } } + @VisibleForTesting + public HashMap<String, InterProcessReadWriteLock> getLocks(){ + return zkLocks; + } + private static ScheduledExecutorService getExecutorService() { return ThreadUtils.newFixedThreadScheduledPool(Services.get().getConf().getInt(REAPING_THREADS, 2), "ZKLocksChildReaper"); http://git-wip-us.apache.org/repos/asf/oozie/blob/000e007d/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java index 5ce8ecb..370ddf1 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java @@ -17,9 +17,13 @@ */ package org.apache.oozie.service; +import java.util.UUID; + import org.apache.oozie.lock.LockToken; -import org.apache.oozie.util.*; +import org.apache.oozie.service.ZKLocksService.ZKLockToken; import org.apache.oozie.test.ZKXTestCase; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.ZKUtils; import org.apache.zookeeper.data.Stat; public class TestZKLocksService extends ZKXTestCase { @@ -417,6 +421,97 @@ public class TestZKLocksService extends ZKXTestCase { assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } + public void testLockRelease() throws ServiceException, InterruptedException { + final String path = UUID.randomUUID().toString(); + ZKLocksService zkls = new ZKLocksService(); + try { + zkls.init(Services.get()); + ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertFalse(zkls.getLocks().containsKey(path)); + } + finally { + zkls.destroy(); + } + } + + public void testReentrantMultipleCall() throws ServiceException, InterruptedException { + final String path = UUID.randomUUID().toString(); + ZKLocksService zkls = new ZKLocksService(); + try { + zkls.init(Services.get()); + ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + lock = (ZKLockToken) zkls.getWriteLock(path, 5000); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertTrue(zkls.getLocks().containsKey(path)); + lock.release(); + assertFalse(zkls.getLocks().containsKey(path)); + } + catch (Exception e) { + fail("Reentrant property, it should have acquired lock"); + } + finally { + zkls.destroy(); + } + } + + public void testReentrantMultipleThread() throws ServiceException, InterruptedException { + final String path = UUID.randomUUID().toString(); + final ZKLocksService zkls = new ZKLocksService(); + final LockToken[] locks = new LockToken[2]; + + try { + zkls.init(Services.get()); + Thread t1 = new Thread() { + public void run() { + try { + locks[0] = zkls.getWriteLock(path, 5000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + Thread t2 = new Thread() { + public void run() { + try { + locks[1] = zkls.getWriteLock(path, 5000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + t1.start(); + t2.start(); + t1.join(); + t2.join(); + + if (locks[0] != null) { + assertNull(locks[1]); + } + if (locks[1] != null) { + assertNull(locks[0]); + } + + if (locks[0] != null) { + locks[0].release(); + } + if (locks[1] != null) { + locks[1].release(); + } + assertTrue(zkls.getLocks().containsKey(path)); + } + finally { + zkls.destroy(); + } + } + public void testLockReaper() throws Exception { Services.get().getConf().set(ZKLocksService.REAPING_THRESHOLD, "1"); ZKLocksService zkls = new ZKLocksService(); @@ -435,5 +530,4 @@ public class TestZKLocksService extends ZKXTestCase { zkls.destroy(); } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/000e007d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index dd6e3b1..85fdef4 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (4.1 - unreleased) +OOZIE-1923 ZKLocksService locks are not re-entrant like MemoryLocks (puru) OOZIE-1945 NPE in JaveActionExecutor#check() (sree2k via rkanter) OOZIE-1984 SLACalculator in HA mode performs duplicate operations on records with completed jobs (mona) OOZIE-1958 address duplication of env variables in oozie.launcher.yarn.app.mapreduce.am.env when running with uber mode (ryota)
