[
https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15226728#comment-15226728
]
Purshotam Shah commented on OOZIE-2501:
---------------------------------------
We already have an existing test case for this behavior. It is very
difficult to write a reproducible test case for this kind of scenario.
{code:title=TestZKLocksService.java}
public void testReentrantMultipleThread() throws ServiceException, InterruptedException {
    final String path = UUID.randomUUID().toString();
    final ZKLocksService zkls = new ZKLocksService();
    zkls.init(Services.get());
    try {
        ThreadLock t1 = new ThreadLock(zkls, path);
        ThreadLock t2 = new ThreadLock(zkls, path);
        t1.start();
        t1.join();
        assertFalse(zkls.getLocks().containsKey(path));
        t2.start();
        t2.join();
        assertFalse(zkls.getLocks().containsKey(path));
    }
    finally {
        zkls.destroy();
    }
}
{code}
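For illustration only, the map interleaving described in the issue can be replayed step by step in a single thread. This is a minimal sketch and not Oozie code: LockMapInterleavingSketch, getOrCreate and removeEntry are hypothetical names, and a plain Object stands in for the lock entry. It only shows that an acquirer can end up with an entry that is no longer in the map. It does not involve ZooKeeper or real timing.

```java
import java.util.HashMap;
import java.util.Map;

public class LockMapInterleavingSketch {
    // Hypothetical stand-in for the zkLocks map; Object plays the role of the lock entry.
    private final Map<String, Object> locks = new HashMap<>();

    // Mirrors the get-or-create step at the top of getWriteLock.
    Object getOrCreate(String resource) {
        synchronized (locks) {
            return locks.computeIfAbsent(resource, r -> new Object());
        }
    }

    // Mirrors the cleanup step in release() once nobody is seen holding the lock.
    void removeEntry(String resource) {
        synchronized (locks) {
            locks.remove(resource);
        }
    }

    // Replays the problematic ordering in one thread, one step at a time.
    static boolean sameEntryAfterInterleaving() {
        LockMapInterleavingSketch registry = new LockMapInterleavingSketch();
        Object waiterEntry = registry.getOrCreate("job-1"); // acquirer fetched the entry and is still waiting
        registry.removeEntry("job-1");                      // releaser cleans up in the meantime
        Object laterEntry = registry.getOrCreate("job-1");  // next command gets a fresh entry
        return waiterEntry == laterEntry;
    }

    public static void main(String[] args) {
        System.out.println(sameEntryAfterInterleaving()); // prints "false"
    }
}
```

A real test would have to force this ordering across threads, which is what makes it hard to reproduce reliably.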
> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
> Key: OOZIE-2501
> URL: https://issues.apache.org/jira/browse/OOZIE-2501
> Project: Oozie
> Issue Type: Bug
> Reporter: Purshotam Shah
> Assignee: Purshotam Shah
> Attachments: OOZIE-2501-V1.patch
>
>
> We run into a problem when Oozie tries to acquire a lock while, at the same
> time, another thread is releasing the same lock.
> acquireLock waits up to 5 sec to acquire the lock. By that point it has already
> passed the synchronized block and obtained lockEntry from the hashmap.
> While it is waiting to acquire the lock, the other thread releases the
> lock and may execute the release code, which removes lockEntry from the
> map.
> If some other command from the same thread then tries to acquire the lock, it
> will create a new InterProcessReadWriteLock object and use that to acquire the
> lock.
> Logic for acquiring the lock:
> {code}
> public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
>     InterProcessReadWriteLock lockEntry;
>     synchronized (zkLocks) {
>         if (zkLocks.containsKey(resource)) {
>             lockEntry = zkLocks.get(resource);
>         }
>         else {
>             lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
>             zkLocks.put(resource, lockEntry);
>         }
>     }
>     InterProcessMutex writeLock = lockEntry.writeLock();
>     return acquireLock(wait, writeLock, resource);
> }
> {code}
> Logic for releasing the lock:
> {code}
> public void release() {
>     try {
>         lock.release();
>         if (zkLocks.get(resource) == null) {
>             return;
>         }
>         if (!isLockHeld()) {
>             synchronized (zkLocks) {
>                 if (zkLocks.get(resource) != null) {
>                     if (!isLockHeld()) {
>                         zkLocks.remove(resource);
>                     }
>                 }
>             }
>         }
>     }
>     catch (Exception ex) {
>         LOG.warn("Could not release lock: " + ex.getMessage(), ex);
>     }
> }
> {code}
> Curator code to acquire the lock:
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
> {
>     /*
>        Note on concurrency: a given lockData instance
>        can be only acted on by a single thread so locking isn't necessary
>     */
>     Thread currentThread = Thread.currentThread();
>     LockData lockData = threadData.get(currentThread);
>     if ( lockData != null )
>     {
>         // re-entering
>         lockData.lockCount.incrementAndGet();
>         return true;
>     }
>     String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
>     if ( lockPath != null )
>     {
>         LockData newLockData = new LockData(currentThread, lockPath);
>         threadData.put(currentThread, newLockData);
>         return true;
>     }
>     return false;
> }
> {code}
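To illustrate why a fresh lock object breaks re-entrancy, here is a minimal sketch. It is neither Oozie nor Curator code. ReentrancySketch, SketchMutex, NODES and the path are hypothetical stand-ins: a Semaphore plays the role of the ZooKeeper lock node, and each SketchMutex keeps its own per-thread hold count, in the spirit of the threadData lookup in the quoted Curator code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ReentrancySketch {
    // Stand-in for ZooKeeper lock nodes: one permit per path, shared by all lock objects.
    static final Map<String, Semaphore> NODES = new ConcurrentHashMap<>();

    // Hypothetical stand-in for a mutex that tracks re-entrancy per instance.
    static class SketchMutex {
        private final Semaphore node;
        private final Map<Thread, Integer> threadData = new ConcurrentHashMap<>();

        SketchMutex(String path) {
            this.node = NODES.computeIfAbsent(path, p -> new Semaphore(1));
        }

        boolean acquire(long time, TimeUnit unit) {
            Thread current = Thread.currentThread();
            Integer count = threadData.get(current);
            if (count != null) {
                threadData.put(current, count + 1); // re-entering through the same object
                return true;
            }
            try {
                if (node.tryAcquire(time, unit)) {
                    threadData.put(current, 1); // first hold recorded on this object only
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }

    // Returns {first acquire, re-entry via the same object, attempt via a new object}.
    static boolean[] demo(String path) {
        SketchMutex first = new SketchMutex(path);
        boolean held = first.acquire(100, TimeUnit.MILLISECONDS);
        boolean reentered = first.acquire(100, TimeUnit.MILLISECONDS);
        SketchMutex second = new SketchMutex(path); // what a removed map entry leads to
        boolean viaNewObject = second.acquire(100, TimeUnit.MILLISECONDS);
        return new boolean[] {held, reentered, viaNewObject};
    }

    public static void main(String[] args) {
        boolean[] r = demo("/locks/job-1");
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true false"
    }
}
```

In this sketch the same thread re-enters through the original object, but times out on the new one because the new object has no record of the existing hold.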
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)