[
https://issues.apache.org/jira/browse/OOZIE-2501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Purshotam Shah updated OOZIE-2501:
----------------------------------
Description:
We will have an issue when Oozie is trying to acquire a lock and, at the same
time, some other thread is releasing the same lock.
acquireLock will wait for 5 sec to acquire the lock. It will bypass the
synchronized block and get lockEntry from the hashmap.
While it is waiting for 5 sec to acquire the lock, another thread releases the
lock and may execute the release code, which will remove lockEntry from the map.
If some other command from the same thread tries to acquire the lock, it will
create a new InterProcessReadWriteLock object and use that for acquiring the
lock. Because Curator tracks re-entrancy per lock object (see threadData in
internalLock below), the new object does not recognize the thread as the
current holder, so the acquire is not treated as a re-entry.
Logic for lock acquiring.
{code}
/*
 * Obtains the write lock for the given resource.
 *
 * Looks up the InterProcessReadWriteLock cached in zkLocks for the resource,
 * creating and caching a new one when none is present, then delegates to
 * acquireLock with the entry's write lock and the given wait value.
 */
public LockToken getWriteLock(String resource, long wait) throws
InterruptedException {
InterProcessReadWriteLock lockEntry;
// Only the lookup/creation of the cached entry is guarded by the zkLocks monitor.
synchronized (zkLocks) {
if (zkLocks.containsKey(resource)) {
lockEntry = zkLocks.get(resource);
}
else {
// No cached entry for this resource yet: create one and publish it in the map.
lockEntry = new InterProcessReadWriteLock(zk.getClient(),
LOCKS_NODE + "/" + resource);
zkLocks.put(resource, lockEntry);
}
}
// The acquire runs outside the synchronized block, so the map entry fetched
// above can be removed from zkLocks by another thread while this call is
// still using lockEntry.
InterProcessMutex writeLock = lockEntry.writeLock();
return acquireLock(wait, writeLock, resource);
}
{code}
Logic for lock releasing
{code}
/*
 * Releases the lock and, when isLockHeld() reports false, removes the cached
 * entry for the resource from zkLocks.
 *
 * Any exception raised along the way is logged as a warning and not rethrown.
 */
public void release() {
try {
lock.release();
// No cached entry for this resource: nothing to clean up.
if (zkLocks.get(resource) == null) {
return;
}
// First check is made without the monitor; it is repeated under the
// zkLocks monitor before the entry is actually removed.
if (!isLockHeld()) {
synchronized (zkLocks) {
if (zkLocks.get(resource) != null) {
if (!isLockHeld()) {
// NOTE(review): a thread that already fetched this entry and is still
// waiting to acquire it keeps using the removed object, while the next
// lookup for the same resource will create a different lock object.
zkLocks.remove(resource);
}
}
}
}
}
catch (Exception ex) {
// Failure to release is only logged; callers are not notified.
LOG.warn("Could not release lock: " + ex.getMessage(), ex);
}
}
{code}
Curator code to acquire lock.
{code}
/*
 * Tries to take the lock for the calling thread.
 *
 * Returns true when the calling thread already has a LockData entry in
 * threadData (re-entry; its lockCount is incremented) or when
 * internals.attemptLock returns a non-null lock path. Returns false when
 * attemptLock returns null.
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
// Re-entrancy is decided solely by whether threadData has an entry for this thread.
// NOTE(review): threadData looks like per-lock-object state, so re-entry would only
// be detected when the same lock object is reused - confirm against Curator source.
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
// First acquisition by this thread: go through attemptLock with the given timeout.
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// Record ownership so later calls from this thread take the re-entry branch.
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
{code}
> ZK reentrant lock doesn't work for few cases
> --------------------------------------------
>
> Key: OOZIE-2501
> URL: https://issues.apache.org/jira/browse/OOZIE-2501
> Project: Oozie
> Issue Type: Bug
> Reporter: Purshotam Shah
>
> We will have an issue when Oozie is trying to acquire a lock and, at the same
> time, some other thread is releasing the same lock.
> acquireLock will wait for 5 sec to acquire the lock. It will bypass the
> synchronized block and get lockEntry from the hashmap.
> While it is waiting for 5 sec to acquire the lock, another thread releases the
> lock and may execute the release code, which will remove lockEntry from the
> map.
> If some other command from the same thread tries to acquire the lock, it will
> create a new InterProcessReadWriteLock object and use that for acquiring the
> lock.
> Logic for lock acquiring.
> {code}
> public LockToken getWriteLock(String resource, long wait) throws
> InterruptedException {
> InterProcessReadWriteLock lockEntry;
> synchronized (zkLocks) {
> if (zkLocks.containsKey(resource)) {
> lockEntry = zkLocks.get(resource);
> }
> else {
> lockEntry = new InterProcessReadWriteLock(zk.getClient(),
> LOCKS_NODE + "/" + resource);
> zkLocks.put(resource, lockEntry);
> }
> }
> InterProcessMutex writeLock = lockEntry.writeLock();
> return acquireLock(wait, writeLock, resource);
> }
> {code}
> Logic for lock releasing
> {code}
> public void release() {
> try {
> lock.release();
> if (zkLocks.get(resource) == null) {
> return;
> }
> if (!isLockHeld()) {
> synchronized (zkLocks) {
> if (zkLocks.get(resource) != null) {
> if (!isLockHeld()) {
> zkLocks.remove(resource);
> }
> }
> }
> }
> }
> catch (Exception ex) {
> LOG.warn("Could not release lock: " + ex.getMessage(), ex);
> }
> }
> {code}
> Curator code to acquire lock.
> {code}
> private boolean internalLock(long time, TimeUnit unit) throws Exception
> {
> /*
> Note on concurrency: a given lockData instance
> can be only acted on by a single thread so locking isn't necessary
> */
> Thread currentThread = Thread.currentThread();
> LockData lockData = threadData.get(currentThread);
> if ( lockData != null )
> {
> // re-entering
> lockData.lockCount.incrementAndGet();
> return true;
> }
> String lockPath = internals.attemptLock(time, unit,
> getLockNodeBytes());
> if ( lockPath != null )
> {
> LockData newLockData = new LockData(currentThread,
> lockPath);
> threadData.put(currentThread, newLockData);
> return true;
> }
> return false;
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)