Repository: falcon
Updated Branches:
  refs/heads/master e270b5c70 -> a058cf2b4


FALCON-1835 [IMPROVEMENT] Falcon should do coord rerun rather than workflow 
rerun

…to ensure concurrency

Patch contains the following changes:
1. Upgrade to Oozie client 4.2 (from 4.1).
2. Remove workflow rerun completely and switch to coord rerun.
3. Introduce parentId in WorkflowExecutionContext to store the coord action 
of a given workflow. This is used during retry and late data rerun.
4. When parentId cannot be populated (notifications from the post-processing 
action do not contain it), the rerun method queries Oozie to retrieve the 
parent id.
5. The rerun method now returns the actual status after rerun, rather than a 
hardcoded “RUNNING”.
6. In 4.2, Oozie issues a YARN job kill to terminate any lingering executions 
of a previous run of a workflow, and it uses RMClientProxy to do so. Since RPC 
is not possible in local mode, LocalFalconRPCClientFactory was introduced to 
prevent RPC in local mode.

Author: Pallavi Rao <[email protected]>

Reviewers: Sandeep Samudrala <[email protected]>, Srikanth 
Sundarrajan <[email protected]>, Pavan Kolamuri 
<[email protected]>

Closes #48 from pallavi-rao/1835 and squashes the following commits:

0e0b317 [Pallavi Rao] FALCON-1835 Clearing old settings from config retrieved
0c0d2c4 [Pallavi Rao] FALCON-1835 Updated doc.
418ac16 [Pallavi Rao] FALCON-1835 Fixed checkstyle issue
96cd509 [Pallavi Rao] FALCON-1835 Updated rerun method to throw exception when 
both skip.nodes and failnodes are specified
8b0cb1f [Pallavi Rao] FALCON-1835 Updated rerun method to merge user props and 
job conf.
43df66d [Pallavi Rao] FALCON-1835 Falcon should do coord rerun rather than 
workflow rerun to ensure concurrency


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a058cf2b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a058cf2b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a058cf2b

Branch: refs/heads/master
Commit: a058cf2b4481769b1eaf96c4a6a45e5d76c1556f
Parents: e270b5c
Author: Pallavi Rao <[email protected]>
Authored: Wed Feb 24 17:25:23 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Wed Feb 24 17:25:23 2016 +0530

