[ 
https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513993#comment-15513993
 ] 

Purshotam Shah commented on OOZIE-2501:
---------------------------------------

[~pbacsko] I have uploaded new patch addressing your comment.

> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
>                 Key: OOZIE-2501
>                 URL: https://issues.apache.org/jira/browse/OOZIE-2501
>             Project: Oozie
>          Issue Type: Bug
>            Reporter: Purshotam Shah
>            Assignee: Purshotam Shah
>            Priority: Blocker
>             Fix For: 4.3.0
>
>         Attachments: OOZIE-2501-V2.patch, OOZIE-2501-V4.patch, 
> OOZIE-2501-V7.patch
>
>
> We will have an issue when oozie trying to acquire a lock and at the same 
> time, some other thread is releasing the same lock .
> acquireLock will wait for 5 sec to acquire the lock. It will bypass the 
> synchronized block and get lockEntry from the hashmap.
> While it waiting for 5 sec to acquire the lock, other thread releases the 
> lock and may execute the release code which will remove  lockEntry from the 
> map.
> If some other command from same thread tries to acquire the lock, it will 
> create a new InterProcessReadWriteLock object and use that for acquiring the 
> lock. 
> Logic for lock acquiring.
> {code}
>  public LockToken getWriteLock(String resource, long wait) throws 
> InterruptedException {
>         InterProcessReadWriteLock lockEntry;
>         synchronized (zkLocks) {
>             if (zkLocks.containsKey(resource)) {
>                 lockEntry = zkLocks.get(resource);
>             }
>             else {
>                 lockEntry = new InterProcessReadWriteLock(zk.getClient(), 
> LOCKS_NODE + "/" + resource);
>                 zkLocks.put(resource, lockEntry);
>             }
>         }
>         InterProcessMutex writeLock = lockEntry.writeLock();
>         return acquireLock(wait, writeLock, resource);
>     }
> {code}
> Logic for lock releasing
> {code}
> public void release() {
>             try {
>                 lock.release();
>                 if (zkLocks.get(resource) == null) {
>                     return;
>                 }
>                 if (!isLockHeld()) {
>                     synchronized (zkLocks) {
>                         if (zkLocks.get(resource) != null) {
>                             if (!isLockHeld()) {
>                                 zkLocks.remove(resource);
>                             }
>                         }
>                     }
>                 }
>             }
>             catch (Exception ex) {
>                 LOG.warn("Could not release lock: " + ex.getMessage(), ex);
>             }
>         }
> {code}
> Curator code to acquire lock.
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
>     {
>         /*
>            Note on concurrency: a given lockData instance
>            can be only acted on by a single thread so locking isn't necessary
>         */
>         Thread          currentThread = Thread.currentThread();
>         LockData        lockData = threadData.get(currentThread);
>         if ( lockData != null )
>         {
>             // re-entering
>             lockData.lockCount.incrementAndGet();
>             return true;
>         }
>         String lockPath = internals.attemptLock(time, unit, 
> getLockNodeBytes());
>         if ( lockPath != null )
>         {
>             LockData        newLockData = new LockData(currentThread, 
> lockPath);
>             threadData.put(currentThread, newLockData);
>             return true;
>         }
>         return false;
>     }
> {code}
> The approach we have followed is to use map with weakvalue. Once the lock is 
> unreachable. GC will remove it from the map. We don't have to explicitly 
> remove it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to