Repository: falcon Updated Branches: refs/heads/master ae59db3c8 -> ecefdd079
FALCON-1723 Rerun with skip fail actions won't work in few cases (by Pavan Kolamuri) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccd536a8 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccd536a8 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccd536a8 Branch: refs/heads/master Commit: ccd536a8da34e9f2547c1faaf3f32987bb976098 Parents: 0084c35 Author: Pallavi Rao <[email protected]> Authored: Tue Jan 19 19:24:42 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Jan 19 19:24:42 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + docs/src/site/twiki/FalconCLI.twiki | 2 +- .../workflow/engine/OozieWorkflowEngine.java | 54 ++++++++++++++------ .../workflow/engine/FalconWorkflowEngine.java | 3 ++ 4 files changed, 45 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 43c7b81..f80e7a1 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -116,6 +116,8 @@ Proposed Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao) + FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava) FALCON-1715 IllegalStateException in MetadataMappingService when entity is scheduled via native scheduler (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index c62d56d..5395f12 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -241,7 +241,7 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -continue Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED. If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun -Additionally, you can also specify properties to override via a properties file. +Additionally, you can also specify properties to override via a properties file and this will be prioritized over force option in case of contradiction. Usage: $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties file>>] http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 72c029c..04f5e93 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -560,10 +560,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { - if (isForced != null && isForced) { - props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + if (isForced == null) { + isForced = false; } - return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles); + return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, false, isForced); } @Override @@ -642,8 +642,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return doJobAction(action, entity, start, end, props, lifeCycles, null); } - private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException { + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, + Properties props, List<LifeCycle> lifeCycles, + Boolean allAttempts, boolean isForced) throws FalconException { Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end, lifeCycles); List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS); List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS); @@ -674,7 +676,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { instance.sourceCluster = sourceCluster; if (action.equals(JobAction.STATUS) && Boolean.TRUE.equals(allAttempts)) { try { - performAction(cluster, action, coordinatorAction, props, instance); + performAction(cluster, action, coordinatorAction, props, instance, isForced); if (instance.getRunId() > 0) { instanceList = getAllInstances(cluster, coordinatorAction, nominalTimeStr); } else { @@ -692,7 +694,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } else { try { - performAction(cluster, action, coordinatorAction, props, instance); + performAction(cluster, action, coordinatorAction, props, instance, isForced); } catch (FalconException e) { LOG.warn("Unable to perform action {} on cluster", action, e); instance.status = WorkflowStatus.ERROR; @@ -711,6 +713,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return instancesResult; } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, + List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException { + return doJobAction(action, entity, start, end, props, lifeCycles, allAttempts, false); + } + private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles) throws FalconException { @@ -878,7 +887,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction, - Properties props, InstancesResult.Instance instance) throws FalconException { + Properties props, InstancesResult.Instance instance, boolean isForced) throws FalconException { WorkflowJob jobInfo = null; String status = coordinatorAction.getStatus().name(); if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) { @@ -925,7 +934,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { status = Status.RUNNING.name(); } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) { //wf re-run - reRun(cluster, jobInfo.getId(), props, false); + reRun(cluster, jobInfo.getId(), props, isForced); status = Status.RUNNING.name(); } break; @@ -1481,17 +1490,31 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { OozieClient client = OozieClientFactory.get(cluster); try { WorkflowJob jobInfo = client.getJobInfo(jobId); - Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); - if (props != null) { - jobprops.putAll(props); + if (props == null) { + props = new Properties(); } + //if user has set any of these oozie rerun properties then force rerun flag is ignored - if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) - && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { - jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + if (!props.containsKey(OozieClient.RERUN_FAIL_NODES) + && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) { + props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); } + + Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); + jobprops.putAll(props); + jobprops.remove(OozieClient.COORDINATOR_APP_PATH); jobprops.remove(OozieClient.BUNDLE_APP_PATH); + + // In case if both props exists one should be removed otherwise it will fail. + // This case will occur when user runs workflow with skip-nodes property and + // try to do force rerun or rerun with fail-nodes property. + if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) + && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { + LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES + + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES); + jobprops.remove(OozieClient.RERUN_SKIP_NODES); + } client.reRun(jobId, jobprops); assertStatus(cluster, jobId, Job.Status.RUNNING); LOG.info("Rerun job {} on cluster {}", jobId, cluster); @@ -1501,6 +1524,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException { String actualStatus = null; http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index efe9049..c9d6b86 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -371,6 +371,9 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { + if (isForced == null) { + isForced = false; + } return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced); }