----------------------------------------------------------------------
 .../falcon/workflow/WorkflowExecutionArgs.java  |   2 +-
 .../workflow/WorkflowExecutionContext.java      |   4 +
 .../WorkflowJobEndNotificationService.java      |   5 -
 .../workflow/engine/AbstractWorkflowEngine.java |   3 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   2 +
 .../falcon/messaging/JMSMessageConsumer.java    |   3 +
 .../workflow/engine/OozieWorkflowEngine.java    | 111 ++++-----
 .../oozie/client/LocalProxyOozieClient.java     |  11 +
 pom.xml                                         |   2 +-
 .../apache/falcon/rerun/event/LaterunEvent.java |   4 +-
 .../apache/falcon/rerun/event/RerunEvent.java   |   8 +-
 .../falcon/rerun/event/RerunEventFactory.java   |   4 +-
 .../apache/falcon/rerun/event/RetryEvent.java   |   4 +-
 .../rerun/handler/AbstractRerunHandler.java     |   2 +-
 .../falcon/rerun/handler/LateRerunConsumer.java |  12 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   6 +-
 .../falcon/rerun/handler/RetryConsumer.java     |   9 +-
 .../falcon/rerun/handler/RetryHandler.java      |   6 +-
 .../apache/falcon/rerun/queue/ActiveMQTest.java |   2 +-
 .../falcon/rerun/queue/InMemoryQueueTest.java   |   6 +-
 .../workflow/engine/FalconWorkflowEngine.java   |   3 +-
 .../java/org/apache/falcon/unit/FalconUnit.java |   3 +
 .../unit/LocalFalconRPCClientFactory.java       | 241 +++++++++++++++++++
 unit/src/main/resources/yarn-site.xml           |  30 +++
 24 files changed, 398 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 3363e1f..2171092 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -55,6 +55,7 @@ public enum WorkflowExecutionArgs {
     STATUS("status", "status of the user workflow isnstance"),
     WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, 
ex:oozie", false),
     USER_SUBFLOW_ID("subflowId", "external id of user workflow", false),
+    PARENT_ID("parentId", "The parent of the current workflow, typically coord 
action", false),
 
     WF_START_TIME("workflowStartTime", "workflow start time", false),
     WF_END_TIME("workflowEndTime", "workflow end time", false),
@@ -89,7 +90,6 @@ public enum WorkflowExecutionArgs {
     CONTEXT_TYPE("contextType", "wf execution context type, pre or post 
processing", false),
     COUNTERS("counters", "store job counters", false);
 
-
     private final String name;
     private final String description;
     private final boolean isRequired;

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 5866369..9b1e1f4 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -295,6 +295,10 @@ public class WorkflowExecutionContext {
         return getValue(WorkflowExecutionArgs.WORKFLOW_ID);
     }
 
+    public String getWorkflowParentId() {
+        return getValue(WorkflowExecutionArgs.PARENT_ID);
+    }
+
     public String getUserSubflowId() {
         return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index faea25c..b692258 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -139,11 +139,6 @@ public class WorkflowJobEndNotificationService implements 
FalconService {
 
     public void notifyWait(WorkflowExecutionContext context) throws 
FalconException {
         // Wait notifications can only be from Oozie JMS notifications
-
-        if (!updateContextFromWFConf(context)) {
-            return;
-        }
-
         LOG.debug("Sending workflow wait notification to listeners with 
context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
 
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index b899a58..4d8402a 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -61,7 +61,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract String delete(Entity entity, String cluster) throws 
FalconException;
 
-    public abstract void reRun(String cluster, String wfId, Properties props, 
boolean isForced) throws FalconException;
+    public abstract String reRun(String cluster, String wfId, Properties 
props, boolean isForced)
+        throws FalconException;
 
     public abstract void dryRun(Entity entity, String clusterName, Boolean 
skipDryRun) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki 
b/docs/src/site/twiki/EntitySpecification.twiki
index b3d80e2..d08c3a3 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -919,6 +919,8 @@ Examples:
 </verbatim>
 The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential 
backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.
 
+*NOTE :* If user does a manual rerun with -force option (using the instance 
rerun API), then the runId will get reset and user might see more Falcon system 
retries than configured in the process definition.
+
 To enable retries for instances for feeds, user will have to set the following 
properties in runtime.properties
 <verbatim>
 falcon.recipe.retry.policy=periodic

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java 
b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index ccc2cfb..90bbdd3 100644
--- 
a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ 
b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -147,6 +147,9 @@ public class JMSMessageConsumer implements MessageListener, 
ExceptionListener {
                 wfProperties.put(WorkflowExecutionArgs.NOMINAL_TIME,
                         
getNominalTimeString(Long.parseLong(json.getString("nominalTime"))));
             }
+            if (!json.isNull("parentId")) {
+                wfProperties.put(WorkflowExecutionArgs.PARENT_ID, 
json.getString("parentId"));
+            }
             String appName = message.getStringProperty("appName");
             Pair<String, EntityType> entityTypePair = 
WorkflowNameBuilder.WorkflowName.getEntityNameAndType(appName);
             wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, 
entityTypePair.first);

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ebf23da..ab2dd5a 100644
--- 
a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ 
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -97,11 +97,9 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
                 WorkflowJob.Status.FAILED);
     private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = 
Arrays.asList(WorkflowJob.Status.RUNNING);
     private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = 
Arrays.asList(WorkflowJob.Status.SUSPENDED);
-    private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
-        Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, 
WorkflowJob.Status.SUCCEEDED);
     private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND =
-        Arrays.asList(CoordinatorAction.Status.TIMEDOUT, 
CoordinatorAction.Status.FAILED);
-
+        Arrays.asList(CoordinatorAction.Status.TIMEDOUT, 
CoordinatorAction.Status.FAILED,
+                CoordinatorAction.Status.KILLED, 
CoordinatorAction.Status.SUCCEEDED);
     private static final List<Job.Status> BUNDLE_ACTIVE_STATUS =
         Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, 
Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED,
             Job.Status.RUNNINGWITHERROR, Job.Status.PAUSED, Status.PREPPAUSED, 
Status.PAUSEDWITHERROR);
@@ -937,18 +935,11 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
             break;
 
         case RERUN:
-            if (jobInfo == null && 
COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) {
-                //Coord action re-run
-                reRunCoordAction(cluster, coordinatorAction);
-                status = Status.RUNNING.name();
-            } else if (jobInfo != null && 
WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
-                //wf re-run
-                reRun(cluster, jobInfo.getId(), props, isForced);
-                status = Status.RUNNING.name();
+            if (COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) {
+                status = reRunCoordAction(cluster, coordinatorAction, props, 
isForced).name();
             }
             break;
 
-
         case STATUS:
             if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
                 populateInstanceActions(cluster, jobInfo, instance);
@@ -974,30 +965,64 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
         }
     }
 
-    private void reRunCoordAction(String cluster, CoordinatorAction 
coordinatorAction) throws FalconException {
+    public CoordinatorAction.Status reRunCoordAction(String cluster, 
CoordinatorAction coordinatorAction,
+                                                      Properties props, 
boolean isForced) throws FalconException {
         try {
             OozieClient client = OozieClientFactory.get(cluster);
+            if (props == null) {
+                props = new Properties();
+            }
+            // In case if both props exists, throw an exception.
+            // This case will occur when user runs workflow with skip-nodes 
property and
+            // try to do force rerun or rerun with fail-nodes property.
+            if (props.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                String msg = "Both " + OozieClient.RERUN_SKIP_NODES + " and " 
+ OozieClient.RERUN_FAIL_NODES
+                        + " are present in workflow params for " + 
coordinatorAction.getExternalId();
+                LOG.error(msg);
+                throw new FalconException(msg);
+            }
+
+            //if user has set any of these oozie rerun properties then force 
rerun flag is ignored
+            if (props.containsKey(OozieClient.RERUN_FAIL_NODES)) {
+                isForced = false;
+            }
+            Properties jobprops;
+            // Get conf when workflow is launched.
+            if (coordinatorAction.getExternalId() != null) {
+                WorkflowJob jobInfo = 
client.getJobInfo(coordinatorAction.getExternalId());
+
+                jobprops = OozieUtils.toProperties(jobInfo.getConf());
+                // Clear the rerun properties from existing configuration
+                jobprops.remove(OozieClient.RERUN_FAIL_NODES);
+                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
+                jobprops.putAll(props);
+                jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+            } else {
+                jobprops = props;
+            }
+
             client.reRunCoord(coordinatorAction.getJobId(), 
RestConstants.JOB_COORD_SCOPE_ACTION,
-                Integer.toString(coordinatorAction.getActionNumber()), true, 
true);
-            assertCoordActionStatus(cluster, coordinatorAction.getId(),
-                org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
-                org.apache.oozie.client.CoordinatorAction.Status.WAITING,
-                org.apache.oozie.client.CoordinatorAction.Status.READY);
+                    Integer.toString(coordinatorAction.getActionNumber()), 
true, true, !isForced, jobprops);
             LOG.info("Rerun job {} on cluster {}", coordinatorAction.getId(), 
cluster);
+            return assertCoordActionStatus(cluster, coordinatorAction.getId(),
+                    org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
+                    org.apache.oozie.client.CoordinatorAction.Status.WAITING,
+                    org.apache.oozie.client.CoordinatorAction.Status.READY);
         } catch (Exception e) {
             LOG.error("Unable to rerun workflows", e);
             throw new FalconException(e);
         }
     }
 
-    private void assertCoordActionStatus(String cluster, String coordActionId,
+    private CoordinatorAction.Status assertCoordActionStatus(String cluster, 
String coordActionId,
         org.apache.oozie.client.CoordinatorAction.Status... statuses) throws 
FalconException, OozieClientException {
         OozieClient client = OozieClientFactory.get(cluster);
         CoordinatorAction actualStatus = 
client.getCoordActionInfo(coordActionId);
         for (int counter = 0; counter < 3; counter++) {
             for (org.apache.oozie.client.CoordinatorAction.Status status : 
statuses) {
                 if (status.equals(actualStatus.getStatus())) {
-                    return;
+                    return status;
                 }
             }
             try {
@@ -1494,48 +1519,26 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
     }
 
     @Override
-    public void reRun(String cluster, String jobId, Properties props, boolean 
isForced) throws FalconException {
-
+    public String reRun(String cluster, String id, Properties props, boolean 
isForced) throws FalconException {
         OozieClient client = OozieClientFactory.get(cluster);
+        String actionId = id;
         try {
-            WorkflowJob jobInfo = client.getJobInfo(jobId);
-            if (props == null) {
-                props = new Properties();
+            // If a workflow job is supplied, get its parent coord action
+            if (id.endsWith("-W")) {
+                actionId = client.getJobInfo(id).getParentId();
             }
-
-            //if user has set any of these oozie rerun properties then force 
rerun flag is ignored
-            if (!props.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                props.put(OozieClient.RERUN_FAIL_NODES, 
String.valueOf(!isForced));
+            if (StringUtils.isBlank(actionId) || !actionId.contains("-C@")) {
+                throw new FalconException("coord action id supplied for rerun, 
" + actionId + ", is not valid.");
             }
-
-            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
-            jobprops.putAll(props);
-
-            jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
-            jobprops.remove(OozieClient.BUNDLE_APP_PATH);
-
-            // In case if both props exists one should be removed otherwise it 
will fail.
-            // This case will occur when user runs workflow with skip-nodes 
property and
-            // try to do force rerun or rerun with fail-nodes property.
-            if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + 
OozieClient.RERUN_FAIL_NODES
-                        + " are present in workflow params removing" + 
OozieClient.RERUN_SKIP_NODES);
-                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
-            }
-            client.reRun(jobId, jobprops);
-            assertStatus(cluster, jobId, Job.Status.RUNNING);
-            LOG.info("Rerun job {} on cluster {}", jobId, cluster);
+            return reRunCoordAction(cluster, 
client.getCoordActionInfo(actionId), props, isForced).name();
         } catch (Exception e) {
-            LOG.error("Unable to rerun workflows", e);
+            LOG.error("Unable to rerun action " + actionId, e);
             throw new FalconException(e);
         }
     }
 
 
-    private void assertStatus(String cluster, String jobId, Status... 
statuses) throws FalconException {
-
+    private String assertStatus(String cluster, String jobId, Status... 
statuses) throws FalconException {
         String actualStatus = null;
         int retryCount;
         String retry = 
RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30");
@@ -1554,7 +1557,7 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine {
                     //ignore
                 }
             } else {
-                return;
+                return actualStatus;
             }
         }
         throw new FalconException("For Job" + jobId + ", actual statuses: " + 
actualStatus + ", expected statuses: "

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java 
b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 7bf5c37..3bdc0df 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -90,6 +90,7 @@ public class LocalProxyOozieClient extends OozieClient {
     }
 
     public String run(Properties conf) throws OozieClientException {
+        conf.setProperty("oozie.child.mapreduce.job.tags", "");
         if (conf.getProperty("oozie.wf.application.path") != null) {
             return getLocalOozieClient().run(conf);
         } else if (conf.getProperty("oozie.coord.application.path") != null) {
@@ -213,5 +214,15 @@ public class LocalProxyOozieClient extends OozieClient {
         throw new IllegalStateException("Job logs not supported");
     }
 
+    @Override
+    public void validateWSVersion() throws OozieClientException {
+        // Do nothing as this is local oozie.
+    }
+
+    @Override
+    public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, 
String scope, boolean refresh,
+            boolean noCleanup, boolean failed, Properties props) throws 
OozieClientException {
+        return getClient(jobId).reRunCoord(jobId, rerunType, scope, refresh, 
noCleanup, failed, props);
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 27d2fc2..271b477 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
         <include.prism>true</include.prism>
 
         <slf4j.version>1.7.5</slf4j.version>
-        <oozie.version>4.1.0</oozie.version>
+        <oozie.version>4.2.0</oozie.version>
         <oozie.buildversion>${oozie.version}-falcon</oozie.buildversion>
         <oozie.forcebuild>false</oozie.forcebuild>
         <activemq.version>5.12.0</activemq.version>

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java 
b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 2b52762..1fbdbcc 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -23,10 +23,10 @@ package org.apache.falcon.rerun.event;
 public class LaterunEvent extends RerunEvent {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
+    public LaterunEvent(String clusterName, String wfId, String parentId, long 
msgInsertTime,
                         long delay, String entityType, String entityName,
                         String instance, int runId, String workflowUser) {
-        super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+        super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, 
entityName,
                 instance, runId, workflowUser);
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java 
b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 254f285..b917421 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -38,6 +38,7 @@ public class RerunEvent implements Delayed {
 
     protected String clusterName;
     protected String wfId;
+    protected String parentId;
     protected String workflowUser;
     protected long msgInsertTime;
     protected long delayInMilliSec;
@@ -47,10 +48,11 @@ public class RerunEvent implements Delayed {
     protected int runId;
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public RerunEvent(String clusterName, String wfId, long msgInsertTime, 
long delay,
+    public RerunEvent(String clusterName, String wfId, String parentId, long 
msgInsertTime, long delay,
                       String entityType, String entityName, String instance, 
int runId, String workflowUser) {
         this.clusterName = clusterName;
         this.wfId = wfId;
+        this.parentId = parentId;
         this.workflowUser = workflowUser;
         this.msgInsertTime = msgInsertTime;
         this.delayInMilliSec = delay;
@@ -69,6 +71,10 @@ public class RerunEvent implements Delayed {
         return wfId;
     }
 
+    public String getParentId() {
+        return parentId;
+    }
+
     public String getWorkflowUser() {
         return workflowUser;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java 
b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index c2a8fe2..c97d259 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -42,7 +42,7 @@ public class RerunEventFactory<T extends RerunEvent> {
     @SuppressWarnings("unchecked")
     private T lateEventFromString(String line) {
         Map<String, String> map = getMap(line);
-        return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
+        return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"), 
map.get("parentId"),
                 Long.parseLong(map.get("msgInsertTime")), 
Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), 
map.get("instance"),
                 Integer.parseInt(map.get("runId")), map.get("workflowUser"));
@@ -51,7 +51,7 @@ public class RerunEventFactory<T extends RerunEvent> {
     @SuppressWarnings("unchecked")
     public T retryEventFromString(String line) {
         Map<String, String> map = getMap(line);
-        return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
+        return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"), 
map.get("parentId"),
                 Long.parseLong(map.get("msgInsertTime")), 
Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), 
map.get("instance"),
                 Integer.parseInt(map.get("runId")), 
Integer.parseInt(map.get("attempts")),

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java 
b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index b5312a6..457445f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -26,10 +26,10 @@ public class RetryEvent extends RerunEvent {
     private int failRetryCount;
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public RetryEvent(String clusterName, String wfId, long msgInsertTime,
+    public RetryEvent(String clusterName, String wfId, String parentId, long 
msgInsertTime,
                       long delay, String entityType, String entityName, String 
instance,
                       int runId, int attempts, int failRetryCount, String 
workflowUser) {
-        super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+        super(clusterName, wfId, parentId, msgInsertTime, delay, entityType, 
entityName,
                 instance, runId, workflowUser);
         this.attempts = attempts;
         this.failRetryCount = failRetryCount;

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 3ec3617..700095e 100644
--- 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -57,7 +57,7 @@ public abstract class AbstractRerunHandler<T extends 
RerunEvent, M extends Delay
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public abstract void handleRerun(String clusterName, String entityType,
                                      String entityName, String nominalTime, 
String runId,
-                                     String wfId, String workflowUser, long 
msgReceivedTime);
+                                     String wfId, String parentId, String 
workflowUser, long msgReceivedTime);
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public AbstractWorkflowEngine getWfEngine(String entityType, String 
entityName) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index fa0d6ae..e79f122 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
@@ -72,13 +73,18 @@ public class LateRerunConsumer<T extends 
LateRerunHandler<DelayedQueue<LaterunEv
                         message.getWfId(), SchemaHelper.formatDateUTC(new 
Date()));
                 handler.handleRerun(clusterName, message.getEntityType(), 
message.getEntityName(),
                         message.getInstance(), 
Integer.toString(message.getRunId()),
-                        message.getWfId(), message.getWorkflowUser(), 
System.currentTimeMillis());
+                        message.getWfId(), message.getParentId(),
+                        message.getWorkflowUser(), System.currentTimeMillis());
                 return;
             }
 
             LOG.info("Late changes detected in the following feeds: {}", 
detectLate);
-
-            handler.getWfEngine(entityType, 
entityName).reRun(message.getClusterName(), message.getWfId(), null, true);
+            // Use coord action id for rerun if available
+            String id = message.getParentId();
+            if (StringUtils.isBlank(id)) {
+                id = message.getWfId();
+            }
+            handler.getWfEngine(entityType, 
entityName).reRun(message.getClusterName(), id, null, false);
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 0be6252..02ab792 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -58,7 +58,7 @@ public class LateRerunHandler<M extends 
DelayedQueue<LaterunEvent>> extends
     @Override
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String cluster, String entityType, String 
entityName, String nominalTime,
-                            String runId, String wfId, String workflowUser, 
long msgReceivedTime) {
+                            String runId, String wfId, String parentId, String 
workflowUser, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
             int intRunId = Integer.parseInt(runId);
@@ -88,7 +88,7 @@ public class LateRerunHandler<M extends 
DelayedQueue<LaterunEvent>> extends
 
             LOG.debug("Scheduling the late rerun for entity instance: {} ({}): 
{} And WorkflowId: {}",
                     entityType, entityName, nominalTime, wfId);
-            LaterunEvent event = new LaterunEvent(cluster, wfId, 
msgInsertTime.getTime(),
+            LaterunEvent event = new LaterunEvent(cluster, wfId, parentId, 
msgInsertTime.getTime(),
                     wait, entityType, entityName, nominalTime, intRunId, 
workflowUser);
             offerToQueue(event);
         } catch (Exception e) {
@@ -232,7 +232,7 @@ public class LateRerunHandler<M extends 
DelayedQueue<LaterunEvent>> extends
                 && EntityUtil.getLateProcess(entity) != null) {
             handleRerun(context.getClusterName(), context.getEntityType(),
                     context.getEntityName(), context.getNominalTimeAsISO8601(),
-                    context.getWorkflowRunIdString(), context.getWorkflowId(),
+                    context.getWorkflowRunIdString(), context.getWorkflowId(), 
context.getWorkflowParentId(),
                     context.getWorkflowUser(), 
context.getExecutionCompletionTime());
         } else {
             LOG.info("Late date handling not applicable for entityType: " + 
context.getEntityType()

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 9b46713..836a172 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -53,11 +54,17 @@ public class RetryConsumer<T extends 
RetryHandler<DelayedQueue<RetryEvent>>>
                             + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), 
message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new 
Date(System.currentTimeMillis())));
-            handler.getWfEngine(entityType, 
entityName).reRun(message.getClusterName(), message.getWfId(), null, false);
+            // Use coord action id for rerun if available
+            String id = message.getParentId();
+            if (StringUtils.isBlank(id)) {
+                id = message.getWfId();
+            }
+            handler.getWfEngine(entityType, 
entityName).reRun(message.getClusterName(), id, null, false);
         } catch (Exception e) {
             if (e instanceof EntityNotRegisteredException) {
                 LOG.warn("Entity {} of type {} doesn't exist in config store. 
So retry "
                         + "cannot be done for workflow ", entityName, 
entityType, message.getWfId());
+
                 return;
             }
             int maxFailRetryCount = Integer.parseInt(StartupProperties.get()

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java 
b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index fac32b3..48d5ce7 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -44,7 +44,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> 
extends
     @Override
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String clusterName, String entityType, String 
entityName, String nominalTime,
-                            String runId, String wfId, String workflowUser, 
long msgReceivedTime) {
+                            String runId, String wfId, String parentId, String 
workflowUser, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
             Retry retry = getRetry(entity);
@@ -63,7 +63,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> 
extends
             if (attempts > intRunId) {
                 AbstractRerunPolicy rerunPolicy = 
RerunPolicyFactory.getRetryPolicy(policy);
                 long delayTime = rerunPolicy.getDelay(delay, 
Integer.parseInt(runId));
-                RetryEvent event = new RetryEvent(clusterName, wfId,
+                RetryEvent event = new RetryEvent(clusterName, wfId, parentId,
                         msgReceivedTime, delayTime, entityType, entityName,
                         nominalTime, intRunId, attempts, 0, workflowUser);
                 offerToQueue(event);
@@ -122,7 +122,7 @@ public class RetryHandler<M extends 
DelayedQueue<RetryEvent>> extends
         }
         handleRerun(context.getClusterName(), context.getEntityType(),
                 context.getEntityName(), context.getNominalTimeAsISO8601(),
-                context.getWorkflowRunIdString(), context.getWorkflowId(),
+                context.getWorkflowRunIdString(), context.getWorkflowId(), 
context.getWorkflowParentId(),
                 context.getWorkflowUser(), 
context.getExecutionCompletionTime());
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java 
b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 3c53833..dba4610 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -50,7 +50,7 @@ public class ActiveMQTest {
                 BROKER_URL, DESTINATION);
         activeMQueue.init();
 
-        RerunEvent event = new LaterunEvent("clusterName", "wfId",
+        RerunEvent event = new LaterunEvent("clusterName", "wfId", "parentId",
                 System.currentTimeMillis(), 60 * 1000, "entityType",
                 "entityName", "instance", 0, FalconTestUtil.TEST_USER_1);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git 
a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java 
b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
index 4b179d3..cdaf203 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -45,7 +45,7 @@ public class InMemoryQueueTest {
             Thread.sleep(30);
             long time = System.currentTimeMillis();
             int delay = ((5 - index) / 2) * 50;
-            MyEvent event = new MyEvent("someCluster", Integer.toString(index),
+            MyEvent event = new MyEvent("someCluster", 
Integer.toString(index), "parent",
                     time, delay, "someType", "someName", "someInstance", 0, 
FalconTestUtil.TEST_USER_1);
             queue.offer(event);
             boolean inserted = false;
@@ -72,10 +72,10 @@ public class InMemoryQueueTest {
     private class MyEvent extends RerunEvent {
 
         //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
-        public MyEvent(String clusterName, String wfId,
+        public MyEvent(String clusterName, String wfId, String parentId,
                        long msgInsertTime, long delay, String entityType,
                        String entityName, String instance, int runId, String 
workflowUser) {
-            super(clusterName, wfId, msgInsertTime, delay,
+            super(clusterName, wfId, parentId, msgInsertTime, delay,
                     entityType, entityName, instance, runId, workflowUser);
         }
         //RESUME CHECKSTYLE CHECK VisibilityModifierCheck

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
 
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 7ce2420..77cb2fa 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -516,11 +516,12 @@ public class FalconWorkflowEngine extends 
AbstractWorkflowEngine {
     }
 
     @Override
-    public void reRun(String cluster, String jobId, Properties props, boolean 
isForced) throws FalconException {
+    public String reRun(String cluster, String jobId, Properties props, 
boolean isForced) throws FalconException {
         InstanceState instanceState = STATE_STORE.getExecutionInstance(jobId);
         ExecutionInstance instance = instanceState.getInstance();
         EntityExecutor executor = 
EXECUTION_SERVICE.getEntityExecutor(instance.getEntity(), cluster);
         executor.rerun(instance, props, isForced);
+        return 
DAGEngineFactory.getDAGEngine(cluster).info(jobId).getStatus().name();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java 
b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
index eebfa2e..e762b31 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.oozie.action.hadoop.LauncherMapper;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XConfiguration;
@@ -162,6 +163,8 @@ public final class FalconUnit {
     private static void cleanUpOozie() throws IOException, FalconException {
         LocalOozie.stop();
         FileUtils.deleteDirectory(new File(OOZIE_HOME_DIR));
+        // Need to explicitly clean this as Oozie Launcher leaves this behind.
+        FileUtils.deleteQuietly(new File(LauncherMapper.PROPAGATION_CONF_XML));
         resetSystemProperties();
         System.setSecurityManager(null);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
----------------------------------------------------------------------
diff --git 
a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java 
b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
new file mode 100644
index 0000000..3070689
--- /dev/null
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.unit;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RpcClientFactory;
+
+/**
+ * A dummy implementation of RpcClientFactory that does not do RPC.
+ * This is required as OozieClient tries to connect to RM via RPC to kill jobs, 
which fails in local mode.
+ */
+public final class LocalFalconRPCClientFactory implements RpcClientFactory {
+
+    private static LocalFalconRPCClientFactory self = new 
LocalFalconRPCClientFactory();
+
+    @Override
+    public Object getClient(Class<?> aClass, long l, InetSocketAddress 
inetSocketAddress, Configuration configuration) {
+        return new LocalFalconApplicationClientProtocolImpl();
+    }
+
+    public static LocalFalconRPCClientFactory get() {
+        return self;
+    }
+
+    private LocalFalconRPCClientFactory() {
+    }
+
+
+    @Override
+    public void stopClient(Object o) {
+
+    }
+
+    /**
+     * Dummy implementation of ApplicationClientProtocol that returns an empty 
list of applications.
+     */
+    public static class LocalFalconApplicationClientProtocolImpl implements 
ApplicationClientProtocol {
+
+        public LocalFalconApplicationClientProtocolImpl() {
+
+        }
+
+        @Override
+        public GetNewApplicationResponse 
getNewApplication(GetNewApplicationRequest getNewApplicationRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public SubmitApplicationResponse 
submitApplication(SubmitApplicationRequest submitApplicationRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public KillApplicationResponse 
forceKillApplication(KillApplicationRequest killApplicationRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetClusterMetricsResponse 
getClusterMetrics(GetClusterMetricsRequest getClusterMetricsRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest 
getClusterNodesRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest 
getQueueInfoRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetQueueUserAclsInfoResponse 
getQueueUserAcls(GetQueueUserAclsInfoRequest getQueueUserAclsInfoRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+                moveApplicationAcrossQueuesRequest) throws YarnException, 
IOException {
+            return null;
+        }
+
+        @Override
+        public ReservationSubmissionResponse 
submitReservation(ReservationSubmissionRequest
+                reservationSubmissionRequest) throws YarnException, 
IOException {
+            return null;
+        }
+
+        @Override
+        public ReservationUpdateResponse 
updateReservation(ReservationUpdateRequest reservationUpdateRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public ReservationDeleteResponse 
deleteReservation(ReservationDeleteRequest reservationDeleteRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetNodesToLabelsResponse 
getNodeToLabels(GetNodesToLabelsRequest getNodesToLabelsRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetClusterNodeLabelsResponse 
getClusterNodeLabels(GetClusterNodeLabelsRequest
+                getClusterNodeLabelsRequest) throws YarnException, IOException 
{
+            return null;
+        }
+
+        @Override
+        public GetApplicationReportResponse 
getApplicationReport(GetApplicationReportRequest
+                getApplicationReportRequest) throws YarnException, IOException 
{
+            return null;
+        }
+
+        @Override
+        public GetApplicationsResponse getApplications(GetApplicationsRequest 
getApplicationsRequest)
+            throws YarnException, IOException {
+            return GetApplicationsResponse.newInstance(new 
ArrayList<ApplicationReport>());
+        }
+
+        @Override
+        public GetApplicationAttemptReportResponse 
getApplicationAttemptReport(GetApplicationAttemptReportRequest
+                getApplicationAttemptReportRequest) throws YarnException, 
IOException {
+            return null;
+        }
+
+        @Override
+        public GetApplicationAttemptsResponse 
getApplicationAttempts(GetApplicationAttemptsRequest
+                getApplicationAttemptsRequest) throws YarnException, 
IOException {
+            return null;
+        }
+
+        @Override
+        public GetContainerReportResponse 
getContainerReport(GetContainerReportRequest getContainerReportRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetContainersResponse getContainers(GetContainersRequest 
getContainersRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public GetDelegationTokenResponse 
getDelegationToken(GetDelegationTokenRequest getDelegationTokenRequest)
+            throws YarnException, IOException {
+            return null;
+        }
+
+        @Override
+        public RenewDelegationTokenResponse 
renewDelegationToken(RenewDelegationTokenRequest
+                renewDelegationTokenRequest) throws YarnException, IOException 
{
+            return null;
+        }
+
+        @Override
+        public CancelDelegationTokenResponse 
cancelDelegationToken(CancelDelegationTokenRequest
+                cancelDelegationTokenRequest) throws YarnException, 
IOException {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/a058cf2b/unit/src/main/resources/yarn-site.xml
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/yarn-site.xml 
b/unit/src/main/resources/yarn-site.xml
new file mode 100644
index 0000000..ab89bde
--- /dev/null
+++ b/unit/src/main/resources/yarn-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+    <property>
+        <name>yarn.ipc.client.factory.class</name>
+        <value>org.apache.falcon.unit.LocalFalconRPCClientFactory</value>
+    </property>
+
+</configuration>
\ No newline at end of file

Reply via email to