Repository: falcon
Updated Branches:
  refs/heads/master 6b097cb26 -> 9ec4c23a1


FALCON-1847 Execution order not honored when instances are suspended/resumed

This change makes the following modifications:

1. Instances being acted on are sorted in descending order only for the STATUS
and PARAMS actions. For all other actions (e.g. suspend/resume), older instances
are processed first.
2. Instance status is now read from the state store (DB); only additional details
(such as workflow actions) are obtained from Oozie.
3. The job completion service now also listens to suspend notifications, so that
subsequent instances can be scheduled.
4. Resume no longer resumes the instance immediately. Instead, the instance is
marked for resume and is resumed by the scheduler, in order to honor the
concurrency limit.

Author: Pallavi Rao <[email protected]>

Reviewers: Sandeep Samudrala <[email protected]>

Closes #64 from pallavi-rao/1847 and squashes the following commits:

069c223 [Pallavi Rao] FALCON-1847 Fixing the IT
9cd1249 [Pallavi Rao] FALCON-1847 Execution order not honored when instances 
are suspended/resumed


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9ec4c23a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9ec4c23a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9ec4c23a

Branch: refs/heads/master
Commit: 9ec4c23a1ff8a5ed1f202fc4186a920d0975e5fe
Parents: 6b097cb
Author: Pallavi Rao <[email protected]>
Authored: Mon Mar 14 17:36:08 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Mon Mar 14 17:36:08 2016 +0530

----------------------------------------------------------------------
 .../execution/ProcessExecutionInstance.java     |  7 ++++++-
 .../falcon/execution/ProcessExecutor.java       |  5 +----
 .../service/impl/JobCompletionService.java      |  2 +-
 .../service/impl/SchedulerService.java          | 11 +++++++++-
 .../workflow/engine/FalconWorkflowEngine.java   | 12 +++++------
 .../execution/FalconExecutionServiceTest.java   |  4 +++-
 .../apache/falcon/execution/MockDAGEngine.java  | 11 ++++++++++
 .../service/SchedulerServiceTest.java           | 21 ++++++++++++++++++++
 .../InstanceSchedulerManagerJerseyIT.java       |  5 ++++-
 9 files changed, 63 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
 
