[ 
https://issues.apache.org/jira/browse/OOZIE-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenhaodan updated OOZIE-3717:
------------------------------
    Description: 
When fork actions are submitted in parallel, a ForkedActionStartXCommand is 
added to the queue for each action. Meanwhile, RecoveryService checks pending 
actions and may add an ActionStartXCommand. If a ForkedActionStartXCommand is 
enqueued while an ActionStartXCommand for the same action is already in the 
queue, the ForkedActionStartXCommand is lost. The thread that submits the 
actions in parallel blocks in CallableQueueService.blockingWait(), waiting for 
the ForkedActionStartXCommand to finish, but that command has been lost, which 
causes a deadlock.
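
The hang described above can be illustrated with a small, self-contained sketch. It is not Oozie's blockingWait() implementation: the class LostTaskWaitSketch and the method awaitAll are hypothetical names, and a CountDownLatch stands in for whatever completion tracking the real service uses. The sketch waits with a timeout so that it terminates; a wait without a timeout would block forever when one of the expected tasks is never run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a waiter that expects N completions but only M tasks ever run. */
public class LostTaskWaitSketch {

    /**
     * Waits for `expected` completions while only `submitted` tasks are actually executed.
     * Returns true if every expected completion arrived before the timeout.
     */
    public static boolean awaitAll(int expected, int submitted, long timeoutMs)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(expected);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < submitted; i++) {
                // Each executed task signals completion exactly once.
                pool.execute(done::countDown);
            }
            // A lost task never counts down, so this can only end by timing out.
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all tasks ran:  " + awaitAll(2, 2, 1000));
        System.out.println("one task lost:  " + awaitAll(2, 1, 200));
    }
}
```

With two expected and two executed tasks the wait completes; with one task dropped it does not, which is the situation the submitting thread ends up in.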
{code:java}
            Thread 1                      Thread 2
  (ForkedActionStartXCommand)      (ActionStartXCommand)
+----------------------------+          +---------+
| removeFromUniqueCallables  |          |  .....  |
+----------------------------+          +---------+
|           ......           |          |  queue  |
+----------------------------+          +---------+
|           queue            |       enqueue succeeded, in uniqueCallables
+----------------------------+ 
| wrapper.filterDuplicates() |
+----------------------------+

Thread 1 and Thread 2 interleave as follows while CallableWrapper executes:
  1. Thread 1 calls removeFromUniqueCallables().
  2. Thread 2 calls queue(), which adds ActionStartXCommand to the queue and to 
uniqueCallables.
  3. Thread 1 calls queue() to add ForkedActionStartXCommand to the queue, but 
filterDuplicates() finds an XCommand with the same name in uniqueCallables, so 
the command is not added to the queue.

Because ForkedActionStartXCommand and ActionStartXCommand have the same name 
and Thread 2 enqueues ActionStartXCommand before Thread 1 does, 
ForkedActionStartXCommand is lost (it never executes), and the thread that 
submits the fork actions in parallel blocks in 
CallableQueueService.blockingWait(). 
{code}
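
The interleaving above can be replayed deterministically in a single thread. The following is a minimal sketch under stated assumptions, not Oozie's code: UniqueQueueSketch, offer, releaseKey and the example key are hypothetical, with a HashSet standing in for uniqueCallables and the duplicate check in offer standing in for filterDuplicates().

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for a queue that silently rejects entries whose key is already queued. */
public class UniqueQueueSketch {
    private final Set<String> uniqueKeys = new HashSet<>();
    private final Queue<String> queue = new ArrayDeque<>();

    /** Returns false and drops the entry when the key is already registered. */
    public synchronized boolean offer(String key, String label) {
        if (!uniqueKeys.add(key)) {
            return false; // duplicate filtered out; nothing tells the waiter
        }
        queue.add(label);
        return true;
    }

    /** Mirrors the first step of run(): the key is released before any requeue. */
    public synchronized void releaseKey(String key) {
        uniqueKeys.remove(key);
    }

    public synchronized String poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        UniqueQueueSketch q = new UniqueQueueSketch();
        String key = "action.start_wf-1@node-a"; // hypothetical key shared by both commands

        q.offer(key, "forked");                       // forked command is queued at first
        q.poll();                                     // Thread 1 picks it up
        q.releaseKey(key);                            // step 1: key removed from the unique set
        boolean recovery = q.offer(key, "recovery");  // step 2: Thread 2 enqueues the same key
        boolean requeued = q.offer(key, "forked");    // step 3: Thread 1 requeues and is filtered

        System.out.println("recovery enqueued: " + recovery); // prints "recovery enqueued: true"
        System.out.println("forked requeued: " + requeued);   // prints "forked requeued: false"
    }
}
```

The rejected requeue in step 3 is the point at which the forked command disappears: only the recovery entry remains in the queue.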
 
*CallableWrapper's code*
{code:java}
public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E>
        implements Runnable, Callable<E> {
    private Instrumentation.Cron cron;

    public void run() {
        XCallable<?> callable = null;
        try {
            removeFromUniqueCallables();
            if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
                log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay",
                        getElement().getType(), SAFE_MODE_DELAY);
                setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
                queue(this, true);
                return;
            }
            callable = getElement();
            if (callableBegin(callable)) {
                cron.stop();
                addInQueueCron(cron);
                XLog log = XLog.getLog(getClass());
                log.trace("executing callable [{0}]", callable.getName());

                try {
                    // FutureTask.run() will invoke callable.call()
                    super.run();
                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
                    log.trace("executed callable [{0}]", callable.getName());
                }
                catch (Exception ex) {
                    incrCounter(INSTR_FAILED_COUNTER, 1);
                    log.warn("exception callable [{0}], {1}", callable.getName(),
                            ex.getMessage(), ex);
                }
            }
            else {
                log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay",
                        callable.getType(), CONCURRENCY_DELAY);
                setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
                queue(this, true);
                incrCounter(callable.getType() + "#exceeded.concurrency", 1);
            }
        }
        catch (Throwable t) {
            incrCounter(INSTR_FAILED_COUNTER, 1);
            log.warn("exception callable [{0}], {1}",
                    callable == null ? "N/A" : callable.getName(), t.getMessage(), t);
        }
        finally {
            if (callable != null) {
                callableEnd(callable);
            }
        }
    }
}
 {code}
 

 


> When fork actions are submitted in parallel, ForkedActionStartXCommand and 
> ActionStartXCommand have the same name, so ForkedActionStartXCommand can be 
> lost, causing a deadlock
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: OOZIE-3717
>                 URL: https://issues.apache.org/jira/browse/OOZIE-3717
>             Project: Oozie
>          Issue Type: Bug
>          Components: action
>    Affects Versions: 5.2.1
>            Reporter: chenhaodan
>            Assignee: chenhaodan
>            Priority: Major
>             Fix For: trunk
>
>         Attachments: OOZIE-3717-001.patch
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
