Repository: falcon Updated Branches: refs/heads/master 6b097cb26 -> 9ec4c23a1
FALCON-1847 Execution order not honored when instances are suspended/resumed. This commit makes the following changes: 1. Instances being acted upon are sorted in descending order only for the STATUS and PARAMS actions; for all other actions, older instances are now suspended/resumed first. 2. Instance status is now read from the DB; only additional details (such as actions) are obtained from Oozie. 3. The job completion service now also listens for suspend notifications, so that subsequent instances can be scheduled. 4. Resume no longer resumes the instance immediately; the actual resume is deferred to the scheduler so that concurrency limits are honored. Author: Pallavi Rao <[email protected]> Reviewers: Sandeep Samudrala <[email protected]> Closes #64 from pallavi-rao/1847 and squashes the following commits: 069c223 [Pallavi Rao] FALCON-1847 Fixing the IT 9cd1249 [Pallavi Rao] FALCON-1847 Execution order not honored when instances are suspended/resumed Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9ec4c23a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9ec4c23a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9ec4c23a Branch: refs/heads/master Commit: 9ec4c23a1ff8a5ed1f202fc4186a920d0975e5fe Parents: 6b097cb Author: Pallavi Rao <[email protected]> Authored: Mon Mar 14 17:36:08 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Mar 14 17:36:08 2016 +0530 ---------------------------------------------------------------------- .../execution/ProcessExecutionInstance.java | 7 ++++++- .../falcon/execution/ProcessExecutor.java | 5 +---- .../service/impl/JobCompletionService.java | 2 +- .../service/impl/SchedulerService.java | 11 +++++++++- .../workflow/engine/FalconWorkflowEngine.java | 12 +++++------ .../execution/FalconExecutionServiceTest.java | 4 +++- .../apache/falcon/execution/MockDAGEngine.java | 11 ++++++++++ .../service/SchedulerServiceTest.java | 21 ++++++++++++++++++++ .../InstanceSchedulerManagerJerseyIT.java | 5 ++++- 9 files changed, 63 insertions(+), 15 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 8f026b7..2d666c3 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -41,6 +41,7 @@ import org.apache.falcon.state.InstanceID; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.falcon.workflow.engine.FalconWorkflowEngine; import org.apache.hadoop.fs.Path; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Properties; /** @@ -269,7 +271,10 @@ public class ProcessExecutionInstance extends ExecutionInstance { public void resume() throws FalconException { // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended if (getExternalID() != null) { - dagEngine.resume(this); + if (getProperties() == null) { + setProperties(new Properties()); + } + getProperties().setProperty(FalconWorkflowEngine.FALCON_RESUME, "true"); } else if (awaitedPredicates != null && !awaitedPredicates.isEmpty()) { // Evaluate any remaining predicates registerForNotifications(true); http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git 
a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index 745d2ea..0fc68f0 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -267,10 +267,7 @@ public class ProcessExecutor extends EntityExecutor { public void resume(ExecutionInstance instance) throws FalconException { try { instance.resume(); - if (((ProcessExecutionInstance) instance).isScheduled()) { - stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_RUNNING, this); - onSchedule(instance); - } else if (((ProcessExecutionInstance) instance).isReady()) { + if (((ProcessExecutionInstance) instance).isReady()) { stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_READY, this); onConditionsMet(instance); } else { http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java index cfebf1c..0eb233d 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java @@ -139,7 +139,7 @@ public class JobCompletionService implements FalconNotificationService, Workflow @Override public void onSuspend(WorkflowExecutionContext context) throws FalconException { - // Do nothing + onEnd(context, WorkflowJob.Status.SUSPENDED); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java 
---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index 635fec4..401c57e 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -298,7 +298,9 @@ public class SchedulerService implements FalconNotificationService, Notification if (props != null) { isForced = Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_FORCE_RERUN)); } - if (isReRun(props)) { + if (isResume(props)) { + DAGEngineFactory.getDAGEngine(instance.getCluster()).resume(instance); + } else if (isReRun(props)) { DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced); } } else { @@ -329,6 +331,13 @@ public class SchedulerService implements FalconNotificationService, Notification return false; } + private boolean isResume(Properties props) { + if (props != null && !props.isEmpty()) { + return Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_RESUME)); + } + return false; + } + public short getPriority() { return priority; } http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 77cb2fa..c6d4212 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -68,6 +68,7 @@ public class FalconWorkflowEngine extends 
AbstractWorkflowEngine { private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun"; public static final String FALCON_RERUN = "falcon.system.rerun"; + public static final String FALCON_RESUME = "falcon.system.resume"; private enum JobAction { KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS @@ -220,7 +221,9 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { // To ensure compatibility with OozieWorkflowEngine. // Also because users would like to see the most recent instances first. - sortInstancesDescBySequence(instancesToActOn); + if (action == JobAction.STATUS || action == JobAction.PARAMS) { + sortInstancesDescBySequence(instancesToActOn); + } List<InstancesResult.Instance> instances = new ArrayList<>(); for (ExecutionInstance ins : instancesToActOn) { @@ -298,16 +301,13 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { populateInstanceInfo(instanceInfo, instance); break; case STATUS: - // Mask wfParams - instanceInfo.wfParams = null; + populateInstanceInfo(instanceInfo, instance); + // If already scheduled externally, get details for actions if (StringUtils.isNotEmpty(instance.getExternalID())) { List<InstancesResult.InstanceAction> instanceActions = DAGEngineFactory.getDAGEngine(cluster).getJobDetails(instance.getExternalID()); instanceInfo.actions = instanceActions .toArray(new InstancesResult.InstanceAction[instanceActions.size()]); - // If not scheduled externally yet, get details from state - } else { - populateInstanceInfo(instanceInfo, instance); } break; case PARAMS: http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java 
b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index d08f7d4..417ec3e 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -271,10 +271,12 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase { FalconExecutionService.get().resume(process); instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); - Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); // Running should finish after resume + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance()); FalconExecutionService.get().onEvent(event); http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java index c99f3fd..245258b 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java @@ -34,6 +34,7 @@ import java.util.Properties; public class MockDAGEngine implements DAGEngine { private List<ExecutionInstance> failInstances = new ArrayList<>(); private Map<ExecutionInstance, Integer> runInvocations = new HashMap<>(); + private Map<ExecutionInstance, Integer> 
resumeInvocations = new HashMap<>(); public MockDAGEngine(String cluster) { @@ -68,7 +69,13 @@ public class MockDAGEngine implements DAGEngine { @Override public void resume(ExecutionInstance instance) throws DAGEngineException { + Integer count = 1; + if (resumeInvocations.containsKey(instance)) { + // Increment count + count = resumeInvocations.get(instance) + 1; + } + resumeInvocations.put(instance, count); } @Override @@ -125,4 +132,8 @@ public class MockDAGEngine implements DAGEngine { public Integer getTotalRuns(ExecutionInstance instance) { return runInvocations.get(instance); } + + public Integer getTotalResumes(ExecutionInstance instance) { + return resumeInvocations.get(instance); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java index a442738..bad2841 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -45,6 +45,7 @@ import org.apache.falcon.util.StartupProperties; import org.apache.falcon.util.StateStoreProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.falcon.workflow.engine.FalconWorkflowEngine; import org.apache.oozie.client.WorkflowJob; import org.joda.time.DateTime; import org.mockito.Mockito; @@ -56,6 +57,7 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.Date; +import java.util.Properties; import static org.apache.falcon.state.InstanceState.STATE; @@ -305,6 +307,25 @@ public class SchedulerServiceTest extends 
AbstractTestBase { ((MockDAGEngine)mockDagEngine).removeFailInstance(instance1); } + @Test + public void testResume() throws Exception { + storeEntity(EntityType.PROCESS, "summarize6"); + Process mockProcess = getStore().get(EntityType.PROCESS, "summarize6"); + mockProcess.setParallel(1); + Date startTime = EntityUtil.getStartTime(mockProcess, cluster); + ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); + Properties resumeProps = new Properties(); + resumeProps.setProperty(FalconWorkflowEngine.FALCON_RESUME, "true"); + instance1.setProperties(resumeProps); + instance1.setExternalID("123"); + SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance1.getId()); + request.setInstance(instance1); + scheduler.register(request.build()); + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine)mockDagEngine).getTotalResumes(instance1), new Integer(1)); + } + /** * A mock notification Handler that makes appropriate state changes. 
*/ http://git-wip-us.apache.org/repos/asf/falcon/blob/9ec4c23a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java index b06725f..28a8fa9 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -121,7 +121,10 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe END_TIME, colo, null, null, null, null); status = getClient().getInstanceStatus(EntityType.PROCESS.name(), processName, START_INSTANCE); - Assert.assertEquals(status, InstancesResult.WorkflowStatus.RUNNING); + Assert.assertEquals(status, InstancesResult.WorkflowStatus.READY); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING); } @Test
