Repository: falcon
Updated Branches:
  refs/heads/0.9 8dee7c9ea -> cc41b2070
FALCON-1723 Rerun with skip fail actions won't work in few cases (By Pavan Kolamuri) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a4fd2c1f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a4fd2c1f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a4fd2c1f Branch: refs/heads/0.9 Commit: a4fd2c1fae4397fd1ef57660ce060f94654ea43c Parents: 14e209d Author: Pallavi Rao <[email protected]> Authored: Tue Jan 19 21:45:55 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Jan 19 21:45:55 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + docs/src/site/twiki/FalconCLI.twiki | 2 +- .../workflow/engine/OozieWorkflowEngine.java | 51 +++++++++++++++----- .../engine/OozieWorkflowEngine.java.rej | 32 ++++++++++++ .../workflow/engine/FalconWorkflowEngine.java | 3 ++ 5 files changed, 76 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a200b9c..1916c9a 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -113,6 +113,8 @@ Proposed Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao) + FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava) FALCON-1715 IllegalStateException in MetadataMappingService when entity is scheduled via native scheduler (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 9f91143..8beb473 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ 
b/docs/src/site/twiki/FalconCLI.twiki @@ -238,7 +238,7 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -continue Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED. If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun -Additionally, you can also specify properties to override via a properties file. +Additionally, you can also specify properties to override via a properties file and this will be prioritized over force option in case of contradiction. Usage: $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties file>>] http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index e3f0831..0b5346b 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -559,10 +559,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { - if (isForced != null && isForced) { - props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + if (isForced == null) { + isForced = false; } - return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles); + return 
doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced); } @Override @@ -628,8 +628,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, - Properties props, List<LifeCycle> lifeCycles) throws FalconException { + Properties props, List<LifeCycle> lifeCycles, + boolean isForced) throws FalconException { Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end, lifeCycles); List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS); List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS); @@ -659,7 +662,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { new InstancesResult.Instance(cluster, nominalTimeStr, null); instance.sourceCluster = sourceCluster; try { - performAction(cluster, action, coordinatorAction, props, instance); + performAction(cluster, action, coordinatorAction, props, instance, isForced); } catch (FalconException e) { LOG.warn("Unable to perform action {} on cluster", action, e); instance.status = WorkflowStatus.ERROR; @@ -678,6 +681,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return instancesResult; } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, + List<LifeCycle> lifeCycles) throws FalconException { + return doJobAction(action, entity, start, end, props, lifeCycles, false); + } + private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles) throws FalconException { @@ -821,7 +831,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void performAction(String cluster, JobAction action, CoordinatorAction 
coordinatorAction, - Properties props, InstancesResult.Instance instance) throws FalconException { + Properties props, InstancesResult.Instance instance, boolean isForced) throws FalconException { WorkflowJob jobInfo = null; String status = coordinatorAction.getStatus().name(); if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) { @@ -867,7 +877,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { status = Status.RUNNING.name(); } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) { //wf re-run - reRun(cluster, jobInfo.getId(), props, false); + reRun(cluster, jobInfo.getId(), props, isForced); status = Status.RUNNING.name(); } break; @@ -1423,17 +1433,31 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { OozieClient client = OozieClientFactory.get(cluster); try { WorkflowJob jobInfo = client.getJobInfo(jobId); - Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); - if (props != null) { - jobprops.putAll(props); + if (props == null) { + props = new Properties(); } + //if user has set any of these oozie rerun properties then force rerun flag is ignored - if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) - && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { - jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + if (!props.containsKey(OozieClient.RERUN_FAIL_NODES) + && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) { + props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); } + + Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); + jobprops.putAll(props); + jobprops.remove(OozieClient.COORDINATOR_APP_PATH); jobprops.remove(OozieClient.BUNDLE_APP_PATH); + + // In case if both props exists one should be removed otherwise it will fail. + // This case will occur when user runs workflow with skip-nodes property and + // try to do force rerun or rerun with fail-nodes property. 
+ if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) + && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { + LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES + + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES); + jobprops.remove(OozieClient.RERUN_SKIP_NODES); + } client.reRun(jobId, jobprops); assertStatus(cluster, jobId, Job.Status.RUNNING); LOG.info("Rerun job {} on cluster {}", jobId, cluster); @@ -1443,6 +1467,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException { String actualStatus = null; http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej new file mode 100644 index 0000000..4c08ae3 --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java.rej @@ -0,0 +1,32 @@ +diff a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java (rejected hunks) +@@ -637,8 +637,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { + return doJobAction(action, entity, start, end, props, lifeCycles, null); + } + +- private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, +- List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException { ++ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck ++ private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, ++ Properties props, List<LifeCycle> lifeCycles, ++ 
Boolean allAttempts, boolean isForced) throws FalconException { + Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end, lifeCycles); + List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS); + List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS); +@@ -669,7 +671,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { + instance.sourceCluster = sourceCluster; + if (action.equals(JobAction.STATUS) && Boolean.TRUE.equals(allAttempts)) { + try { +- performAction(cluster, action, coordinatorAction, props, instance); ++ performAction(cluster, action, coordinatorAction, props, instance, isForced); + if (instance.getRunId() > 0) { + instanceList = getAllInstances(cluster, coordinatorAction, nominalTimeStr); + } else { +@@ -687,7 +689,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { + } + } else { + try { +- performAction(cluster, action, coordinatorAction, props, instance); ++ performAction(cluster, action, coordinatorAction, props, instance, isForced); + } catch (FalconException e) { + LOG.warn("Unable to perform action {} on cluster", action, e); + instance.status = WorkflowStatus.ERROR; http://git-wip-us.apache.org/repos/asf/falcon/blob/a4fd2c1f/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index b7379d4..eb39ec0 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -371,6 +371,9 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult reRunInstances(Entity entity, 
Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { + if (isForced == null) { + isForced = false; + } return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced); }
