[ 
https://issues.apache.org/jira/browse/OOZIE-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17741695#comment-17741695
 ] 

ASF subversion and git services commented on OOZIE-3715:
--------------------------------------------------------

Commit 603900735d8682bad3d0d1a62f7ca1db9fa1d2b3 in oozie's branch 
refs/heads/master from Denes Bodo
[ https://gitbox.apache.org/repos/asf?p=oozie.git;h=603900735 ]

OOZIE-3715 Fix fork out more than one transitions submit , one transition 
submit fail can't execute KillXCommand (chenhd via dionusos)


> Fix fork out more than one transitions submit , one transition submit fail 
> can't execute KillXCommand
> -----------------------------------------------------------------------------------------------------
>
>                 Key: OOZIE-3715
>                 URL: https://issues.apache.org/jira/browse/OOZIE-3715
>             Project: Oozie
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 5.2.1
>            Reporter: chenhaodan
>            Assignee: chenhaodan
>            Priority: Major
>              Labels: patch
>             Fix For: trunk
>
>         Attachments: OOZIE-3715-001.patch, OOZIE-3715-002.patch, 
> OOZIE-3715-003.patch, OOZIE-3715-004.patch, OOZIE-3715-005.patch, 
> OOZIE-3715-006.patch, forkSubmitFail_issue.txt, status.png
>
>
> When I fork 2 transitions( A and B) to submit , when A transition failed , B 
> transition  still Running , because can't execute KillXCommand.
> SignalXCommand.startForkedActions, when one transition  submit fail will 
> create a new ActionStartXCommand and invoke failJob, failJob will add 
> WorkflowNotificationXCommand and KillXCommand to 
> {color:#ff0000}*commandQueue*{color} , and callback at XCommand.call method , 
> but we add WorkflowNotificationXCommand and KillXCommand to 
> ActionStartXCommand‘s {color:#ff0000}*commandQueue*{color}  , but not 
> SignalXCommand  ,  so can't execute KillXCommand. 
> The code is as follows :
>  
> {code:java}
>     public void startForkedActions(List<WorkflowActionBean> 
> workflowActionBeanListForForked) throws CommandException {
>         ......
>             for (Future<ActionExecutorContext> result : futures) {
>              ......
>                 if (context.getJobStatus() != null && 
> context.getJobStatus().equals(Job.Status.FAILED)) {
>                     new ActionStartXCommand(context.getAction().getId(), 
> null).failJob(context);
>              ......
>         }
>        ......
>     }
> {code}
>  
> {code:java}
> public void failJob(ActionExecutor.Context context, WorkflowActionBean 
> action) throws CommandException {
>         WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
>         if (!handleUserRetry(context, action)) {
>             incrActionErrorCounter(action.getType(), "failed", 1);
>             LOG.warn("Failing Job due to failed action [{0}]", 
> action.getName());
>             try {
>                 workflow.getWorkflowInstance().fail(action.getName());
>                 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
>                 ((LiteWorkflowInstance) 
> wfInstance).setStatus(WorkflowInstance.Status.FAILED);
>                 workflow.setWorkflowInstance(wfInstance);
>                 workflow.setStatus(WorkflowJob.Status.FAILED);
>                 action.setStatus(WorkflowAction.Status.FAILED);
>                 action.resetPending();
>                 queue(new WorkflowNotificationXCommand(workflow, action));
>                 queue(new KillXCommand(workflow.getId()));             
> InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, 
> getInstrumentation());
>             }
>             catch (WorkflowException ex) {
>                 throw new CommandException(ex);
>             }
>         }
>     }
> {code}
>  
> {code:java}
> public final T call() throws CommandException {
>     if (commandQueue != null) {
>         for (Map.Entry<Long, List<XCommand<?>>> entry : 
> commandQueue.entrySet()) {
>             LOG.debug("Queuing [{0}] commands with delay [{1}]ms", 
> entry.getValue().size(), entry.getKey());
>             if (!callableQueueService.queueSerial(entry.getValue(), 
> entry.getKey())) {
>                 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, 
> queue full", entry.getValue()
>                     .size(), entry.getKey());
>             }
>         }
>      } 
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to