b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 8f026b7..2d666c3 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -41,6 +41,7 @@ import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.FalconWorkflowEngine;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Properties;
 
 
 /**
@@ -269,7 +271,10 @@ public class ProcessExecutionInstance extends 
ExecutionInstance {
     public void resume() throws FalconException {
         // Was already scheduled on the DAGEngine, so resume on DAGEngine if 
suspended
         if (getExternalID() != null) {
-            dagEngine.resume(this);
+            if (getProperties() == null) {
+                setProperties(new Properties());
+            }
+            getProperties().setProperty(FalconWorkflowEngine.FALCON_RESUME, 
"true");
         } else if (awaitedPredicates != null && !awaitedPredicates.isEmpty()) {
             // Evaluate any remaining predicates
             registerForNotifications(true);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java 
b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 745d2ea..0fc68f0 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -267,10 +267,7 @@ public class ProcessExecutor extends EntityExecutor {
     public void resume(ExecutionInstance instance) throws FalconException {
         try {
             instance.resume();
-            if (((ProcessExecutionInstance) instance).isScheduled()) {
-                stateService.handleStateChange(instance, 
InstanceState.EVENT.RESUME_RUNNING, this);
-                onSchedule(instance);
-            } else if (((ProcessExecutionInstance) instance).isReady()) {
+            if (((ProcessExecutionInstance) instance).isReady()) {
                 stateService.handleStateChange(instance, 
InstanceState.EVENT.RESUME_READY, this);
                 onConditionsMet(instance);
             } else {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
index cfebf1c..0eb233d 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -139,7 +139,7 @@ public class JobCompletionService implements 
FalconNotificationService, Workflow
 
     @Override
     public void onSuspend(WorkflowExecutionContext context) throws 
FalconException {
-        // Do nothing
+        onEnd(context, WorkflowJob.Status.SUSPENDED);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index 635fec4..401c57e 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -298,7 +298,9 @@ public class SchedulerService implements 
FalconNotificationService, Notification
                         if (props != null) {
                             isForced = 
Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_FORCE_RERUN));
                         }
-                        if (isReRun(props)) {
+                        if (isResume(props)) {
+                            
DAGEngineFactory.getDAGEngine(instance.getCluster()).resume(instance);
+                        } else if (isReRun(props)) {
                             
DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, 
isForced);
                         }
                     } else {
@@ -329,6 +331,13 @@ public class SchedulerService implements 
FalconNotificationService, Notification
             return false;
         }
 
+        private boolean isResume(Properties props) {
+            if (props != null && !props.isEmpty()) {
+                return 
Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_RESUME));
+            }
+            return false;
+        }
+
         public short getPriority() {
             return priority;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
 
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 77cb2fa..c6d4212 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -68,6 +68,7 @@ public class FalconWorkflowEngine extends 
AbstractWorkflowEngine {
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = 
"falcon.instance.action.clusters";
     public static final String FALCON_FORCE_RERUN = 
"falcon.system.force.rerun";
     public static final String FALCON_RERUN = "falcon.system.rerun";
+    public static final String FALCON_RESUME = "falcon.system.resume";
 
     private enum JobAction {
         KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
@@ -220,7 +221,9 @@ public class FalconWorkflowEngine extends 
AbstractWorkflowEngine {
 
         // To ensure compatibility with OozieWorkflowEngine.
         // Also because users would like to see the most recent instances 
first.
-        sortInstancesDescBySequence(instancesToActOn);
+        if (action == JobAction.STATUS || action == JobAction.PARAMS) {
+            sortInstancesDescBySequence(instancesToActOn);
+        }
 
         List<InstancesResult.Instance> instances = new ArrayList<>();
         for (ExecutionInstance ins : instancesToActOn) {
@@ -298,16 +301,13 @@ public class FalconWorkflowEngine extends 
AbstractWorkflowEngine {
             populateInstanceInfo(instanceInfo, instance);
             break;
         case STATUS:
-            // Mask wfParams
-            instanceInfo.wfParams = null;
+            populateInstanceInfo(instanceInfo, instance);
+            // If already scheduled externally, get details for actions
             if (StringUtils.isNotEmpty(instance.getExternalID())) {
                 List<InstancesResult.InstanceAction> instanceActions =
                         
DAGEngineFactory.getDAGEngine(cluster).getJobDetails(instance.getExternalID());
                 instanceInfo.actions = instanceActions
                         .toArray(new 
InstancesResult.InstanceAction[instanceActions.size()]);
-            // If not scheduled externally yet, get details from state
-            } else {
-                populateInstanceInfo(instanceInfo, instance);
             }
             break;
         case PARAMS:

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
 
b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index d08f7d4..417ec3e 100644
--- 
a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ 
b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -271,10 +271,12 @@ public class FalconExecutionServiceTest extends 
AbstractSchedulerTestBase {
         FalconExecutionService.get().resume(process);
         instance1 = stateStore.getExecutionInstance(new 
InstanceID(instance1.getInstance()));
         instance2 = stateStore.getExecutionInstance(new 
InstanceID(instance2.getInstance()));
-        Assert.assertEquals(instance1.getCurrentState(), 
InstanceState.STATE.RUNNING);
+        Assert.assertEquals(instance1.getCurrentState(), 
InstanceState.STATE.READY);
         Assert.assertEquals(instance2.getCurrentState(), 
InstanceState.STATE.READY);
 
         // Running should finish after resume
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, 
instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
         event = 
createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, 
instance1.getInstance());
         FalconExecutionService.get().onEvent(event);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java 
b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
index c99f3fd..245258b 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
@@ -34,6 +34,7 @@ import java.util.Properties;
 public class MockDAGEngine implements DAGEngine {
     private List<ExecutionInstance> failInstances = new ArrayList<>();
     private Map<ExecutionInstance, Integer> runInvocations =  new HashMap<>();
+    private Map<ExecutionInstance, Integer> resumeInvocations =  new 
HashMap<>();
 
     public MockDAGEngine(String cluster) {
 
@@ -68,7 +69,13 @@ public class MockDAGEngine implements DAGEngine {
 
     @Override
     public void resume(ExecutionInstance instance) throws DAGEngineException {
+        Integer count = 1;
+        if (resumeInvocations.containsKey(instance)) {
+            // Increment count
+            count = resumeInvocations.get(instance) + 1;
+        }
 
+        resumeInvocations.put(instance, count);
     }
 
     @Override
@@ -125,4 +132,8 @@ public class MockDAGEngine implements DAGEngine {
     public Integer getTotalRuns(ExecutionInstance instance) {
         return runInvocations.get(instance);
     }
+
+    public Integer getTotalResumes(ExecutionInstance instance) {
+        return resumeInvocations.get(instance);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
 
b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index a442738..bad2841 100644
--- 
a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ 
b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -45,6 +45,7 @@ import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.util.StateStoreProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.FalconWorkflowEngine;
 import org.apache.oozie.client.WorkflowJob;
 import org.joda.time.DateTime;
 import org.mockito.Mockito;
@@ -56,6 +57,7 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Properties;
 
 import static org.apache.falcon.state.InstanceState.STATE;
 
@@ -305,6 +307,25 @@ public class SchedulerServiceTest extends AbstractTestBase 
{
         ((MockDAGEngine)mockDagEngine).removeFailInstance(instance1);
     }
 
+    @Test
+    public void testResume() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize6");
+        Process mockProcess = getStore().get(EntityType.PROCESS, "summarize6");
+        mockProcess.setParallel(1);
+        Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+        ExecutionInstance instance1 = new 
ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+        Properties resumeProps = new Properties();
+        resumeProps.setProperty(FalconWorkflowEngine.FALCON_RESUME, "true");
+        instance1.setProperties(resumeProps);
+        instance1.setExternalID("123");
+        SchedulerService.JobScheduleRequestBuilder request = 
(SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance1.getId());
+        request.setInstance(instance1);
+        scheduler.register(request.build());
+        Thread.sleep(100);
+        
Assert.assertEquals(((MockDAGEngine)mockDagEngine).getTotalResumes(instance1), 
new Integer(1));
+    }
+
     /**
      * A mock notification Handler that makes appropriate state changes.
      */

http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
 
b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
index b06725f..28a8fa9 100644
--- 
a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
+++ 
b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -121,7 +121,10 @@ public class InstanceSchedulerManagerJerseyIT extends 
AbstractSchedulerManagerJe
                 END_TIME, colo, null, null, null, null);
         status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
                 processName, START_INSTANCE);
-        Assert.assertEquals(status, InstancesResult.WorkflowStatus.RUNNING);
+        Assert.assertEquals(status, InstancesResult.WorkflowStatus.READY);
+
+        waitForStatus(EntityType.PROCESS.toString(), processName,
+                START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING);
     }
 
     @Test

Reply via email to