[ 
https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15226728#comment-15226728
 ] 

Purshotam Shah commented on OOZIE-2501:
---------------------------------------

We already have an existing test case that tests this behavior. It's very 
difficult to write a reproducible test case for this kind of scenario.

{code:title=TestZKLocksService.java}
public void testReentrantMultipleThread() throws ServiceException, 
InterruptedException {
        final String path = UUID.randomUUID().toString();
        final ZKLocksService zkls = new ZKLocksService();
        zkls.init(Services.get());
        try {
            ThreadLock t1 = new ThreadLock(zkls, path);
            ThreadLock t2 = new ThreadLock(zkls, path);
            t1.start();
            t1.join();
            assertFalse(zkls.getLocks().containsKey(path));
            t2.start();
            t2.join();
            assertFalse(zkls.getLocks().containsKey(path));
        }
        finally {
            zkls.destroy();
        }
    }
{code}

> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
>                 Key: OOZIE-2501
>                 URL: https://issues.apache.org/jira/browse/OOZIE-2501
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Purshotam Shah
>            Assignee: Purshotam Shah
>         Attachments: OOZIE-2501-V1.patch
>
>
> We will have an issue when Oozie is trying to acquire a lock and, at the same 
> time, some other thread is releasing the same lock.
> acquireLock will wait for 5 sec to acquire the lock. It will bypass the 
> synchronized block and get lockEntry from the hashmap.
> While it is waiting for 5 sec to acquire the lock, another thread releases the 
> lock and may execute the release code, which will remove lockEntry from the 
> map.
> If some other command from the same thread tries to acquire the lock, it will 
> create a new InterProcessReadWriteLock object and use that for acquiring the 
> lock.
> Logic for lock acquiring.
> {code}
>  public LockToken getWriteLock(String resource, long wait) throws 
> InterruptedException {
>         InterProcessReadWriteLock lockEntry;
>         synchronized (zkLocks) {
>             if (zkLocks.containsKey(resource)) {
>                 lockEntry = zkLocks.get(resource);
>             }
>             else {
>                 lockEntry = new InterProcessReadWriteLock(zk.getClient(), 
> LOCKS_NODE + "/" + resource);
>                 zkLocks.put(resource, lockEntry);
>             }
>         }
>         InterProcessMutex writeLock = lockEntry.writeLock();
>         return acquireLock(wait, writeLock, resource);
>     }
> {code}
> Logic for lock releasing
> {code}
> public void release() {
>             try {
>                 lock.release();
>                 if (zkLocks.get(resource) == null) {
>                     return;
>                 }
>                 if (!isLockHeld()) {
>                     synchronized (zkLocks) {
>                         if (zkLocks.get(resource) != null) {
>                             if (!isLockHeld()) {
>                                 zkLocks.remove(resource);
>                             }
>                         }
>                     }
>                 }
>             }
>             catch (Exception ex) {
>                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
>             }
>         }
> {code}
> Curator code to acquire lock.
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
>     {
>         /*
>            Note on concurrency: a given lockData instance
>            can be only acted on by a single thread so locking isn't necessary
>         */
>         Thread          currentThread = Thread.currentThread();
>         LockData        lockData = threadData.get(currentThread);
>         if ( lockData != null )
>         {
>             // re-entering
>             lockData.lockCount.incrementAndGet();
>             return true;
>         }
>         String lockPath = internals.attemptLock(time, unit, 
> getLockNodeBytes());
>         if ( lockPath != null )
>         {
>             LockData        newLockData = new LockData(currentThread, 
> lockPath);
>             threadData.put(currentThread, newLockData);
>             return true;
>         }
>         return false;
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to