[ 
https://issues.apache.org/jira/browse/OOZIE-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751273#comment-17751273
 ] 

Hadoop QA commented on OOZIE-3717:
----------------------------------

PreCommit-OOZIE-Build started


> When fork actions are submitted in parallel, ForkedActionStartXCommand and 
> ActionStartXCommand have the same name, so ForkedActionStartXCommand can be 
> lost, causing a deadlock
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: OOZIE-3717
>                 URL: https://issues.apache.org/jira/browse/OOZIE-3717
>             Project: Oozie
>          Issue Type: Bug
>          Components: action
>    Affects Versions: 5.2.1
>            Reporter: chenhaodan
>            Assignee: chenhaodan
>            Priority: Major
>         Attachments: OOZIE-3717-001.patch, OOZIE-3717-002.patch, 
> OOZIE-3717-003.patch
>
>
> When fork actions are submitted in parallel, a ForkedActionStartXCommand is 
> added for each action. Meanwhile, RecoveryService checks pending actions and 
> may add an ActionStartXCommand for the same action. If the 
> ForkedActionStartXCommand is enqueued while an ActionStartXCommand for the 
> same action is already in the queue, the ForkedActionStartXCommand is dropped. 
> The thread that submits the actions in parallel blocks in 
> CallableQueueService.blockingWait(), waiting for the ForkedActionStartXCommand 
> to finish, but that command has been lost, which causes a deadlock.
> {code:java}
>             Thread 1                      Thread 2
>   (ForkedActionStartXCommand)      (ActionStartXCommand)
> +----------------------------+          +---------+
> | removeFromUniqueCallables  |          |  .....  |
> +----------------------------+          +---------+
> |           ......           |          |  queue  |
> +----------------------------+          +---------+
> |           queue            |       enqueue succeeded, in uniqueCallables
> +----------------------------+ 
> | wrapper.filterDuplicates() |
> +----------------------------+
> Thread 1 and Thread 2 run through CallableWrapper's code in this order:
>   1. Thread 1 executes removeFromUniqueCallables();
>   2. Thread 2 calls queue(), which adds ActionStartXCommand to the queue and 
> to uniqueCallables;
>   3. Thread 1 calls queue() to add ForkedActionStartXCommand to the queue, but 
> filterDuplicates() finds an XCommand with the same name in uniqueCallables, so 
> the command is not added to the queue.
> Because ForkedActionStartXCommand and ActionStartXCommand have the same name 
> and Thread 2 enqueues ActionStartXCommand before Thread 1 does, 
> ForkedActionStartXCommand is lost (it never executes), and the thread that 
> submitted the fork actions in parallel blocks in 
> CallableQueueService.blockingWait().
> {code}
>  
> *CallableWrapper's code*
> {code:java}
> public class CallableWrapper<E> extends PriorityDelayQueue.QueueElement<E> 
> implements Runnable, Callable<E> {
>     private Instrumentation.Cron cron;
>     public void run() {
>         XCallable<?> callable = null;
>         try {
>             removeFromUniqueCallables();
>             if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
>                 log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay",
>                         getElement().getType(), SAFE_MODE_DELAY);
>                 setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
>                 queue(this, true);
>                 return;
>             }
>             callable = getElement();
>             if (callableBegin(callable)) {
>                 cron.stop();
>                 addInQueueCron(cron);
>                 XLog log = XLog.getLog(getClass());
>                 log.trace("executing callable [{0}]", callable.getName());
>                 try {
>                     // FutureTask.run() will invoke callable.call()
>                     super.run();
>                     incrCounter(INSTR_EXECUTED_COUNTER, 1);
>                     log.trace("executed callable [{0}]", callable.getName());
>                 }
>                 catch (Exception ex) {
>                     incrCounter(INSTR_FAILED_COUNTER, 1);
>                     log.warn("exception callable [{0}], {1}", callable.getName(),
>                             ex.getMessage(), ex);
>                 }
>             }
>             else {
>                 log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay",
>                         callable.getType(), CONCURRENCY_DELAY);
>                 setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
>                 queue(this, true);
>                 incrCounter(callable.getType() + "#exceeded.concurrency", 1);
>             }
>         }
>         catch (Throwable t) {
>             incrCounter(INSTR_FAILED_COUNTER, 1);
>             log.warn("exception callable [{0}], {1}", callable == null ? "N/A" : callable.getName(),
>                     t.getMessage(), t);
>         }
>         finally {
>             if (callable != null) {
>                 callableEnd(callable);
>             }
>         }
>     }
> }
>  {code}
>  
>  
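
The interleaving described in the issue can be replayed deterministically in a few lines. The following is only a minimal sketch and does not use Oozie's classes: MiniQueue, LostCommandDemo and the key "same-name" are hypothetical stand-ins, and the sketch assumes that both commands map to the same uniqueness key, as the issue title states. The steps run on a single thread in the order given in the description, so no real concurrency is involved.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

final class LostCommandDemo {

    /** Hypothetical, heavily simplified stand-in for the callable queue. */
    static final class MiniQueue {
        private final Set<String> uniqueCallables = new HashSet<>();
        private final Queue<Runnable> queue = new ArrayDeque<>();

        /** Returns false when a command with the same key is already queued. */
        synchronized boolean enqueue(String key, Runnable command) {
            if (!uniqueCallables.add(key)) {
                return false; // duplicate: dropped silently, as in step 3
            }
            queue.add(command);
            return true;
        }

        synchronized void removeFromUniqueCallables(String key) {
            uniqueCallables.remove(key);
        }

        synchronized Runnable poll() {
            return queue.poll();
        }
    }

    /** Replays steps 1-3; returns true if the forked command was lost. */
    static boolean replay() {
        final String key = "same-name"; // assumption: one key for both commands
        MiniQueue q = new MiniQueue();
        // The submitting thread would wait on this latch (blockingWait analogue).
        CountDownLatch forkedDone = new CountDownLatch(1);

        Runnable forked = forkedDone::countDown; // stands in for ForkedActionStartXCommand
        Runnable plain = () -> { };              // stands in for ActionStartXCommand

        q.enqueue(key, forked);
        q.poll();                                        // Thread 1 picks up the forked command
        q.removeFromUniqueCallables(key);                // step 1 (Thread 1)
        boolean plainQueued = q.enqueue(key, plain);     // step 2 (Thread 2)
        boolean forkedQueued = q.enqueue(key, forked);   // step 3 (Thread 1): filtered out

        // Drain the queue: only the plain command is left to run.
        for (Runnable r = q.poll(); r != null; r = q.poll()) {
            r.run();
        }
        // The latch is never counted down, so a waiter would block forever.
        boolean stillWaiting = forkedDone.getCount() > 0;
        return plainQueued && !forkedQueued && stillWaiting;
    }

    public static void main(String[] args) {
        System.out.println("forked command lost: " + replay());
    }
}
```

In this sketch the duplicate filter is the only thing that decides whether a command runs, which is why a second command with the same key can silently displace the one the submitting thread is waiting for.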



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
