http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java new file mode 100644 index 0000000..ca2010b --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.workflow.engine; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.exception.DAGEngineException; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.OozieClientException; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A DAG Engine that uses Oozie to execute the DAG. 
+ */ +public class OozieDAGEngine implements DAGEngine { + private static final Logger LOG = LoggerFactory.getLogger(OozieDAGEngine.class); + private final OozieClient client; + private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100; + + private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count"; + private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList( + "pre-processing", + "recordsize", + "succeeded-post-processing", + "failed-post-processing" + ); + + public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; + private final Cluster cluster; + + public OozieDAGEngine(Cluster cluster) throws DAGEngineException { + try { + client = OozieClientFactory.get(cluster); + this.cluster = cluster; + } catch (Exception e) { + throw new DAGEngineException(e); + } + } + + public OozieDAGEngine(String clusterName) throws DAGEngineException { + try { + this.cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName); + client = OozieClientFactory.get(cluster); + } catch (Exception e) { + throw new DAGEngineException(e); + } + } + + @Override + public String run(ExecutionInstance instance) throws DAGEngineException { + try { + Properties properties = getRunProperties(instance); + Path buildPath = EntityUtil.getLatestStagingPath(cluster, instance.getEntity()); + switchUser(); + properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); + properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); + return client.run(properties); + } catch (OozieClientException e) { + LOG.error("Ozie client exception:", e); + throw new DAGEngineException(e); + } catch (FalconException e1) { + LOG.error("Falcon Exception : ", e1); + throw new DAGEngineException(e1); + } + } + + private void prepareEntityBuildPath(Entity entity) throws FalconException { + Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity); + Path logPath = EntityUtil.getLogPath(cluster, entity); + + try { + FileSystem fs = 
HadoopClientFactory.get().createProxiedFileSystem( + ClusterHelper.getConfiguration(cluster)); + HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath); + HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath); + } catch (IOException e) { + throw new FalconException("Error preparing base staging dirs: " + stagingPath, e); + } + } + + private void dryRunInternal(Properties props) throws OozieClientException { + LOG.info("Dry run with properties {}", props); + client.dryrun(props); + } + + private void switchUser() { + String user = System.getProperty("user.name"); + CurrentUser.authenticate(user); + } + + @Override + public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException { + try { + return statusEquals(client.getJobInfo(instance.getExternalID()).getStatus().name(), + Job.Status.PREP, Job.Status.RUNNING); + } catch (OozieClientException e) { + throw new DAGEngineException(e); + } + } + + // TODO : To be implemented. Currently hardcoded for process + private Properties getRunProperties(ExecutionInstance instance) { + Properties props = new Properties(); + DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); + String nominalTime = fmt.print(instance.getInstanceTime()); + props.put("nominalTime", nominalTime); + props.put("timeStamp", nominalTime); + props.put("feedNames", "NONE"); + props.put("feedInstancePaths", "NONE"); + props.put("falconInputFeeds", "NONE"); + props.put("falconInPaths", "NONE"); + props.put("feedNames", "NONE"); + props.put("feedInstancePaths", "NONE"); + props.put("userJMSNotificationEnabled", "true"); + return props; + } + + // TODO : To be implemented. 
Currently hardcoded for process + private Properties getDryRunProperties(Entity entity) { + Properties props = new Properties(); + DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); + String nominalTime = fmt.print(DateTime.now()); + props.put("nominalTime", nominalTime); + props.put("timeStamp", nominalTime); + props.put("feedNames", "NONE"); + props.put("feedInstancePaths", "NONE"); + props.put("falconInputFeeds", "NONE"); + props.put("falconInPaths", "NONE"); + props.put("feedNames", "NONE"); + props.put("feedInstancePaths", "NONE"); + props.put("userJMSNotificationEnabled", "true"); + return props; + } + + @Override + public void suspend(ExecutionInstance instance) throws DAGEngineException { + try { + client.suspend(instance.getExternalID()); + assertStatus(instance.getExternalID(), Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED, + Job.Status.FAILED, Job.Status.KILLED); + LOG.info("Suspended job {} on cluster {}", instance.getExternalID(), instance.getCluster()); + } catch (OozieClientException e) { + throw new DAGEngineException(e); + } + } + + @Override + public void resume(ExecutionInstance instance) throws DAGEngineException { + try { + client.resume(instance.getExternalID()); + assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED, + Job.Status.FAILED, Job.Status.KILLED); + LOG.info("Resumed job {} on cluster {}", instance.getExternalID(), instance.getCluster()); + } catch (OozieClientException e) { + throw new DAGEngineException(e); + } + } + + @Override + public void kill(ExecutionInstance instance) throws DAGEngineException { + try { + client.kill(instance.getExternalID()); + assertStatus(instance.getExternalID(), Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED); + LOG.info("Killed job {} on cluster {}", instance.getExternalID(), instance.getCluster()); + } catch (OozieClientException e) { + throw new DAGEngineException(e); + } + } + + @Override + public void 
reRun(ExecutionInstance instance) throws DAGEngineException { + // TODO : Implement this + } + + @Override + public void submit(Entity entity) throws DAGEngineException { + try { + // TODO : remove hardcoded Tag value when feed support is added. + OozieOrchestrationWorkflowBuilder builder = + OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT); + prepareEntityBuildPath(entity); + Path buildPath = EntityUtil.getNewStagingPath(cluster, entity); + Properties properties = builder.build(cluster, buildPath); + if (properties == null) { + LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster); + throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty"); + } + + switchUser(); + LOG.debug("Logged in user is " + CurrentUser.getUser()); + properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); + properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); + properties.putAll(getDryRunProperties(entity)); + //Do submit before run as run is asynchronous + dryRunInternal(properties); + } catch (OozieClientException e) { + LOG.error("Oozie client exception:", e); + throw new DAGEngineException(e); + } catch (FalconException e1) { + LOG.error("Falcon Exception : ", e1); + throw new DAGEngineException(e1); + } + } + + @Override + public InstancesResult.Instance info(String externalID) throws DAGEngineException { + InstancesResult.Instance instance = new InstancesResult.Instance(); + try { + LOG.debug("Retrieving details for job {} ", externalID); + WorkflowJob jobInfo = client.getJobInfo(externalID); + instance.startTime = jobInfo.getStartTime(); + if (jobInfo.getStatus().name().equals(Job.Status.RUNNING.name())) { + instance.endTime = new Date(); + } else { + instance.endTime = jobInfo.getEndTime(); + } + instance.cluster = cluster.getName(); + instance.runId = jobInfo.getRun(); + instance.status = InstancesResult.WorkflowStatus.valueOf(jobInfo.getStatus().name()); + instance.logFile = 
jobInfo.getConsoleUrl(); + instance.wfParams = getWFParams(jobInfo); + return instance; + } catch (Exception e) { + LOG.error("Error when attempting to get info for " + externalID, e); + throw new DAGEngineException(e); + } + } + + private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob jobInfo) { + Configuration conf = new Configuration(false); + conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes())); + InstancesResult.KeyValuePair[] wfParams = new InstancesResult.KeyValuePair[conf.size()]; + int i = 0; + for (Map.Entry<String, String> entry : conf) { + wfParams[i++] = new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue()); + } + return wfParams; + } + + @Override + public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException { + List<InstancesResult.InstanceAction> instanceActions = new ArrayList<>(); + try { + WorkflowJob wfJob = client.getJobInfo(externalID); + List<WorkflowAction> wfActions = wfJob.getActions(); + // We wanna capture job urls for all user-actions & non succeeded actions of the main workflow + for (WorkflowAction action : wfActions) { + if (action.getType().equalsIgnoreCase("sub-workflow") + && StringUtils.isNotEmpty(action.getExternalId())) { + // if the action is sub-workflow, get job urls of all actions within the sub-workflow + List<WorkflowAction> subWorkFlowActions = client + .getJobInfo(action.getExternalId()).getActions(); + for (WorkflowAction subWfAction : subWorkFlowActions) { + if (!subWfAction.getType().startsWith(":")) { + InstancesResult.InstanceAction instanceAction = + new InstancesResult.InstanceAction(subWfAction.getName(), + subWfAction.getExternalStatus(), subWfAction.getConsoleUrl()); + instanceActions.add(instanceAction); + } + } + } else if (!action.getType().startsWith(":")) { + // if the action is a transition node it starts with :, we don't need their statuses + if (PARENT_WF_ACTION_NAMES.contains(action.getName()) + && 
!Job.Status.SUCCEEDED.toString().equals(action.getExternalStatus())) { + // falcon actions in the main workflow are defined in the list + // get job urls for all non succeeded actions of the main workflow + InstancesResult.InstanceAction instanceAction = + new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), + action.getConsoleUrl()); + instanceActions.add(instanceAction); + } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName()) + && !StringUtils.equals(action.getExternalId(), "-")) { + // if user-action is pig/hive there is no sub-workflow, we wanna capture their urls as well + InstancesResult.InstanceAction instanceAction = + new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), + action.getConsoleUrl()); + instanceActions.add(instanceAction); + } + } + } + return instanceActions; + } catch (OozieClientException oce) { + throw new DAGEngineException(oce); + } + } + + @Override + public boolean isAlive() throws DAGEngineException { + try { + return client.getSystemMode() == OozieClient.SYSTEM_MODE.NORMAL; + } catch (OozieClientException e) { + throw new DAGEngineException("Unable to reach Oozie server.", e); + } + } + + @Override + public Properties getConfiguration(String externalID) throws DAGEngineException { + Properties props = new Properties(); + try { + WorkflowJob jobInfo = client.getJobInfo(externalID); + Configuration conf = new Configuration(false); + conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes())); + + for (Map.Entry<String, String> entry : conf) { + props.put(entry.getKey(), entry.getValue()); + } + } catch (OozieClientException e) { + throw new DAGEngineException(e); + } + + return props; + } + + // Get status of a workflow (with retry) and ensure it is one of statuses requested. + private void assertStatus(String jobID, Job.Status... 
statuses) throws DAGEngineException { + String actualStatus = null; + int retryCount; + String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30"); + try { + retryCount = Integer.valueOf(retry); + } catch (NumberFormatException nfe) { + throw new DAGEngineException("Invalid value provided for runtime property \"" + + WORKFLOW_STATUS_RETRY_COUNT + "\". Please provide an integer value."); + } + for (int counter = 0; counter < retryCount; counter++) { + try { + actualStatus = client.getJobInfo(jobID).getStatus().name(); + } catch (OozieClientException e) { + LOG.error("Unable to get status of workflow: " + jobID, e); + throw new DAGEngineException(e); + } + if (!statusEquals(actualStatus, statuses)) { + try { + Thread.sleep(WORKFLOW_STATUS_RETRY_DELAY_MS); + } catch (InterruptedException ignore) { + //ignore + } + } else { + return; + } + } + throw new DAGEngineException("For Job" + jobID + ", actual statuses: " + actualStatus + ", expected statuses: " + + Arrays.toString(statuses)); + } + + private boolean statusEquals(String left, Job.Status... right) { + for (Job.Status rightElement : right) { + if (left.equals(rightElement.name())) { + return true; + } + } + return false; + } +}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java new file mode 100644 index 0000000..b2f9e59 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -0,0 +1,557 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.execution; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.DataEvent; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.JobCompletedEvent; +import org.apache.falcon.notification.service.event.JobScheduledEvent; +import org.apache.falcon.notification.service.event.TimeElapsedEvent; +import org.apache.falcon.notification.service.impl.AlarmService; +import org.apache.falcon.notification.service.impl.DataAvailabilityService; +import org.apache.falcon.notification.service.impl.JobCompletionService; +import org.apache.falcon.notification.service.impl.SchedulerService; +import org.apache.falcon.service.Services; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.InMemoryStateStore; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.engine.DAGEngine; +import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.client.OozieClientException; +import org.apache.oozie.client.WorkflowJob; +import org.joda.time.DateTime; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; 
+import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; + +/** + * Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s. + */ +public class FalconExecutionServiceTest extends AbstractTestBase { + + private InMemoryStateStore stateStore = null; + private AlarmService mockTimeService; + private DataAvailabilityService mockDataService; + private SchedulerService mockSchedulerService; + private JobCompletionService mockCompletionService; + private DAGEngine dagEngine; + private int instanceCount = 0; + + @BeforeClass + public void init() throws Exception { + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + setupServices(); + setupConfigStore(); + } + + @AfterClass + public void tearDown() { + this.dfsCluster.shutdown(); + } + + // State store is set up to sync with Config Store. That gets tested too. 
+ public void setupConfigStore() throws Exception { + stateStore = (InMemoryStateStore) AbstractStateStore.get(); + getStore().registerListener(stateStore); + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + } + + public void setupServices() throws FalconException, OozieClientException { + mockTimeService = Mockito.mock(AlarmService.class); + Mockito.when(mockTimeService.getName()).thenReturn("AlarmService"); + Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + + mockDataService = Mockito.mock(DataAvailabilityService.class); + Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService"); + Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + mockSchedulerService = Mockito.mock(SchedulerService.class); + Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService"); + StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName()); + StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName()); + dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster")); + Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + mockCompletionService = Mockito.mock(JobCompletionService.class); + Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService"); + Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + Services.get().register(mockTimeService); + Services.get().register(mockDataService); + Services.get().register(mockSchedulerService); + Services.get().register(mockCompletionService); + 
Services.get().register(FalconExecutionService.get()); + } + + @BeforeMethod + private void setupStateStore() throws FalconException { + stateStore.clear(); + } + + @Test + public void testBasicFlow() throws Exception { + storeEntity(EntityType.PROCESS, "summarize1"); + Process process = getStore().get(EntityType.PROCESS, "summarize1"); + Assert.assertNotNull(process); + ID processKey = new ID(process); + String clusterName = dfsCluster.getCluster().getName(); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SCHEDULED); + + // Simulate a time notification + Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + + // Ensure an instance is triggered and registers for data notification + Assert.assertEquals(stateStore.getAllExecutionInstances(process, clusterName).size(), 1); + InstanceState instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next(); + Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.WAITING); + + // Simulate a data notification + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance.getInstance()); + FalconExecutionService.get().onEvent(event); + + // Ensure the instance is ready for execution + Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY); + + // Simulate a scheduled notification + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance.getInstance()); + FalconExecutionService.get().onEvent(event); + + // Ensure the instance is running + instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next(); + Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.RUNNING); + + // Simulate a job 
complete notification + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance.getInstance()); + FalconExecutionService.get().onEvent(event); + + // Ensure the instance is in succeeded and is in the state store + instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next(); + Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.SUCCEEDED); + } + + @Test + public void testSuspendResume() throws Exception { + Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class)); + storeEntity(EntityType.PROCESS, "summarize2"); + Process process = getStore().get(EntityType.PROCESS, "summarize2"); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + + // Simulate two time notifications + Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + + // Suspend and resume All waiting - check for notification deregistration + + Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator(); + InstanceState instance1 = (InstanceState) i.next(); + InstanceState instance2 = (InstanceState) i.next(); + + // Simulate a data notification + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + + // One in ready and one in waiting. Both should be suspended. 
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY); + Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING); + + FalconExecutionService.get().suspend(process); + + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED); + Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), + instance1.getInstance().getId()); + Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), + instance2.getInstance().getId()); + Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID); + + FalconExecutionService.get().resume(process); + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING); + + // Simulate a data notification and a job run notification + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance()); + FalconExecutionService.get().onEvent(event); + + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + + // One in running and the other in ready. 
Both should be suspended + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); + Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); + + FalconExecutionService.get().suspend(process); + + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED); + + FalconExecutionService.get().resume(process); + + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); + + // Running should finish after resume + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED); + } + + @Test + // Kill waiting, ready, running - check for notification deregistration + public void testDelete() throws Exception { + storeEntity(EntityType.PROCESS, "summarize4"); + Process process = getStore().get(EntityType.PROCESS, "summarize4"); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + + // Simulate three time notifications + Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + + 
// Suspend and resume All waiting - check for notification deregistration + Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator(); + InstanceState instance1 = (InstanceState) i.next(); + InstanceState instance2 = (InstanceState) i.next(); + InstanceState instance3 = (InstanceState) i.next(); + + // Simulate two data notifications and one job run + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance()); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + + // One in ready, one in waiting and one running. + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); + Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING); + + FalconExecutionService.get().delete(process); + + // Deregister from notification services + Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID); + } + + @Test + public void testTimeOut() throws Exception { + storeEntity(EntityType.PROCESS, "summarize3"); + Process process = getStore().get(EntityType.PROCESS, "summarize3"); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + + // Simulate time notification + Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + + // Simulate data unavailable notification and 
timeout + InstanceState instanceState = stateStore.getAllExecutionInstances(process, clusterName).iterator().next(); + ((Process) instanceState.getInstance().getEntity()).setTimeout(new Frequency("minutes(0)")); + DataEvent dataEvent = (DataEvent) createEvent(NotificationServicesRegistry.SERVICE.DATA, + instanceState.getInstance()); + dataEvent.setStatus(DataEvent.STATUS.UNAVAILABLE); + + FalconExecutionService.get().onEvent(dataEvent); + + Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT); + } + + // Non-triggering event should not create an instance + @Test + public void testNonTriggeringEvents() throws Exception { + storeEntity(EntityType.PROCESS, "summarize6"); + Process process = getStore().get(EntityType.PROCESS, "summarize6"); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED); + + // Simulate data notification with a callback of the process. 
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.DATA, process, clusterName); + FalconExecutionService.get().onEvent(event); + + // No instances should get triggered + Assert.assertTrue(stateStore.getAllExecutionInstances(process, clusterName).isEmpty()); + } + + // Individual instance suspend, resume, kill + @Test + public void testInstanceOperations() throws Exception { + storeEntity(EntityType.PROCESS, "summarize5"); + Process process = getStore().get(EntityType.PROCESS, "summarize5"); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + + // Simulate three time notifications + Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + + // Suspend and resume All waiting - check for notification deregistration + Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator(); + InstanceState instance1 = (InstanceState) i.next(); + InstanceState instance2 = (InstanceState) i.next(); + InstanceState instance3 = (InstanceState) i.next(); + + // Simulate two data notifications and one job run + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance()); + FalconExecutionService.get().onEvent(event); + event = 
createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance()); + FalconExecutionService.get().onEvent(event); + + // One in ready, one in waiting and one running. + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); + Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING); + + EntityExecutor entityExecutor = FalconExecutionService.get().getEntityExecutor(process, clusterName); + Assert.assertNotNull(entityExecutor); + Collection<ExecutionInstance> instances = new ArrayList<>(); + for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) { + instances.add(instanceState.getInstance()); + } + + for (ExecutionInstance instance : instances) { + entityExecutor.suspend(instance); + } + + // Instances must be suspended, but, entity itself must not be. + for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) { + Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.SUSPENDED); + Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), + instanceState.getInstance().getId()); + } + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED); + + for (ExecutionInstance instance : instances) { + entityExecutor.resume(instance); + } + // Back to one in ready, one in waiting and one running. + Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); + Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); + Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING); + + for (ExecutionInstance instance : instances) { + entityExecutor.kill(instance); + } + + // Instances must be killed, but, entity itself must not be. 
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) { + Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.KILLED); + } + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED); + } + + @Test(priority = -1) + // Add some entities and instance in the store and ensure the executor picks up from where it left + public void testSystemRestart() throws Exception { + storeEntity(EntityType.PROCESS, "summarize7"); + Process process = getStore().get(EntityType.PROCESS, "summarize7"); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Store couple of instances in store + stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED); + ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process, + new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName); + InstanceState instanceState1 = new InstanceState(instance1); + instanceState1.setCurrentState(InstanceState.STATE.RUNNING); + stateStore.putExecutionInstance(instanceState1); + ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process, + new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName); + InstanceState instanceState2 = new InstanceState(instance2); + instanceState2.setCurrentState(InstanceState.STATE.READY); + stateStore.putExecutionInstance(instanceState2); + + FalconExecutionService.get().init(); + + // Simulate a scheduled notification. 
This should cause the reload from state store + Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance()); + FalconExecutionService.get().onEvent(event); + Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING); + + // Simulate a Job completion notification and ensure the instance resumes from where it left + event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance()); + FalconExecutionService.get().onEvent(event); + Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED); + } + + @Test(dataProvider = "actions") + public void testPartialFailures(String name, String action, InstanceState.STATE state) throws Exception { + storeEntity(EntityType.PROCESS, name); + Process process = getStore().get(EntityType.PROCESS, name); + Assert.assertNotNull(process); + String clusterName = dfsCluster.getCluster().getName(); + ID processID = new ID(process, clusterName); + + // Schedule the process + FalconExecutionService.get().schedule(process); + + // Store couple of instances in store + stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED); + ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process, + new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName); + instance1.setExternalID("123"); + InstanceState instanceState1 = new InstanceState(instance1); + instanceState1.setCurrentState(InstanceState.STATE.RUNNING); + stateStore.putExecutionInstance(instanceState1); + ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process, + new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName); + InstanceState instanceState2 = new InstanceState(instance2); + instanceState2.setCurrentState(InstanceState.STATE.READY); + stateStore.putExecutionInstance(instanceState2); + + // Mock failure + ((MockDAGEngine)dagEngine).addFailInstance(instance1); + Method m = 
FalconExecutionService.get().getClass().getMethod(action, Entity.class); + try { + m.invoke(FalconExecutionService.get(), process); + Assert.fail("Exception expected."); + } catch (Exception e) { + // One instance must fail and the other not + Assert.assertEquals(instanceState2.getCurrentState(), state); + Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING); + } + + // throw no exception + ((MockDAGEngine)dagEngine).removeFailInstance(instance1); + m.invoke(FalconExecutionService.get(), process); + + // Both instances must be in expected state. + Assert.assertEquals(instanceState2.getCurrentState(), state); + Assert.assertEquals(instanceState1.getCurrentState(), state); + } + + @DataProvider(name = "actions") + public Object[][] testActions() { + return new Object[][] { + {"summarize8", "suspend", InstanceState.STATE.SUSPENDED}, + {"summarize9", "delete", InstanceState.STATE.KILLED}, + }; + } + + private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) { + ID id = new ID(process, cluster); + switch (type) { + case TIME: + Date start = process.getClusters().getClusters().get(0).getValidity().getStart(); + long instanceOffset = + SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * instanceCount++; + return new TimeElapsedEvent(id, new DateTime(start), + new DateTime(process.getClusters().getClusters().get(0).getValidity().getEnd()), + new DateTime(start.getTime() + instanceOffset)); + case DATA: + DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA, + DataEvent.STATUS.AVAILABLE); + return dataEvent; + default: + return null; + } + } + + private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) { + ID id = new ID(instance); + switch (type) { + case DATA: + DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA, + DataEvent.STATUS.AVAILABLE); + 
return dataEvent; + case JOB_SCHEDULE: + JobScheduledEvent scheduledEvent = new JobScheduledEvent(id, JobScheduledEvent.STATUS.SUCCESSFUL); + scheduledEvent.setExternalID("234"); + return scheduledEvent; + case JOB_COMPLETION: + JobCompletedEvent jobEvent = new JobCompletedEvent(id, WorkflowJob.Status.SUCCEEDED, DateTime.now()); + return jobEvent; + default: + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java new file mode 100644 index 0000000..087114f --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.execution; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.exception.DAGEngineException; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.workflow.engine.DAGEngine; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Mock DAG Execution Engine used for tests. + */ +public class MockDAGEngine implements DAGEngine { + private List<ExecutionInstance> failInstances = new ArrayList<>(); + private Map<ExecutionInstance, Integer> runInvocations = new HashMap<>(); + + public MockDAGEngine(String cluster) { + + } + + @Override + public String run(ExecutionInstance instance) throws DAGEngineException { + if (failInstances.contains(instance)) { + throw new DAGEngineException("Mock failure."); + } + Integer count = 1; + if (runInvocations.containsKey(instance)) { + // Increment count + count = runInvocations.get(instance) + 1; + } + + runInvocations.put(instance, count); + return "123"; + } + + @Override + public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException { + return true; + } + + @Override + public void suspend(ExecutionInstance instance) throws DAGEngineException { + if (failInstances.contains(instance)) { + throw new DAGEngineException("mock failure."); + } + } + + @Override + public void resume(ExecutionInstance instance) throws DAGEngineException { + + } + + @Override + public void kill(ExecutionInstance instance) throws DAGEngineException { + if (failInstances.contains(instance)) { + throw new DAGEngineException("mock failure."); + } + } + + @Override + public void reRun(ExecutionInstance instance) throws DAGEngineException { + + } + + @Override + public void submit(Entity entity) throws DAGEngineException { + + } + + @Override + public InstancesResult.Instance info(String externalID) throws DAGEngineException { + return new InstancesResult.Instance(); + } + + 
@Override + public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException { + return null; + } + + @Override + public boolean isAlive() throws DAGEngineException { + return false; + } + + @Override + public Properties getConfiguration(String externalID) throws DAGEngineException { + return null; + } + + public void addFailInstance(ExecutionInstance failInstance) { + this.failInstances.add(failInstance); + } + + public void removeFailInstance(ExecutionInstance failInstance) { + this.failInstances.remove(failInstance); + } + + public Integer getTotalRuns(ExecutionInstance instance) { + return runInvocations.get(instance); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java new file mode 100644 index 0000000..9457454 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.execution; + +import org.apache.falcon.entity.v0.Frequency; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test for Utility methods. + */ +public class SchedulerUtilTest { + + @Test(dataProvider = "frequencies") + public void testGetFrequencyInMillis(DateTime referenceTime, Frequency frequency, long expectedValue) { + Assert.assertEquals(SchedulerUtil.getFrequencyInMillis(referenceTime, frequency), expectedValue); + } + + @DataProvider(name = "frequencies") + public Object[][] getTestFrequencies() { + DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss"); + return new Object[][] { + {DateTime.now(), new Frequency("minutes(10)"), 10*60*1000L}, + {DateTime.now(), new Frequency("hours(6)"), 6*60*60*1000L}, + // Feb of leap year + {formatter.parseDateTime("04/02/2012 14:00:00"), new Frequency("months(1)"), 29*24*60*60*1000L}, + // Months with 31 and 30 days + {formatter.parseDateTime("02/10/2015 03:30:00"), new Frequency("months(2)"), (31+30)*24*60*60*1000L}, + }; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java new file mode 100644 index 0000000..36f1fd1 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.impl.AlarmService; +import org.apache.falcon.state.ID; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.TimeZone; + +/** + * Class to test the time notification service. + */ +public class AlarmServiceTest { + + private static AlarmService timeService = Mockito.spy(new AlarmService()); + private static NotificationHandler handler = Mockito.mock(NotificationHandler.class); + + @BeforeClass + public void setup() throws FalconException { + timeService.init(); + } + + @Test + // This test ensures notifications are generated for time events that have occurred in the past. 
+ public void testbackLogCatchup() throws Exception { + TimeZone tz = TimeZone.getTimeZone("UTC"); + DateTime now = DateTime.now(DateTimeZone.forTimeZone(tz)); + // Start time 2 mins ago + DateTime startTime = new DateTime(now.getMillis() - 2*60*1000, DateTimeZone.forTimeZone(tz)); + // End time 2 mins later + DateTime endTime = new DateTime(now.getMillis() + 2*60*1000 , DateTimeZone.forTimeZone(tz)); + + Process mockProcess = new Process(); + mockProcess.setName("test"); + ID id = new ID(mockProcess); + id.setCluster("testCluster"); + + AlarmService.AlarmRequestBuilder request = + new AlarmService.AlarmRequestBuilder(handler, id); + request.setStartTime(startTime); + request.setEndTime(endTime); + request.setFrequency(new Frequency("minutes(1)")); + request.setTimeZone(TimeZone.getTimeZone("UTC")); + + timeService.register(request.build()); + // Asynchronous execution, hence a small wait. + Thread.sleep(1000); + // Based on the minute boundary, there might be 3. + Mockito.verify(handler, Mockito.atLeast(2)).onEvent(Mockito.any(Event.class)); + + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java new file mode 100644 index 0000000..b4a0f35 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.process.Property; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.MockDAGEngine; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.execution.ProcessExecutionInstance; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.JobCompletedEvent; +import org.apache.falcon.notification.service.event.JobScheduledEvent; +import org.apache.falcon.notification.service.impl.DataAvailabilityService; +import org.apache.falcon.notification.service.impl.JobCompletionService; +import org.apache.falcon.notification.service.impl.SchedulerService; +import org.apache.falcon.service.Services; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.InMemoryStateStore; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.engine.DAGEngine; +import 
org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.oozie.client.WorkflowJob; +import org.joda.time.DateTime; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Date; + +import static org.apache.falcon.state.InstanceState.STATE; + +/** + * Class to test the job scheduler service. + */ +public class SchedulerServiceTest extends AbstractTestBase { + + private SchedulerService scheduler = Mockito.spy(new SchedulerService()); + private NotificationHandler handler; + private static String cluster = "testCluster"; + private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get(); + private static DAGEngine mockDagEngine; + private static Process process; + private volatile boolean failed = false; + + @BeforeMethod + public void setup() throws FalconException { + // Initialized before every test in order to track invocations. 
+ handler = Mockito.spy(new MockNotificationHandler()); + } + + @BeforeClass + public void init() throws Exception { + this.dfsCluster = EmbeddedCluster.newCluster(cluster); + this.conf = dfsCluster.getConf(); + setupConfigStore(); + DataAvailabilityService mockDataService = Mockito.mock(DataAvailabilityService.class); + Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService"); + Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + Services.get().register(mockDataService); + JobCompletionService mockCompletionService = Mockito.mock(JobCompletionService.class); + Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService"); + Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + Services.get().register(mockCompletionService); + + Services.get().register(scheduler); + scheduler.init(); + StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName()); + mockDagEngine = DAGEngineFactory.getDAGEngine("testCluster"); + } + + @AfterClass + public void tearDown() { + this.dfsCluster.shutdown(); + } + + // State store is set up to sync with Config Store. That gets tested too. 
+ public void setupConfigStore() throws Exception { + getStore().registerListener(stateStore); + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + storeEntity(EntityType.PROCESS, "mockSummary"); + process = getStore().get(EntityType.PROCESS, "mockSummary"); + } + + @Test + public void testSchedulingWithParallelInstances() throws Exception { + storeEntity(EntityType.PROCESS, "summarize"); + Process mockProcess = getStore().get(EntityType.PROCESS, "summarize"); + mockProcess.setParallel(1); + Date startTime = EntityUtil.getStartTime(mockProcess, cluster); + ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); + SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance1.getId()); + request.setInstance(instance1); + // No instances running, ensure DAGEngine.run is invoked. + scheduler.register(request.build()); + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); + Mockito.verify(handler).onEvent(Mockito.any(JobScheduledEvent.class)); + // Max. instances running, the new instance should not be run. + ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess, + new DateTime(startTime.getTime() + 60000), cluster); + SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance2.getId()); + request2.setInstance(instance2); + scheduler.register(request2.build()); + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null); + // Max. instances running, the new instance should not be run. 
+ ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess, + new DateTime(startTime.getTime() + 120000), cluster); + SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance3.getId()); + request3.setInstance(instance3); + scheduler.register(request3.build()); + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); + // Simulate the completion of previous instance. + stateStore.getExecutionInstance(instance1.getId()).setCurrentState(STATE.SUCCEEDED); + scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED, + DateTime.now())); + // When an instance completes instance2 should get scheduled next iteration + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); + // Simulate another completion and ensure instance3 runs. + stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED); + scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED, + DateTime.now())); + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1)); + } + + /** + * Verifies that an instance registered with an unscheduled dependency is held back until that dependency + * has been scheduled and reported as completed. + */ + @Test + public void testSchedulingWithDependencies() throws Exception { + storeEntity(EntityType.PROCESS, "summarize1"); + Process mockProcess = getStore().get(EntityType.PROCESS, "summarize1"); + Date startTime = EntityUtil.getStartTime(mockProcess, cluster); + // Create two instances; instance1 is registered with instance2 as its dependency. 
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); + ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess, + new DateTime(startTime.getTime() + 60000), cluster); + SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance1.getId()); + ArrayList<ExecutionInstance> dependencies = new ArrayList<ExecutionInstance>(); + dependencies.add(instance2); + request.setDependencies(dependencies); + request.setInstance(instance1); + // instance2 is not scheduled, dependency not satisfied + // So, instance1 should not be scheduled either. + scheduler.register(request.build()); + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), null); + Mockito.verify(handler, Mockito.times(0)).onEvent(Mockito.any(JobScheduledEvent.class)); + + // Schedule instance2, the dependency of instance1 + SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance2.getId()); + request2.setInstance(instance2); + scheduler.register(request2.build()); + + // Check that instance2 ran, then simulate its completion. + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); + Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(JobScheduledEvent.class)); + stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED); + scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster, instance2.getInstanceTime()), + WorkflowJob.Status.SUCCEEDED, DateTime.now())); + // Dependency now satisfied. Now, the first instance should get scheduled after retry delay. 
+ Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); + } + + @Test + public void testSchedulingWithPriorities() throws Exception { + storeEntity(EntityType.PROCESS, "summarize2"); + Process mockProcess = getStore().get(EntityType.PROCESS, "summarize2"); + storeEntity(EntityType.PROCESS, "summarize3"); + Process mockProcess2 = getStore().get(EntityType.PROCESS, "summarize3"); + // Give the second process a HIGH job priority + Property priorityProp = new Property(); + priorityProp.setName(EntityUtil.MR_JOB_PRIORITY); + priorityProp.setValue("HIGH"); + mockProcess2.getProperties().getProperties().add(priorityProp); + Date startTime = EntityUtil.getStartTime(mockProcess, cluster); + // Create one instance per process; no dependency is set between them. + ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); + ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess2, new DateTime(startTime), cluster); + SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance1.getId()); + request.setInstance(instance1); + SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance2.getId()); + request2.setInstance(instance2); + scheduler.register(request.build()); + scheduler.register(request2.build()); + Thread.sleep(100); + // Instance2 should be scheduled first. + // NOTE(review): the assertions only check that each instance ran once, not the order in which they ran. 
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); + Mockito.verify(handler, Mockito.times(2)).onEvent(Mockito.any(JobScheduledEvent.class)); + } + + @Test + public void testDeRegistration() throws Exception { + storeEntity(EntityType.PROCESS, "summarize4"); + Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4"); + mockProcess.setParallel(3); + Date startTime = EntityUtil.getStartTime(mockProcess, cluster); + ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); + // Register 3 instances whose instance times are one minute apart. + SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance1.getId()); + request.setInstance(instance1); + scheduler.register(request.build()); + ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess, + new DateTime(startTime.getTime() + 60000), cluster); + SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance2.getId()); + request2.setInstance(instance2); + scheduler.register(request2.build()); + ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess, + new DateTime(startTime.getTime() + 120000), cluster); + SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(handler, instance3.getId()); + request3.setInstance(instance3); + scheduler.register(request3.build()); + + // Abort second instance by unregistering it + scheduler.unregister(handler, instance2.getId()); + + Thread.sleep(100); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); + Assert.assertEquals(((MockDAGEngine) 
mockDagEngine).getTotalRuns(instance3), new Integer(1)); + // Second instance should 
not run. + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null); + } + + @Test + public void testScheduleFailure() throws Exception { + storeEntity(EntityType.PROCESS, "summarize5"); + Process mockProcess = getStore().get(EntityType.PROCESS, "summarize5"); + Date startTime = EntityUtil.getStartTime(mockProcess, cluster); + ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); + // Scheduling an instance should fail: the handler flags any scheduled event whose status is not FAILED. + // NOTE(review): if the handler is never invoked the assertion below still passes, and the fail instance + // is not removed when the assertion throws - consider a try/finally around the cleanup. + NotificationHandler failureHandler = new NotificationHandler() { + @Override + public void onEvent(Event event) throws FalconException { + JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event); + if (scheduledEvent.getStatus() != JobScheduledEvent.STATUS.FAILED) { + failed = true; + } + } + }; + SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) + scheduler.createRequestBuilder(failureHandler, instance1.getId()); + request.setInstance(instance1); + ((MockDAGEngine)mockDagEngine).addFailInstance(instance1); + scheduler.register(request.build()); + Thread.sleep(100); + Assert.assertFalse(failed); + ((MockDAGEngine)mockDagEngine).removeFailInstance(instance1); + } + + /** + * A mock notification handler that records the notified instance as RUNNING in the state store, + * inserting it when it is absent and updating it otherwise. 
+ */ + public static class MockNotificationHandler implements NotificationHandler { + @Override + public void onEvent(Event event) throws FalconException { + JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event); + Process p = (Process) process.copy(); + p.setName(scheduledEvent.getTarget().getEntityName()); + ProcessExecutionInstance instance = new ProcessExecutionInstance(p, + scheduledEvent.getTarget().getInstanceTime(), cluster); + InstanceState state = new InstanceState(instance).setCurrentState(STATE.RUNNING); + if (!stateStore.executionInstanceExists(instance.getId())) { + stateStore.putExecutionInstance(state); + } else { + stateStore.updateExecutionInstance(state); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java new file mode 100644 index 0000000..95dd5ae --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.predicate; + +import org.apache.falcon.FalconException; +import org.apache.falcon.notification.service.event.TimeElapsedEvent; +import org.apache.falcon.state.ID; +import org.joda.time.DateTime; +import org.testng.Assert; +import org.testng.annotations.Test; +import org.apache.falcon.entity.v0.process.Process; + +/** + * Tests the predicate class: building a predicate from a time-elapsed event and evaluating + * time predicates against each other. + * NOTE(review): testPredicateFromEvent makes no assertion on the returned predicate; it only + * checks that no exception is thrown. + */ +public class PredicateTest { + + @Test + public void testPredicateFromEvent() throws FalconException { + Process process = new Process(); + process.setName("test"); + DateTime now = DateTime.now(); + TimeElapsedEvent te = new TimeElapsedEvent(new ID(process), now, now, now); + Predicate.getPredicate(te); + } + + @Test + public void testComparison() { + Predicate firstPredicate = Predicate.createTimePredicate(100, 200, 50); + Predicate secondPredicate = Predicate.createTimePredicate(1000, 2000, 50); + Predicate thirdPredicate = Predicate.createTimePredicate(100, 200, -1); + + Assert.assertFalse(firstPredicate.evaluate(secondPredicate)); + Assert.assertFalse(secondPredicate.evaluate(thirdPredicate)); + // With "ANY" type: thirdPredicate is created with -1 as its third argument (presumably the wildcard - confirm). 
+ Assert.assertTrue(firstPredicate.evaluate(thirdPredicate)); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java new file mode 100644 index 0000000..2f32b43 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.exception.InvalidStateTransitionException; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.InMemoryStateStore; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Tests to ensure entity state changes happen correctly. 
+ */ +public class EntityStateServiceTest { + + private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class); + + @BeforeMethod + public void setUp() { + ((InMemoryStateStore) AbstractStateStore.get()).clear(); + } + + // Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume + @Test + public void testLifeCycle() throws FalconException { + Process mockEntity = new Process(); + mockEntity.setName("test"); + + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener); + EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next(); + Mockito.verify(listener).onSubmit(mockEntity); + Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED)); + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener); + Mockito.verify(listener).onSchedule(mockEntity); + Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED)); + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener); + Mockito.verify(listener).onSuspend(mockEntity); + Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED)); + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener); + Mockito.verify(listener).onResume(mockEntity); + Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED)); + } + + @Test + public void testInvalidTransitions() throws FalconException { + Feed mockEntity = new Feed(); + mockEntity.setName("test"); + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener); + // Attempt suspending a submitted entity + try { + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener); + Assert.fail("Exception expected"); + } catch (InvalidStateTransitionException e) { + // Do nothing + } + + StateService.get().handleStateChange(mockEntity, 
EntityState.EVENT.SCHEDULE, listener); + // Attempt resuming a scheduled entity + try { + StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener); + Assert.fail("Exception expected"); + } catch (InvalidStateTransitionException e) { + // Do nothing + } + + // Attempt scheduling a cluster + Cluster mockCluster = new Cluster(); + mockCluster.setName("test"); + StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SUBMIT, listener); + try { + StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SCHEDULE, listener); + Assert.fail("Exception expected"); + } catch (FalconException e) { + // Do nothing + } + } + + @Test(dataProvider = "state_and_events") + public void testIdempotency(EntityState.STATE state, EntityState.EVENT event) + throws InvalidStateTransitionException { + Process mockEntity = new Process(); + mockEntity.setName("test"); + + EntityState entityState = new EntityState(mockEntity).setCurrentState(state); + entityState.nextTransition(event); + Assert.assertEquals(entityState.getCurrentState(), state); + } + + @DataProvider(name = "state_and_events") + public Object[][] stateAndEvents() { + return new Object[][]{ + {EntityState.STATE.SCHEDULED, EntityState.EVENT.SCHEDULE}, + {EntityState.STATE.SUBMITTED, EntityState.EVENT.SUBMIT}, + {EntityState.STATE.SUSPENDED, EntityState.EVENT.SUSPEND}, + }; + } +}
