Repository: falcon Updated Branches: refs/heads/master 62fb403be -> 10fcb9153
FALCON-1636 Add Rerun API In Falcon Native Scheduler. Contributed by Pavan Kumar Kolamuri. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/10fcb915 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/10fcb915 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/10fcb915 Branch: refs/heads/master Commit: 10fcb91537c1e4589578bf14ccc6eb47c9dda173 Parents: 62fb403 Author: Ajay Yadava <[email protected]> Authored: Fri Dec 18 15:36:41 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Fri Dec 18 15:36:41 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/falcon/resource/InstancesResult.java | 2 +- .../apache/falcon/execution/EntityExecutor.java | 8 ++++ .../falcon/execution/ExecutionInstance.java | 11 +++++ .../execution/ProcessExecutionInstance.java | 7 +-- .../falcon/execution/ProcessExecutor.java | 40 +++++++++++++++-- .../notification/service/event/EventType.java | 4 +- .../notification/service/event/RerunEvent.java | 45 ++++++++++++++++++++ .../service/impl/SchedulerService.java | 23 ++++++++-- .../org/apache/falcon/predicate/Predicate.java | 21 ++++++++- .../org/apache/falcon/state/InstanceState.java | 20 ++++++++- .../state/InstanceStateChangeHandler.java | 8 ++++ .../org/apache/falcon/state/StateService.java | 7 +++ .../falcon/state/store/InMemoryStateStore.java | 14 ++++++ .../falcon/state/store/InstanceStateStore.java | 9 ++++ .../falcon/state/store/jdbc/BeanMapperUtil.java | 33 ++++++++++++++ .../falcon/state/store/jdbc/InstanceBean.java | 15 ++++++- .../falcon/state/store/jdbc/JDBCStateStore.java | 31 ++++++++++++++ .../falcon/workflow/engine/DAGEngine.java | 4 +- .../workflow/engine/FalconWorkflowEngine.java | 28 ++++++++++-- .../falcon/workflow/engine/OozieDAGEngine.java | 35 ++++++++++++--- .../apache/falcon/execution/MockDAGEngine.java | 6 ++- .../state/service/store/TestJDBCStateStore.java | 34 +++++++++++++++ 23 files changed, 379 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 45d2e7c..10ac338 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1636 Add Rerun API In Falcon Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava) + FALCON-1562 Documentation for enabling native scheduler in falcon (Pallavi Rao) FALCON-1512 Implement touch feature for native scheduler (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/client/src/main/java/org/apache/falcon/resource/InstancesResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java index e05eeeb..e12c083 100644 --- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java +++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java @@ -34,7 +34,7 @@ public class InstancesResult extends APIResult { * Workflow status as being set in result object. */ public static enum WorkflowStatus { - WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED + WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED, READY } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java index c9c0f42..bf70dca 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java @@ -25,6 +25,8 @@ import org.apache.falcon.state.InstanceStateChangeHandler; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; +import java.util.Properties; + /** * This class is responsible for creation of execution instances for a given entity. * An execution instance is created upon receipt of a "trigger event". @@ -98,6 +100,12 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta public abstract void kill(ExecutionInstance instance) throws FalconException; /** + * Reruns a specified execution instance.It relies on the DAGEngine to maintain a details of + * reruns and no rerun information is stored in Falcon. + */ + public abstract void rerun(ExecutionInstance instance, Properties props, boolean isForced) throws FalconException; + + /** * @return The entity */ public abstract Entity getEntity(); http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java index 3cc8a25..cadbfb6 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java @@ -26,6 +26,7 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.util.List; +import java.util.Properties; /** * Represents an execution instance of an entity. @@ -43,6 +44,7 @@ public abstract class ExecutionInstance implements NotificationHandler { private final DateTime creationTime; private DateTime actualStart; private DateTime actualEnd; + private Properties properties; protected static final DateTimeZone UTC = DateTimeZone.UTC; /** @@ -148,6 +150,15 @@ public abstract class ExecutionInstance implements NotificationHandler { this.actualEnd = actualEnd; } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + /** * Creation time of an instance. * @return http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index f3beabc..72e5558 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -187,9 +187,6 @@ public class ProcessExecutionInstance extends ExecutionInstance { * @return true when it is not already scheduled or is gated on some conditions. */ public boolean isReady() { - if (getExternalID() != null) { - return false; - } if (awaitedPredicates.isEmpty()) { return true; } else { @@ -324,4 +321,8 @@ public class ProcessExecutionInstance extends ExecutionInstance { NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) .unregister(executionService, getId()); } + + public void rerun() throws FalconException { + registerForNotifications(false); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index e446069..e1ec1bd 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -31,6 +31,7 @@ import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobCompletedEvent; +import org.apache.falcon.notification.service.event.RerunEvent; import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.notification.service.impl.AlarmService; import org.apache.falcon.notification.service.impl.JobCompletionService; @@ -43,13 +44,16 @@ import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.StateService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.falcon.workflow.engine.FalconWorkflowEngine; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; +import java.util.Properties; import java.util.TimeZone; +import java.util.concurrent.ExecutionException; /** * This class is responsible for managing execution instances of a process. @@ -267,6 +271,21 @@ public class ProcessExecutor extends EntityExecutor { } @Override + public void rerun(ExecutionInstance instance, Properties props, boolean isForced) throws FalconException { + if (props == null) { + props = new Properties(); + } + if (isForced) { + props.put(FalconWorkflowEngine.FALCON_FORCE_RERUN, "true"); + } + props.put(FalconWorkflowEngine.FALCON_RERUN, "true"); + instance.setProperties(props); + instances.put(new InstanceID(instance), (ProcessExecutionInstance) instance); + RerunEvent rerunEvent = new RerunEvent(instance.getId(), instance.getInstanceTime()); + onEvent(rerunEvent); + } + + @Override public Entity getEntity() { return process; } @@ -311,7 +330,6 @@ public class ProcessExecutor extends EntityExecutor { private void handleEvent(Event event) throws FalconException { ProcessExecutionInstance instance; - InstanceID instanceID; try { switch (event.getType()) { // TODO : Handle cases where scheduling fails. @@ -341,17 +359,24 @@ public class ProcessExecutor extends EntityExecutor { "Job seems to be have been managed outside Falcon."); } break; + case RE_RUN: + instance = instances.get((InstanceID)event.getTarget()); + stateService.handleStateChange(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, this); + if (instance.isReady()) { + stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); + } + break; default: if (isTriggerEvent(event)) { instance = buildInstance(event); stateService.handleStateChange(instance, InstanceState.EVENT.TRIGGER, this); // This happens where are no conditions the instance is waiting on (for example, no data inputs). - if (instance.isReady()) { + if (!instance.isScheduled() && instance.isReady()) { stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); } } } - } catch (Exception ee) { + } catch (ExecutionException ee) { throw new FalconException("Unable to cache execution instance", ee); } } @@ -397,7 +422,8 @@ public class ProcessExecutor extends EntityExecutor { private boolean shouldHandleEvent(Event event) { return event.getTarget().equals(id) || event.getType() == EventType.JOB_COMPLETED - || event.getType() == EventType.JOB_SCHEDULED; + || event.getType() == EventType.JOB_SCHEDULED + || event.getType() == EventType.RE_RUN; } @Override @@ -406,6 +432,12 @@ public class ProcessExecutor extends EntityExecutor { } @Override + public void onExternalTrigger(ExecutionInstance instance) throws FalconException { + instances.put(new InstanceID(instance), (ProcessExecutionInstance) instance); + ((ProcessExecutionInstance) instance).rerun(); + } + + @Override public void onConditionsMet(ExecutionInstance instance) throws FalconException { // Put process in run queue and register for notification SchedulerService.JobScheduleRequestBuilder requestBuilder = (SchedulerService.JobScheduleRequestBuilder) http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java index 59f5cba..b2418ec 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java @@ -25,6 +25,6 @@ public enum EventType { TIME_ELAPSED, DATA_AVAILABLE, JOB_COMPLETED, - JOB_SCHEDULED - + JOB_SCHEDULED, + RE_RUN } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java new file mode 100644 index 0000000..67b4b50 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.event; + +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; +import org.joda.time.DateTime; + +/** + * Rerun Event used while rerunning an instance. + */ +public class RerunEvent extends Event { + private DateTime instanceTime; + private final InstanceID callbackID; + + public DateTime getInstanceTime() { + return instanceTime; + } + + public RerunEvent(InstanceID callbackID, DateTime instanceTime) { + this.callbackID = callbackID; + this.instanceTime = instanceTime; + this.type = EventType.RE_RUN; + } + + @Override + public ID getTarget() { + return callbackID; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index fb11091..c524dfa 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -52,6 +52,7 @@ import org.apache.falcon.state.store.StateStore; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.falcon.workflow.engine.FalconWorkflowEngine; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -274,12 +276,20 @@ public class SchedulerService implements FalconNotificationService, Notification } LOG.debug("Received request to run instance {}", instance.getId()); if (checkConditions()) { - // If instance not already scheduled. String externalId = instance.getExternalID(); - if (externalId == null) { + if (externalId != null) { + Properties props = instance.getProperties(); + boolean isForced = false; + if (props != null) { + isForced = Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_FORCE_RERUN)); + } + if (isReRun(props)) { + DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced); + } + } else { externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance); - LOG.info("Scheduled job {} for instance {}", externalId, instance.getId()); } + LOG.info("Scheduled job {} for instance {}", externalId, instance.getId()); JobScheduledEvent event = new JobScheduledEvent(instance.getId(), JobScheduledEvent.STATUS.SUCCESSFUL); event.setExternalID(externalId); @@ -297,6 +307,13 @@ public class SchedulerService implements FalconNotificationService, Notification } } + private boolean isReRun(Properties props) { + if (props != null && !props.isEmpty()) { + return Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_RERUN)); + } + return false; + } + public short getPriority() { return priority; } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index 164fb0e..c3db685 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -23,6 +23,7 @@ import org.apache.falcon.execution.NotificationHandler; import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.event.EventType; +import org.apache.falcon.notification.service.event.RerunEvent; import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.state.ID; @@ -46,7 +47,8 @@ public class Predicate implements Serializable { public enum TYPE { DATA, TIME, - JOB_COMPLETION + JOB_COMPLETION, + RE_RUN } private final TYPE type; @@ -179,6 +181,16 @@ public class Predicate implements Serializable { } /** + * Creates a predicate of type Rerun. + * @param instanceTime + * @return + */ + public static Predicate createRerunPredicate(long instanceTime) { + return new Predicate(TYPE.RE_RUN) + .addClause("instanceTime", (instanceTime < 0) ? ANY : instanceTime); + } + + /** * Creates a predicate from an event based on the event source and values in the event. * * @param event @@ -206,6 +218,13 @@ public class Predicate implements Serializable { throw new FalconException("Event does not have enough data to create a predicate"); } + } else if (event.getType() == EventType.RE_RUN) { + RerunEvent rerunEvent = (RerunEvent) event; + if (rerunEvent.getInstanceTime() != null) { + return Predicate.createRerunPredicate(rerunEvent.getInstanceTime().getMillis()); + } else { + throw new FalconException("Event does not have enough data to create a predicate"); + } } else { throw new FalconException("Unhandled event type " + event.getType()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java index 7f2bda9..27dd8d4 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java @@ -50,6 +50,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance return TIMED_OUT; case TRIGGER: return this; + case EXTERNAL_TRIGGER: + return this; default: throw new InvalidStateTransitionException("Event " + event.name() + " not valid for state, " + STATE.WAITING.name()); @@ -68,6 +70,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance return RUNNING; case CONDITIONS_MET: return this; + case FAIL: + return FAILED; default: throw new InvalidStateTransitionException("Event " + event.name() + " not valid for state, " + this.name()); @@ -99,6 +103,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance if (event == EVENT.SUCCEED) { return this; } + if (event == EVENT.EXTERNAL_TRIGGER) { + return WAITING; + } throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name() + ". Cannot apply transitions."); } @@ -109,6 +116,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance if (event == EVENT.FAIL) { return this; } + if (event == EVENT.EXTERNAL_TRIGGER) { + return WAITING; + } throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name() + ". Cannot apply transitions."); } @@ -119,6 +129,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance if (event == EVENT.KILL) { return this; } + if (event == EVENT.EXTERNAL_TRIGGER) { + return WAITING; + } throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name() + ". Cannot apply transitions."); } @@ -129,6 +142,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance if (event == EVENT.TIME_OUT) { return this; } + if (event == EVENT.EXTERNAL_TRIGGER) { + return WAITING; + } throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name() + ". Cannot apply transitions."); } @@ -176,7 +192,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance RESUME_RUNNING, KILL, SUCCEED, - FAIL + FAIL, + EXTERNAL_TRIGGER } /** @@ -245,6 +262,7 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance states.add(STATE.FAILED); states.add(STATE.KILLED); states.add(STATE.SUCCEEDED); + states.add(STATE.TIMED_OUT); return states; } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java index 1f69fab..e460ee7 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java @@ -34,6 +34,14 @@ public interface InstanceStateChangeHandler { void onTrigger(ExecutionInstance instance) throws FalconException; /** + * Invoked when an instance is rerun or triggered externally. + * + * @param instance + * @throws FalconException + */ + void onExternalTrigger(ExecutionInstance instance) throws FalconException; + + /** * Invoked when all gating conditions are satisfied. * * @param instance http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/StateService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java index c702cc3..9266354 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java @@ -129,6 +129,10 @@ public final class StateService { callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler); stateStore.putExecutionInstance(new InstanceState(instance)); LOG.debug("Instance {} triggered due to event {}.", id, event.name()); + } else if (event == InstanceState.EVENT.EXTERNAL_TRIGGER) { + callbackHandler(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, handler); + stateStore.updateExecutionInstance(new InstanceState(instance)); + LOG.debug("Instance {} triggered due to event {}.", id, event.name()); } else { throw new FalconException("Instance " + id + "does not exist."); } @@ -154,6 +158,9 @@ public final class StateService { case TRIGGER: handler.onTrigger(instance); break; + case EXTERNAL_TRIGGER: + handler.onExternalTrigger(instance); + break; case CONDITIONS_MET: handler.onConditionsMet(instance); break; http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java index 2f3aa3a..c4ced46 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java @@ -18,6 +18,7 @@ package org.apache.falcon.state.store; import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; @@ -141,6 +142,19 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override + public InstanceState getExecutionInstance(String externalID) throws StateStoreException { + if (StringUtils.isEmpty(externalID)) { + throw new StateStoreException("External ID for retrieving instance cannot be null"); + } + for (InstanceState instanceState : instanceStates.values()) { + if (externalID.equals(instanceState.getInstance().getExternalID())) { + return instanceState; + } + } + return null; + } + + @Override public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException { String key = new InstanceID(instanceState.getInstance()).getKey(); if (!instanceStates.containsKey(key)) { http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java index f1d1931..8ce8fa0 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java @@ -48,6 +48,15 @@ public interface InstanceStateStore { InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException; /** + * Retrieves ExecutionInstance from external ID. + * + * @param externalID + * @return + * @throws StateStoreException + */ + InstanceState getExecutionInstance(String externalID) throws StateStoreException; + + /** * Updates an execution instance in the store. * * @param instanceState http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 4bee269..3def14a 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -41,6 +41,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Properties; /** * Mapping util for Persistent Store. @@ -165,6 +166,10 @@ public final class BeanMapperUtil { IOUtils.closeQuietly(out); } } + if (instance.getProperties() != null && !instance.getProperties().isEmpty()) { + byte[] props = getProperties(instanceState); + instanceBean.setProperties(props); + } return instanceBean; } @@ -207,6 +212,22 @@ public final class BeanMapperUtil { } } executionInstance.setAwaitingPredicates(predicates); + + result = instanceBean.getProperties(); + if (result != null && result.length != 0) { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result); + ObjectInputStream in = null; + Properties properties = null; + try { + in = new ObjectInputStream(byteArrayInputStream); + properties = (Properties) in.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(in); + } + executionInstance.setProperties(properties); + } InstanceState instanceState = new InstanceState(executionInstance); instanceState.setCurrentState(InstanceState.STATE.valueOf(instanceBean.getCurrentState())); return instanceState; @@ -268,4 +289,16 @@ public final class BeanMapperUtil { IOUtils.closeQuietly(out); } } + + public static byte [] getProperties(InstanceState instanceState) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(byteArrayOutputStream); + out.writeObject(instanceState.getInstance().getProperties()); + return byteArrayOutputStream.toByteArray(); + } finally { + IOUtils.closeQuietly(out); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java index 0e3dfa9..305b398 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java @@ -37,9 +37,10 @@ import java.sql.Timestamp; @Entity @NamedQueries({ @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"), + @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"), @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"), @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"), - @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates where a.id = :id"), + @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"), @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"), @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"), @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"), @@ -108,6 +109,10 @@ public class InstanceBean { @Lob private byte[] awaitedPredicates; + @Column(name = "properties", columnDefinition = "BLOB") + @Lob + private byte[] properties; + public String getId() { return id; @@ -196,4 +201,12 @@ public class InstanceBean { public void setEntityId(String entityId) { this.entityId = entityId; } + + public byte[] getProperties() { + return properties; + } + + public void setProperties(byte[] properties) { + this.properties = properties; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index b2f8e80..151c2c2 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.store.jdbc; +import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; @@ -217,6 +218,29 @@ public final class JDBCStateStore extends AbstractStateStore { } @Override + public InstanceState getExecutionInstance(String externalID) throws StateStoreException { + if (StringUtils.isEmpty(externalID)) { + throw new StateStoreException("External ID for retrieving instance cannot be null or empty"); + } + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCE_FOR_EXTERNAL_ID"); + q.setParameter("externalID", externalID); + List result = q.getResultList(); + entityManager.close(); + if (result.isEmpty()) { + return null; + } + try { + InstanceBean instanceBean = (InstanceBean)(result.get(0)); + return BeanMapperUtil.convertToInstanceState(instanceBean); + } catch (IOException e) { + throw new StateStoreException(e); + } + + } + + + @Override public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException { InstanceID id = new InstanceID(instanceState.getInstance()); String key = id.toString(); @@ -248,6 +272,13 @@ public final class JDBCStateStore extends AbstractStateStore { throw new StateStoreException(e); } } + if (instance.getProperties() != null && !instance.getProperties().isEmpty()) { + try { + q.setParameter("properties", BeanMapperUtil.getProperties(instanceState)); + } catch (IOException e) { + throw new StateStoreException(e); + } + } q.executeUpdate(); commitAndCloseTransaction(entityManager); } http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java index e0d2a0e..49e083c 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java @@ -75,9 +75,11 @@ public interface DAGEngine { * Re-run a terminated instance. * * @param instance + * @param props + * @param isForced * @throws DAGEngineException */ - void reRun(ExecutionInstance instance) throws DAGEngineException; + void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException; /** * Perform dryrun of an instance. http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index c19cada..6abc222 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -63,6 +63,8 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { private static final StateStore STATE_STORE = AbstractStateStore.get(); private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get(); private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; + public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun"; + public static final String FALCON_RERUN = "falcon.system.rerun"; private enum JobAction { KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS @@ -160,6 +162,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles) throws FalconException { + return doJobAction(action, entity, start, end, props, lifeCycles, false); + } + + private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, + Properties props, List<LifeCycle> lifeCycles, + boolean isForced) throws FalconException { Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS); APIResult.Status overallStatus = APIResult.Status.SUCCEEDED; @@ -186,6 +194,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { states.addAll(InstanceState.getTerminalStates()); states.add(InstanceState.STATE.SUSPENDED); break; + case RERUN: + // Applicable only for terminated States + states = InstanceState.getTerminalStates(); + break; default: throw new IllegalArgumentException("Unhandled action " + action); } @@ -210,7 +222,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { InstancesResult.Instance instance = null; try { - instance = performAction(ins.getCluster(), entity, action, ins); + instance = performAction(ins.getCluster(), entity, action, ins, props, isForced); instance.instance = instanceTimeStr; } catch (FalconException e) { LOG.warn("Unable to perform action {} on cluster", action, e); @@ -241,7 +253,8 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { } private InstancesResult.Instance performAction(String cluster, Entity entity, JobAction action, - ExecutionInstance instance) throws FalconException { + ExecutionInstance instance, Properties userProps, + boolean isForced) throws FalconException { EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(entity, cluster); InstancesResult.Instance instanceInfo = null; LOG.debug("Retrieving information for {} for action {}", instance.getId(), action); @@ -266,6 +279,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { .getExecutionInstance(instance.getId()).getCurrentState().name()); break; case RERUN: + executor.rerun(instance, userProps, isForced); + instanceInfo.status = + InstancesResult.WorkflowStatus.valueOf(STATE_STORE + .getExecutionInstance(instance.getId()).getCurrentState().name()); break; case STATUS: if (StringUtils.isNotEmpty(instance.getExternalID())) { @@ -302,7 +319,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException { - throw new FalconException("Not yet Implemented"); + return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced); } @Override @@ -395,7 +412,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException { - throw new FalconException("Not yet Implemented"); + InstanceState instanceState = STATE_STORE.getExecutionInstance(jobId); + ExecutionInstance instance = instanceState.getInstance(); + EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(instance.getEntity(), cluster); + executor.rerun(instance, props, isForced); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java index a26eb77..4786cc3 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java @@ -32,6 +32,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.RuntimeProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -199,7 +200,8 @@ public class OozieDAGEngine implements DAGEngine { client.suspend(instance.getExternalID()); assertStatus(instance.getExternalID(), Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED, Job.Status.FAILED, Job.Status.KILLED); - LOG.info("Suspended job {} on cluster {}", instance.getExternalID(), instance.getCluster()); + LOG.info("Suspended job {} of entity {} of time {} on cluster {}", instance.getExternalID(), + instance.getEntity().getName(), instance.getInstanceTime(), instance.getCluster()); } catch (OozieClientException e) { throw new DAGEngineException(e); } @@ -211,7 +213,8 @@ public class OozieDAGEngine implements DAGEngine { client.resume(instance.getExternalID()); assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED, Job.Status.FAILED, Job.Status.KILLED); - LOG.info("Resumed job {} on cluster {}", instance.getExternalID(), instance.getCluster()); + LOG.info("Resumed job {} of entity {} of time {} on cluster {}", instance.getExternalID(), + instance.getEntity().getName(), instance.getInstanceTime(), instance.getCluster()); } catch (OozieClientException e) { throw new DAGEngineException(e); } @@ -222,15 +225,37 @@ public class OozieDAGEngine implements DAGEngine { try { client.kill(instance.getExternalID()); assertStatus(instance.getExternalID(), Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED); - LOG.info("Killed job {} on cluster {}", instance.getExternalID(), instance.getCluster()); + LOG.info("Killed job {} of entity {} of time {} on cluster {}", instance.getExternalID(), + instance.getEntity().getName(), instance.getInstanceTime(), instance.getCluster()); } catch (OozieClientException e) { throw new DAGEngineException(e); } } @Override - public void reRun(ExecutionInstance instance) throws DAGEngineException { - // TODO : Implement this + public void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException { + String jobId = instance.getExternalID(); + try { + WorkflowJob jobInfo = client.getJobInfo(jobId); + Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); + if (props != null) { + jobprops.putAll(props); + } + //if user has set any of these oozie rerun properties then force rerun flag is ignored + if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES) + && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) { + jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced)); + } + jobprops.remove(OozieClient.COORDINATOR_APP_PATH); + jobprops.remove(OozieClient.BUNDLE_APP_PATH); + client.reRun(jobId, jobprops); + assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED); + LOG.info("Rerun job {} of entity {} of time {} on cluster {}", jobId, instance.getEntity().getName(), + instance.getInstanceTime(), instance.getCluster()); + } catch (Exception e) { + LOG.error("Unable to rerun workflows", e); + throw new DAGEngineException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java index d274ad7..c99f3fd 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java @@ -79,8 +79,10 @@ public class MockDAGEngine implements DAGEngine { } @Override - public void reRun(ExecutionInstance instance) throws DAGEngineException { - + public void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException { + if (failInstances.contains(instance)) { + throw new DAGEngineException("mock failure."); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java index 6d5bd49..34156c0 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -364,6 +364,40 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase { } + @Test + public void testGetInstanceFromExternalID() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + + long instance1Time = System.currentTimeMillis() - 180000; + long instance2Time = System.currentTimeMillis(); + EntityState entityState = getEntityState(EntityType.PROCESS, "processext"); + ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance1Time, "cluster1", instance1Time); + processExecutionInstance1.setExternalID("external_id_1"); + InstanceState instanceState1 = new InstanceState(processExecutionInstance1); + instanceState1.setCurrentState(InstanceState.STATE.RUNNING); + + ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance2Time, "cluster1", instance2Time); + processExecutionInstance2.setExternalID("external_id_2"); + InstanceState instanceState2 = new InstanceState(processExecutionInstance2); + instanceState2.setCurrentState(InstanceState.STATE.RUNNING); + + stateStore.putExecutionInstance(instanceState1); + stateStore.putExecutionInstance(instanceState2); + + InstanceState actualInstanceState = stateStore.getExecutionInstance("external_id_1"); + Assert.assertEquals(actualInstanceState.getInstance(), processExecutionInstance1); + + actualInstanceState = stateStore.getExecutionInstance("external_id_2"); + Assert.assertEquals(actualInstanceState.getInstance(), processExecutionInstance2); + + } + private void initInstanceState(InstanceState instanceState) { instanceState.setCurrentState(InstanceState.STATE.READY);
