[
https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406818#comment-15406818
]
Purshotam Shah commented on OOZIE-2501:
---------------------------------------
Thanks [~pbacsko] I will go through your comments and if needed I will upload a
new patch.
> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
> Key: OOZIE-2501
> URL: https://issues.apache.org/jira/browse/OOZIE-2501
> Project: Oozie
> Issue Type: Bug
> Reporter: Purshotam Shah
> Assignee: Purshotam Shah
> Fix For: 4.3.0
>
> Attachments: OOZIE-2501-V2.patch, OOZIE-2501-V4.patch
>
>
> We will have an issue when oozie trying to acquire a lock and at the same
> time, some other thread is releasing the same lock .
> acquireLock will wait for 5 sec to acquire the lock. It will bypass the
> synchronized block and get lockEntry from the hashmap.
> While it waiting for 5 sec to acquire the lock, other thread releases the
> lock and may execute the release code which will remove lockEntry from the
> map.
> If some other command from same thread tries to acquire the lock, it will
> create a new InterProcessReadWriteLock object and use that for acquiring the
> lock.
> Logic for lock acquiring.
> {code}
> public LockToken getWriteLock(String resource, long wait) throws
> InterruptedException {
> InterProcessReadWriteLock lockEntry;
> synchronized (zkLocks) {
> if (zkLocks.containsKey(resource)) {
> lockEntry = zkLocks.get(resource);
> }
> else {
> lockEntry = new InterProcessReadWriteLock(zk.getClient(),
> LOCKS_NODE + "/" + resource);
> zkLocks.put(resource, lockEntry);
> }
> }
> InterProcessMutex writeLock = lockEntry.writeLock();
> return acquireLock(wait, writeLock, resource);
> }
> {code}
> Logic for lock releasing
> {code}
> public void release() {
> try {
> lock.release();
> if (zkLocks.get(resource) == null) {
> return;
> }
> if (!isLockHeld()) {
> synchronized (zkLocks) {
> if (zkLocks.get(resource) != null) {
> if (!isLockHeld()) {
> zkLocks.remove(resource);
> }
> }
> }
> }
> }
> catch (Exception ex) {
> LOG.warn("Could not release lock: " + ex.getMessage(), ex);
> }
> }
> {code}
> Curator code to acquire lock.
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
> {
> /*
> Note on concurrency: a given lockData instance
> can be only acted on by a single thread so locking isn't necessary
> */
> Thread currentThread = Thread.currentThread();
> LockData lockData = threadData.get(currentThread);
> if ( lockData != null )
> {
> // re-entering
> lockData.lockCount.incrementAndGet();
> return true;
> }
> String lockPath = internals.attemptLock(time, unit,
> getLockNodeBytes());
> if ( lockPath != null )
> {
> LockData newLockData = new LockData(currentThread,
> lockPath);
> threadData.put(currentThread, newLockData);
> return true;
> }
> return false;
> }
> {code}
> The approach we have followed is to use map with weakvalue. Once the lock is
> unreachable. GC will remove it from the map. We don't have to explicitly
> remove it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)