Repository: falcon Updated Branches: refs/heads/master 609fc5bc1 -> 0da207404
FALCON-2060 Retry does not happen if instance timed out. These got broken when we moved from wf rerun to coord rerun. Author: Pallavi Rao <[email protected]> Reviewers: @peeyushb, @sandeepSamudrala Closes #228 from pallavi-rao/2060 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0da20740 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0da20740 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0da20740 Branch: refs/heads/master Commit: 0da2074040b95c64b49ab6fb65e57644bb022e55 Parents: 609fc5b Author: Pallavi Rao <[email protected]> Authored: Fri Jul 15 14:26:19 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Jul 15 14:26:19 2016 +0530 ---------------------------------------------------------------------- .../falcon/workflow/WorkflowExecutionContext.java | 4 ++-- .../workflow/WorkflowJobEndNotificationService.java | 4 +++- .../falcon/workflow/engine/OozieWorkflowEngine.java | 2 ++ .../org/apache/falcon/rerun/handler/RetryConsumer.java | 6 +++--- .../org/apache/falcon/rerun/handler/RetryHandler.java | 12 ++++++------ 5 files changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 9b1e1f4..9b011b8 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -304,11 +304,11 @@ public class WorkflowExecutionContext { } public int getWorkflowRunId() { - return 
Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)); + return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID, "0")); } public String getWorkflowRunIdString() { - return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID))); + return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID, "0"))); } public String getWorkflowUser() { http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java index fbd1e3f..6d1332e 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java @@ -155,6 +155,7 @@ public class WorkflowJobEndNotificationService implements FalconService { private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException { Properties wfProps = contextMap.get(context.getWorkflowId()); if (wfProps == null) { + wfProps = new Properties(); Entity entity = null; try { entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName()); @@ -166,11 +167,12 @@ public class WorkflowJobEndNotificationService implements FalconService { return false; } for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + wfProps.setProperty(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster); try { InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity) .getJobDetails(cluster, context.getWorkflowId()).getInstances(); if (instances != null && instances.length > 0) { - wfProps = getWFProps(instances[0].getWfParams()); + wfProps.putAll(getWFProps(instances[0].getWfParams())); // 
Required by RetryService. But, is not part of conf. wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(), Integer.toString(instances[0].getRunId())); http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index ecbe7ee..9a09f18 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -1603,6 +1603,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } else if (jobId.endsWith("-B")) { BundleJob bundle = client.getBundleJobInfo(jobId); return bundle.getStatus().name(); + } else if (jobId.contains("-C@")) { + return client.getCoordActionInfo(jobId).getStatus().name(); } throw new IllegalArgumentException("Unhandled jobs id: " + jobId); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java index 4c763c2..3cad362 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java @@ -59,9 +59,9 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>> (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(), message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis()))); // Use coord action id for rerun if 
available - String id = message.getParentId(); - if (StringUtils.isBlank(id)) { - id = message.getWfId(); + String id = message.getWfId(); + if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) { + id = message.getParentId(); } handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/0da20740/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java index c691922..b8adeef 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java @@ -110,17 +110,17 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends @Override public void onFailure(WorkflowExecutionContext context) throws FalconException { - // Re-run does not make sense when killed by user. - if (context.isWorkflowKilledManually()) { - LOG.debug("Workflow: {} Instance: {} Entity: {}, killed manually by user. Will not retry.", - context.getWorkflowId(), context.getNominalTimeAsISO8601(), context.getEntityName()); - return; - } else if (context.hasWorkflowTimedOut()) { + if (context.hasWorkflowTimedOut()) { Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName()); Retry retry = getRetry(entity); if (!retry.isOnTimeout()) { return; } + // Re-run does not make sense when killed by user. + } else if (context.isWorkflowKilledManually()) { + LOG.debug("Workflow: {} Instance: {} Entity: {}, killed manually by user. 
Will not retry.", + context.getWorkflowId(), context.getNominalTimeAsISO8601(), context.getEntityName()); + return; } handleRerun(context.getClusterName(), context.getEntityType(), context.getEntityName(), context.getNominalTimeAsISO8601(),
