[
https://issues.apache.org/jira/browse/OOZIE-3715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dénes Bodó reassigned OOZIE-3715:
---------------------------------
Assignee: chenhaodan
> Fix: when a fork submits more than one transition and one submission fails,
> KillXCommand is not executed
> -----------------------------------------------------------------------------------------------------
>
> Key: OOZIE-3715
> URL: https://issues.apache.org/jira/browse/OOZIE-3715
> Project: Oozie
> Issue Type: Bug
> Components: core
> Affects Versions: 5.2.1
> Reporter: chenhaodan
> Assignee: chenhaodan
> Priority: Major
> Labels: patch
> Attachments: OOZIE-3715-1-1.patch, OOZIE-3715-1.patch
>
>
> When a fork submits two transitions (A and B) and the submission of A fails,
> B keeps running because KillXCommand is never executed.
> In SignalXCommand.startForkedActions, when one transition fails to submit, a
> new ActionStartXCommand is created and failJob is invoked on it. failJob adds
> WorkflowNotificationXCommand and KillXCommand to
> {color:#ff0000}*commandQueue*{color}, which is only dispatched from the
> XCommand.call method. However, both commands end up in the
> {color:#ff0000}*commandQueue*{color} of the new ActionStartXCommand, whose
> call method is not invoked here, rather than in that of SignalXCommand, so
> KillXCommand is never executed.
> The relevant code is as follows:
>
> {code:java}
> public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked)
>         throws CommandException {
>     ......
>     for (Future<ActionExecutorContext> result : futures) {
>         ......
>         if (context.getJobStatus() != null
>                 && context.getJobStatus().equals(Job.Status.FAILED)) {
>             new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
>             ......
>         }
>         ......
>     }
> }
> {code}
>
> {code:java}
> public void failJob(ActionExecutor.Context context, WorkflowActionBean action)
>         throws CommandException {
>     WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
>     if (!handleUserRetry(context, action)) {
>         incrActionErrorCounter(action.getType(), "failed", 1);
>         LOG.warn("Failing Job due to failed action [{0}]", action.getName());
>         try {
>             workflow.getWorkflowInstance().fail(action.getName());
>             WorkflowInstance wfInstance = workflow.getWorkflowInstance();
>             ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
>             workflow.setWorkflowInstance(wfInstance);
>             workflow.setStatus(WorkflowJob.Status.FAILED);
>             action.setStatus(WorkflowAction.Status.FAILED);
>             action.resetPending();
>             queue(new WorkflowNotificationXCommand(workflow, action));
>             queue(new KillXCommand(workflow.getId()));
>             InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1,
>                     getInstrumentation());
>         }
>         catch (WorkflowException ex) {
>             throw new CommandException(ex);
>         }
>     }
> }
> {code}
>
> {code:java}
> public final T call() throws CommandException {
>     if (commandQueue != null) {
>         for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
>             LOG.debug("Queuing [{0}] commands with delay [{1}]ms",
>                     entry.getValue().size(), entry.getKey());
>             if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
>                 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full",
>                         entry.getValue().size(), entry.getKey());
>             }
>         }
>     }
> }
> {code}
>
>
>
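The mechanism described in the report can be reproduced with a small stand-alone model. This is only a sketch under stated assumptions: MiniCommand, Leaf, ActionStart, Signal and the adoptChildQueue flag are hypothetical names invented for illustration, not Oozie classes, and the "hand-over" variant shows one possible way to avoid the problem, not necessarily what the attached patches do.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the reported behaviour. All names are hypothetical.
class CommandQueueSketch {

    // Stand-in for the callable queue service: records what really gets dispatched.
    static final List<String> dispatched = new ArrayList<>();

    // Stand-in for XCommand: queue() only buffers, call() dispatches the buffer.
    static abstract class MiniCommand {
        final String name;
        final List<MiniCommand> commandQueue = new ArrayList<>();

        MiniCommand(String name) { this.name = name; }

        void queue(MiniCommand c) { commandQueue.add(c); }

        abstract void execute();

        final void call() {
            execute();
            // Only commands buffered on *this* instance are dispatched.
            for (MiniCommand c : commandQueue) {
                dispatched.add(c.name);
            }
            commandQueue.clear();
        }
    }

    // A command with no behaviour of its own, used for "notify" and "kill".
    static class Leaf extends MiniCommand {
        Leaf(String name) { super(name); }
        @Override void execute() { }
    }

    // Stand-in for ActionStartXCommand: failJob() buffers follow-up commands.
    static class ActionStart extends MiniCommand {
        ActionStart() { super("action-start"); }
        @Override void execute() { }
        void failJob() {
            queue(new Leaf("notify"));
            queue(new Leaf("kill"));
        }
    }

    // Stand-in for SignalXCommand: creates a helper and calls failJob() directly.
    static class Signal extends MiniCommand {
        final boolean adoptChildQueue;

        Signal(boolean adoptChildQueue) {
            super("signal");
            this.adoptChildQueue = adoptChildQueue;
        }

        @Override void execute() {
            ActionStart helper = new ActionStart();
            helper.failJob(); // helper.call() is never invoked, so its buffer is not dispatched
            if (adoptChildQueue) {
                // Hypothetical remedy: move the helper's buffered commands to this command.
                for (MiniCommand c : helper.commandQueue) {
                    queue(c);
                }
            }
        }
    }

    // Runs one Signal command and returns the names of the dispatched commands.
    static List<String> run(boolean adoptChildQueue) {
        dispatched.clear();
        new Signal(adoptChildQueue).call();
        return new ArrayList<>(dispatched);
    }

    public static void main(String[] args) {
        System.out.println("as reported:    " + run(false)); // prints "as reported:    []"
        System.out.println("with hand-over: " + run(true));  // prints "with hand-over: [notify, kill]"
    }
}
```

In the first run the "kill" command stays in the helper's buffer and is lost, which matches the symptom in the report. In the second run the outer command takes over the buffered commands, so its own call() dispatches them.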