http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/ID.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/ID.java b/scheduler/src/main/java/org/apache/falcon/state/ID.java new file mode 100644 index 0000000..420c856 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/ID.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.execution.ExecutionInstance; +import org.datanucleus.util.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.Serializable; + +/** + * A serializable, comparable ID used to uniquely represent an entity or an instance. 
+ */ +public final class ID implements Serializable, Comparable<ID> { + public static final String KEY_SEPARATOR = "/"; + public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; + + private String entityName; + private EntityType entityType; + private String entityKey; + private String cluster; + private DateTime instanceTime; + + /** + * Default Constructor. + */ + public ID(){} + + /** + * Constructor. + * + * @param type + * @param name + */ + public ID(EntityType type, String name) { + assert type != null : "Entity type must be present."; + assert !StringUtils.isEmpty(name) : "Entity name must be present."; + this.entityName = name; + this.entityType = type; + this.entityKey = entityType + KEY_SEPARATOR + entityName; + } + + /** + * Constructor. + * + * @param entity + */ + public ID(Entity entity) { + this(entity.getEntityType(), entity.getName()); + } + + /** + * Constructor. + * + * @param entity + * @param cluster + */ + public ID(Entity entity, String cluster) { + this(entity.getEntityType(), entity.getName()); + assert !StringUtils.isEmpty(cluster) : "Cluster cannot be empty."; + this.cluster = cluster; + } + + /** + * Constructor. + * + * @param instance + */ + public ID(ExecutionInstance instance) { + this(instance.getEntity(), instance.getCluster()); + assert instance.getInstanceTime() != null : "Nominal time cannot be null."; + this.instanceTime = instance.getInstanceTime(); + } + + /** + * Constructor. 
+ * + * @param entity + * @param cluster + * @param instanceTime + */ + public ID(Entity entity, String cluster, DateTime instanceTime) { + this(entity, cluster); + assert instanceTime != null : "Nominal time cannot be null."; + this.instanceTime = instanceTime; + } + + /** + * @return cluster name + */ + public String getCluster() { + return cluster; + } + + /** + * @param cluster name + */ + public void setCluster(String cluster) { + this.cluster = cluster; + } + + /** + * @return nominal time + */ + public DateTime getInstanceTime() { + return instanceTime; + } + + /** + * @param instanceTime + */ + public void setInstanceTime(DateTime instanceTime) { + this.instanceTime = instanceTime; + } + + /** + * @return entity name + */ + public String getEntityName() { + return entityName; + } + + /** + * @return entity type + */ + public EntityType getEntityType() { + return entityType; + } + + @Override + public String toString() { + String val = entityKey; + if (!StringUtils.isEmpty(cluster)) { + val = val + KEY_SEPARATOR + cluster; + } + + if (instanceTime != null) { + DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); + val = val + KEY_SEPARATOR + fmt.print(instanceTime); + } + return val; + } + + /** + * @return An ID without the cluster name + */ + public String getEntityKey() { + return entityKey; + } + + /** + * @return ID without the instance information + */ + public ID getEntityID() { + ID newID = new ID(this.entityType, this.entityName); + newID.setCluster(this.cluster); + newID.setInstanceTime(null); + return newID; + } + + @Override + public boolean equals(Object id) { + if (id == null || id.getClass() != getClass()) { + return false; + } + return compareTo((ID)id) == 0; + } + + @Override + public int hashCode() { + return this.toString().hashCode(); + } + + @Override + public int compareTo(ID id) { + if (id == null) { + return -1; + } + return this.toString().compareTo(id.toString()); + } +}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
// ----------------------------------------------------------------------
// diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
// new file mode 100644
// index 0000000..8cf24ee
// --- /dev/null
// +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
// @@ -0,0 +1,250 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.state;

import org.apache.falcon.exception.InvalidStateTransitionException;
import org.apache.falcon.execution.ExecutionInstance;

import java.util.ArrayList;
import java.util.List;

/**
 * Represents the state of an execution instance.
 * Implements {@link org.apache.falcon.state.StateMachine} for an instance.
 *
 * <p>A new instance starts in {@link STATE#WAITING}. {@link #nextTransition(EVENT)} only computes
 * the next state; the caller applies it with {@link #setCurrentState(STATE)}.
 */
public class InstanceState implements StateMachine<InstanceState.STATE, InstanceState.EVENT> {
    private ExecutionInstance instance;
    private STATE currentState;
    private static final STATE INITIAL_STATE = STATE.WAITING;

    /**
     * Enumerates all the valid states of an instance and the valid transitions from that state.
     * Each constant maps an incoming {@link EVENT} to the next state, or throws
     * {@link InvalidStateTransitionException} when the event is not allowed in that state.
     * Re-applying the event that led into a state is accepted and leaves the state unchanged.
     */
    public enum STATE implements StateMachine<InstanceState.STATE, InstanceState.EVENT> {
        /** Initial state: the instance exists but its gating conditions are not yet met. */
        WAITING {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                switch (event) {
                case SUSPEND:
                    return SUSPENDED;
                case KILL:
                    return KILLED;
                case CONDITIONS_MET:
                    return READY;
                case TIME_OUT:
                    return TIMED_OUT;
                case TRIGGER:
                    return this;
                default:
                    // this.name() is "WAITING"; same text as before, now consistent with the other states.
                    throw new InvalidStateTransitionException("Event " + event.name()
                            + " not valid for state, " + this.name());
                }
            }
        },
        /** Gating conditions met; waiting to be scheduled. */
        READY {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                switch (event) {
                case SUSPEND:
                    return SUSPENDED;
                case KILL:
                    return KILLED;
                case SCHEDULE:
                    return RUNNING;
                case CONDITIONS_MET:
                    return this;
                default:
                    throw new InvalidStateTransitionException("Event " + event.name()
                            + " not valid for state, " + this.name());
                }
            }
        },
        /** Scheduled for execution. */
        RUNNING {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                switch (event) {
                case SUSPEND:
                    return SUSPENDED;
                case KILL:
                    return KILLED;
                case SUCCEED:
                    return SUCCEEDED;
                case FAIL:
                    return FAILED;
                case SCHEDULE:
                    return this;
                default:
                    throw new InvalidStateTransitionException("Event " + event.name()
                            + " not valid for state, " + this.name());
                }
            }
        },
        /** Terminal. */
        SUCCEEDED {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                if (event == EVENT.SUCCEED) {
                    return this;
                }
                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                        + ". Cannot apply transitions.");
            }
        },
        /** Terminal. */
        FAILED {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                if (event == EVENT.FAIL) {
                    return this;
                }
                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                        + ". Cannot apply transitions.");
            }
        },
        /** Terminal. */
        KILLED {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                if (event == EVENT.KILL) {
                    return this;
                }
                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                        + ". Cannot apply transitions.");
            }
        },
        /** Terminal: gating conditions were not met in time. */
        TIMED_OUT {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                if (event == EVENT.TIME_OUT) {
                    return this;
                }
                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                        + ". Cannot apply transitions.");
            }
        },
        /** Suspended from WAITING, READY or RUNNING; resumed via the matching RESUME_* event. */
        SUSPENDED {
            @Override
            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
                switch (event) {
                case RESUME_WAITING:
                    return WAITING;
                case RESUME_READY:
                    return READY;
                case RESUME_RUNNING:
                    return RUNNING;
                case SUSPEND:
                    return this;
                // The instance can complete execution on DAG engine, just after a suspend was issued.
                // Especially with Oozie, it finishes execution of current action before suspending.
                // Hence need to allow terminal states too.
                case SUCCEED:
                    return SUCCEEDED;
                case FAIL:
                    return FAILED;
                case KILL:
                    return KILLED;
                default:
                    throw new InvalidStateTransitionException("Event " + event.name()
                            + " not valid for state, " + this.name());
                }
            }
        }
    }

    /**
     * Enumerates all the valid events that can cause a state transition.
     */
    public enum EVENT {
        TRIGGER,
        CONDITIONS_MET,
        TIME_OUT,
        SCHEDULE,
        SUSPEND,
        RESUME_WAITING,
        RESUME_READY,
        RESUME_RUNNING,
        KILL,
        SUCCEED,
        FAIL
    }

    /**
     * Creates the state holder for a new instance, starting in {@link STATE#WAITING}.
     *
     * @param instance the execution instance this state belongs to
     */
    public InstanceState(ExecutionInstance instance) {
        this.instance = instance;
        currentState = INITIAL_STATE;
    }

    /**
     * @return execution instance
     */
    public ExecutionInstance getInstance() {
        return instance;
    }

    /**
     * @return current state
     */
    public STATE getCurrentState() {
        return currentState;
    }

    /**
     * Sets the current state without validating the transition.
     *
     * @param state the new state
     * @return This instance
     */
    public InstanceState setCurrentState(STATE state) {
        this.currentState = state;
        return this;
    }

    /**
     * Computes the state the current state would move to on {@code event}. Does not modify this object.
     */
    @Override
    public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
        return currentState.nextTransition(event);
    }

    /**
     * @return "active" states of an instance (a new, mutable list on every call).
     */
    public static List<STATE> getActiveStates() {
        List<InstanceState.STATE> states = new ArrayList<STATE>();
        states.add(STATE.RUNNING);
        states.add(STATE.READY);
        states.add(STATE.WAITING);
        return states;
    }

    /**
     * @return "running" states of an instance (a new, mutable list on every call).
     */
    public static List<STATE> getRunningStates() {
        List<InstanceState.STATE> states = new ArrayList<STATE>();
        states.add(STATE.RUNNING);
        return states;
    }

    /**
     * @return "terminal" states of an instance (a new, mutable list on every call).
     */
    public static List<STATE> getTerminalStates() {
        List<InstanceState.STATE> states = new ArrayList<STATE>();
        states.add(STATE.FAILED);
        states.add(STATE.KILLED);
        states.add(STATE.SUCCEEDED);
        // TIMED_OUT.nextTransition rejects every event other than TIME_OUT as "terminal state",
        // so it belongs here too. It was previously missing.
        states.add(STATE.TIMED_OUT);
        return states;
    }
}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
// ----------------------------------------------------------------------
// diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
// new file mode 100644
// index 0000000..1f69fab
// --- /dev/null
// +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
// @@ -0,0 +1,99 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.state;

import org.apache.falcon.FalconException;
import org.apache.falcon.execution.ExecutionInstance;

/**
 * Any handler interested in handling state changes of instances must implement this interface.
 */
public interface InstanceStateChangeHandler {

    /**
     * Invoked when an instance is created.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onTrigger(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when all gating conditions are satisfied.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onConditionsMet(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when an instance is scheduled on a DAG Engine.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onSchedule(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked on suspension of an instance.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onSuspend(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when an instance is resumed. StateService calls this for any of the
     * RESUME_WAITING, RESUME_READY and RESUME_RUNNING events.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onResume(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when an instance is killed.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onKill(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when an instance completes successfully.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onSuccess(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when execution of an instance fails.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onFailure(ExecutionInstance instance) throws FalconException;

    /**
     * Invoked when an instance times out waiting for gating conditions to be satisfied.
     *
     * @param instance the instance
     * @throws FalconException
     */
    void onTimeOut(ExecutionInstance instance) throws FalconException;
}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
// ----------------------------------------------------------------------
// diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java b/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
// new file mode 100644
// index 0000000..6ca0500
// --- /dev/null
// +++ b/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
// @@ -0,0 +1,34 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.state;

import org.apache.falcon.exception.InvalidStateTransitionException;

/**
 * Interface to be implemented by a class that handles state transitions.
 *
 * @param <STATE> enum of states
 * @param <EVENT> enum of events that drive transitions
 */
public interface StateMachine<STATE extends Enum<STATE>, EVENT extends Enum<EVENT>> {

    /**
     * @param event the event being applied
     * @return The state that the machine enters into as a result of the event.
     * @throws InvalidStateTransitionException if the event is not allowed in the current state
     */
    STATE nextTransition(EVENT event) throws InvalidStateTransitionException;

}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
// ----------------------------------------------------------------------
// diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
// new file mode 100644
// index 0000000..81357a4
// --- /dev/null
// +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
// @@ -0,0 +1,185 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.state;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.execution.ExecutionInstance;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.falcon.state.store.StateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A service that fetches state from state store, handles state transitions of entities and instances,
 * invokes state change handler and finally persists the new state in the state store.
 *
 * <p>Singleton, obtained via {@link #get()}. The backing store is whatever
 * {@link AbstractStateStore#get()} returns when this class is initialised.
 */
public final class StateService {
    private static final Logger LOG = LoggerFactory.getLogger(StateService.class);
    private static final StateService LIFE_CYCLE_SERVICE = new StateService();
    private final StateStore stateStore;

    private StateService() {
        stateStore = AbstractStateStore.get();
    }

    /**
     * @return - Singleton instance of StateService
     */
    public static StateService get() {
        return LIFE_CYCLE_SERVICE;
    }

    /**
     * @return - Name of the service
     */
    public String getName() {
        return "EntityLifeCycleService";
    }

    /**
     * Fetches the entity from state store, applies state transitions, calls appropriate method on the handler and
     * persists the final state in the store.
     *
     * <p>An entity not yet in the store accepts only SUBMIT. An existing cluster entity rejects every event.
     * For an existing entity the transition is validated before the handler runs. An invalid event, or a
     * handler that throws, therefore leaves the stored state unchanged.
     *
     * @param entity  the entity
     * @param event   the event to apply
     * @param handler callback to notify; may be null
     * @throws FalconException if the entity is unknown, the transition is invalid, or the handler/store fails
     */
    public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
        throws FalconException {
        ID id = new ID(entity);
        if (!stateStore.entityExists(id)) {
            // New entity
            if (event == EntityState.EVENT.SUBMIT) {
                callbackHandler(entity, EntityState.EVENT.SUBMIT, handler);
                stateStore.putEntity(new EntityState(entity));
                LOG.debug("Entity {} submitted due to event {}.", id, event.name());
            } else {
                throw new FalconException("Entity " + id + " does not exist in state store.");
            }
        } else {
            if (entity.getEntityType() == EntityType.CLUSTER) {
                throw new FalconException("Cluster entity " + entity.getName() + " can only be submitted.");
            }
            EntityState entityState = stateStore.getEntity(id);
            // Validate first so an illegal event never reaches the handler.
            EntityState.STATE newState = entityState.nextTransition(event);
            callbackHandler(entity, event, handler);
            entityState.setCurrentState(newState);
            stateStore.updateEntity(entityState);
            LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
                    entityState.getCurrentState(), event.name());
        }
    }

    // Invokes the right method on the state change handler
    private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
        throws FalconException {
        if (handler == null) {
            return;
        }
        switch (event) {
        case SUBMIT:
            handler.onSubmit(entity);
            break;
        case SCHEDULE:
            handler.onSchedule(entity);
            break;
        case SUSPEND:
            handler.onSuspend(entity);
            break;
        case RESUME:
            handler.onResume(entity);
            break;
        default: // Do nothing, only propagate events that originate from user
        }
    }

    /**
     * Fetches the instance from state store, applies state transitions, calls appropriate method on the handler and
     * persists the final state in the store.
     *
     * <p>An instance not yet in the store accepts only TRIGGER. For an existing instance the transition is
     * validated before the handler runs. An invalid event, or a handler that throws, therefore leaves the
     * stored state unchanged.
     *
     * @param instance the execution instance
     * @param event    the event to apply
     * @param handler  callback to notify; may be null
     * @throws FalconException if the instance is unknown, the transition is invalid, or the handler/store fails
     */
    public void handleStateChange(ExecutionInstance instance, InstanceState.EVENT event,
            InstanceStateChangeHandler handler) throws FalconException {
        ID id = new ID(instance);
        if (!stateStore.executionInstanceExists(id)) {
            // New instance
            if (event == InstanceState.EVENT.TRIGGER) {
                callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler);
                stateStore.putExecutionInstance(new InstanceState(instance));
                LOG.debug("Instance {} triggered due to event {}.", id, event.name());
            } else {
                // Separating space was missing ("...<id>does not exist.").
                throw new FalconException("Instance " + id + " does not exist.");
            }
        } else {
            InstanceState instanceState = stateStore.getExecutionInstance(id);
            // Validate first so an illegal event never reaches the handler.
            InstanceState.STATE newState = instanceState.nextTransition(event);
            callbackHandler(instance, event, handler);
            instanceState.setCurrentState(newState);
            stateStore.updateExecutionInstance(instanceState);
            LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id,
                    instanceState.getCurrentState(), event.name());
        }
    }

    // Invokes the right method on the state change handler
    private void callbackHandler(ExecutionInstance instance, InstanceState.EVENT event,
            InstanceStateChangeHandler handler) throws FalconException {
        if (handler == null) {
            return;
        }
        switch (event) {
        case TRIGGER:
            handler.onTrigger(instance);
            break;
        case CONDITIONS_MET:
            handler.onConditionsMet(instance);
            break;
        case TIME_OUT:
            handler.onTimeOut(instance);
            break;
        case SCHEDULE:
            handler.onSchedule(instance);
            break;
        case SUSPEND:
            handler.onSuspend(instance);
            break;
        case RESUME_WAITING:
        case RESUME_READY:
        case RESUME_RUNNING:
            handler.onResume(instance);
            break;
        case KILL:
            handler.onKill(instance);
            break;
        case SUCCEED:
            handler.onSuccess(instance);
            break;
        case FAIL:
            handler.onFailure(instance);
            break;
        default: // Do nothing
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
// ----------------------------------------------------------------------
// diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
// new file mode 100644
// index 0000000..67e047f
// --- /dev/null
// +++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
// @@ -0,0 +1,92 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.state.store;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.state.EntityState;
import org.apache.falcon.state.ID;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This abstract class listens to config store changes and keeps the state store in sync with the config store.
 * Cluster entities are ignored by every listener callback.
 */
public abstract class AbstractStateStore implements StateStore, ConfigurationChangeListener {
    private static StateStore stateStore;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStateStore.class);

    /**
     * Adds a state entry for a newly added, non-cluster entity.
     */
    @Override
    public void onAdd(Entity entity) throws FalconException {
        if (entity.getEntityType() != EntityType.CLUSTER) {
            putEntity(new EntityState(entity));
        }
    }

    /**
     * Removes a non-cluster entity from the state store.
     */
    @Override
    public void onRemove(Entity entity) throws FalconException {
        // Delete entity should remove its instances too.
        if (entity.getEntityType() != EntityType.CLUSTER) {
            deleteEntity(new ID(entity));
        }
    }

    /**
     * Points the stored state of {@code oldEntity} at {@code newEntity}. If the store has no state
     * for the old entity, the new entity is added instead.
     */
    @Override
    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
        if (newEntity.getEntityType() != EntityType.CLUSTER) {
            ID oldId = new ID(oldEntity);
            // EntityStateStore#getEntity is documented to throw StateStoreException for an unknown
            // entity rather than return null. Without this existence check the onAdd fallback was
            // unreachable and an unknown entity raised an exception.
            EntityState entityState = entityExists(oldId) ? getEntity(oldId) : null;
            if (entityState == null) {
                onAdd(newEntity);
            } else {
                entityState.setEntity(newEntity);
                updateEntity(entityState);
            }
        }
    }

    /**
     * On config-store reload, adds any non-cluster entity missing from the state store.
     */
    @Override
    public void onReload(Entity entity) throws FalconException {
        if (entity.getEntityType() != EntityType.CLUSTER) {
            // To ensure the config store and state store are in sync
            if (!entityExists(new ID(entity))) {
                LOG.info("State store missing entity {}. Adding it.", entity.getName());
                onAdd(entity);
            }
        }
    }

    /**
     * Lazily loads the implementation named by the {@code state.store} startup property,
     * defaulting to {@code org.apache.falcon.state.store.InMemoryStateStore}.
     *
     * @return Singleton instance of an implementation of State Store based on the startup properties.
     * @throws RuntimeException wrapping the FalconException if the implementation cannot be loaded
     */
    public static synchronized StateStore get() {
        if (stateStore == null) {
            String storeImpl = StartupProperties.get().getProperty("state.store",
                    "org.apache.falcon.state.store.InMemoryStateStore");
            try {
                stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);
            } catch (FalconException e) {
                throw new RuntimeException("Unable to load state store impl. : " + storeImpl, e);
            }
        }
        return stateStore;
    }
}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
// ----------------------------------------------------------------------
// diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
// new file mode 100644
// index 0000000..4aa6fdb
// --- /dev/null
// +++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
// @@ -0,0 +1,76 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.state.store;

import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.state.EntityState;
import org.apache.falcon.state.ID;

import java.util.Collection;

/**
 * Interface to abstract out Entity store API.
 */
public interface EntityStateStore {
    /**
     * Adds the state of a new entity.
     *
     * @param entityState state to add
     * @throws StateStoreException if the state cannot be stored (e.g. the entity already exists)
     */
    void putEntity(EntityState entityState) throws StateStoreException;

    /**
     * @param entityId entity-level ID
     * @return Entity corresponding to the key
     * @throws StateStoreException - If entity does not exist.
     */
    EntityState getEntity(ID entityId) throws StateStoreException;

    /**
     * @param entityId entity-level ID
     * @return true, if entity exists in store.
     */
    boolean entityExists(ID entityId);

    /**
     * @param state state to filter by
     * @return Entities in a given state.
     */
    Collection<Entity> getEntities(EntityState.STATE state);

    /**
     * @return All Entities in the store.
     */
    Collection<EntityState> getAllEntities();

    /**
     * Update an existing entity with the new values.
     *
     * @param entityState the updated state
     * @throws StateStoreException when entity does not exist.
     */
    void updateEntity(EntityState entityState) throws StateStoreException;

    /**
     * Removes the entity and its instances from the store.
     *
     * @param entityId entity-level ID
     * @throws StateStoreException if the entity cannot be removed (e.g. it does not exist)
     */
    void deleteEntity(ID entityId) throws StateStoreException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java new file mode 100644 index 0000000..3822860 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.store; + +import com.google.common.collect.Lists; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.exception.StateStoreException; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceState; +import org.joda.time.DateTime; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * An in memory state store mostly intended for unit tests. + * Singleton. 
+ */
+public final class InMemoryStateStore extends AbstractStateStore {
+
+    // NOTE(review): unlike instanceStates, this map is not synchronized.
+    private Map<String, EntityState> entityStates = new HashMap<>();
+    // Keep it sorted, so keys sharing a prefix are adjacent and iteration follows key order.
+    // The wrapper only makes individual calls atomic: iterating the map (or any of its views) must
+    // be done while holding the map's own lock, as required by Collections.synchronizedSortedMap.
+    private SortedMap<String, InstanceState> instanceStates = Collections
+            .synchronizedSortedMap(new TreeMap<String, InstanceState>());
+
+    private static final StateStore STORE = new InMemoryStateStore();
+
+    private InMemoryStateStore() {
+    }
+
+    /**
+     * @return the singleton in-memory store.
+     */
+    public static StateStore get() {
+        return STORE;
+    }
+
+    /**
+     * Returns true if {@code key} is nested under {@code parentKey}, i.e. it starts with
+     * {@code parentKey} followed by {@link ID#KEY_SEPARATOR}. Requiring the separator keeps an
+     * entity named "foo" from matching the keys of an entity named "foobar" (and likewise for
+     * cluster names that share a prefix).
+     * NOTE(review): assumes instance keys (ID#toString) are the parent key, then KEY_SEPARATOR,
+     * then further components - confirm against ID#toString.
+     */
+    private static boolean isNestedKey(String key, String parentKey) {
+        return key.startsWith(parentKey + ID.KEY_SEPARATOR);
+    }
+
+    @Override
+    public void putEntity(EntityState entityState) throws StateStoreException {
+        String key = new ID(entityState.getEntity()).getEntityKey();
+        if (entityStates.containsKey(key)) {
+            throw new StateStoreException("Entity with key, " + key + " already exists.");
+        }
+        entityStates.put(key, entityState);
+    }
+
+    @Override
+    public EntityState getEntity(ID entityId) throws StateStoreException {
+        if (!entityStates.containsKey(entityId.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
+        }
+        return entityStates.get(entityId.getEntityKey());
+    }
+
+    @Override
+    public boolean entityExists(ID entityId) {
+        return entityStates.containsKey(entityId.getEntityKey());
+    }
+
+    @Override
+    public Collection<Entity> getEntities(EntityState.STATE state) {
+        Collection<Entity> entities = new ArrayList<>();
+        for (EntityState entityState : entityStates.values()) {
+            if (entityState.getCurrentState().equals(state)) {
+                entities.add(entityState.getEntity());
+            }
+        }
+        return entities;
+    }
+
+    /**
+     * Returns a live view of the stored entity states (backed by the internal map).
+     */
+    @Override
+    public Collection<EntityState> getAllEntities() {
+        return entityStates.values();
+    }
+
+    @Override
+    public void updateEntity(EntityState entityState) throws StateStoreException {
+        String key = new ID(entityState.getEntity()).getEntityKey();
+        if (!entityStates.containsKey(key)) {
+            throw new StateStoreException("Entity with key, " + key + " does not exist.");
+        }
+        entityStates.put(key, entityState);
+    }
+
+    @Override
+    public void deleteEntity(ID entityId) throws StateStoreException {
+        if (!entityStates.containsKey(entityId.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
+        }
+        deleteExecutionInstances(entityId);
+        entityStates.remove(entityId.getEntityKey());
+    }
+
+    @Override
+    public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        String key = new ID(instanceState.getInstance()).toString();
+        if (instanceStates.containsKey(key)) {
+            throw new StateStoreException("Instance with key, " + key + " already exists.");
+        }
+        instanceStates.put(key, instanceState);
+    }
+
+    @Override
+    public InstanceState getExecutionInstance(ID instanceId) throws StateStoreException {
+        if (!instanceStates.containsKey(instanceId.toString())) {
+            throw new StateStoreException("Instance with key, " + instanceId + " does not exist.");
+        }
+        return instanceStates.get(instanceId.toString());
+    }
+
+    @Override
+    public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        String key = new ID(instanceState.getInstance()).toString();
+        if (!instanceStates.containsKey(key)) {
+            throw new StateStoreException("Instance with key, " + key + " does not exist.");
+        }
+        instanceStates.put(key, instanceState);
+    }
+
+    @Override
+    public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
+        throws StateStoreException {
+        ID id = new ID(entity, cluster);
+        if (!entityStates.containsKey(id.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+        }
+        String prefix = id.toString();
+        Collection<InstanceState> instances = new ArrayList<InstanceState>();
+        synchronized (instanceStates) {
+            for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+                if (isNestedKey(instanceState.getKey(), prefix)) {
+                    instances.add(instanceState.getValue());
+                }
+            }
+        }
+        return instances;
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+            Collection<InstanceState.STATE> states) throws StateStoreException {
+        ID id = new ID(entity, cluster);
+        return getExecutionInstances(id, states);
+    }
+
+    /**
+     * Returns the instances in {@code states} whose actual run overlaps the window [start, end].
+     * Only instances that have an actual start time are considered.
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+            Collection<InstanceState.STATE> states, DateTime start, DateTime end) throws StateStoreException {
+        List<InstanceState> instancesToReturn = new ArrayList<>();
+        ID id = new ID(entity, cluster);
+        for (InstanceState state : getExecutionInstances(id, states)) {
+            ExecutionInstance instance = state.getInstance();
+            DateTime actualStart = instance.getActualStart();
+            DateTime actualEnd = instance.getActualEnd();
+            // Ignore the instance if it started after the window closed,
+            // or if it finished before the window opened.
+            boolean startedBeforeWindowEnd = actualStart != null && !end.isBefore(actualStart);
+            boolean notEndedBeforeWindowStart = actualEnd == null || !start.isAfter(actualEnd);
+            if (startedBeforeWindowEnd && notEndedBeforeWindowStart) {
+                instancesToReturn.add(state);
+            }
+        }
+        return instancesToReturn;
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        String prefix = entityId.toString();
+        Collection<InstanceState> instances = new ArrayList<InstanceState>();
+        synchronized (instanceStates) {
+            for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+                if (isNestedKey(instanceState.getKey(), prefix)
+                        && states.contains(instanceState.getValue().getCurrentState())) {
+                    instances.add(instanceState.getValue());
+                }
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * Returns the instance of the entity on the cluster whose key sorts last, or null if there is
+     * none. NOTE(review): presumably the last key is the latest instance because the instance time
+     * is the final key component (ID.INSTANCE_FORMAT sorts chronologically) - confirm.
+     */
+    @Override
+    public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
+        ID id = new ID(entity, cluster);
+        if (!entityStates.containsKey(id.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+        }
+        String prefix = id.toString();
+        InstanceState latestState = null;
+        // TODO : Very crude. Iterating over all entries and getting the last one.
+        synchronized (instanceStates) {
+            for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+                if (isNestedKey(instanceState.getKey(), prefix)) {
+                    latestState = instanceState.getValue();
+                }
+            }
+        }
+        return latestState;
+    }
+
+    @Override
+    public boolean executionInstanceExists(ID instanceId) {
+        return instanceStates.containsKey(instanceId.toString());
+    }
+
+    @Override
+    public void deleteExecutionInstances(ID entityId) {
+        String entityKey = entityId.getEntityKey();
+        synchronized (instanceStates) {
+            // Copy the keys first so that removing entries does not break the iteration.
+            for (String instanceKey : Lists.newArrayList(instanceStates.keySet())) {
+                if (isNestedKey(instanceKey, entityKey)) {
+                    instanceStates.remove(instanceKey);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes all entities and instances from the store.
+     */
+    public void clear() {
+        entityStates.clear();
+        instanceStates.clear();
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
new file mode 100644
index 0000000..d6a4b49
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.util.Collection;
+
+/**
+ * Interface to abstract out instance store API.
+ */
+// TODO : Add order and limit capabilities to the API
+public interface InstanceStateStore {
+    /**
+     * Adds an execution instance to the store.
+     *
+     * @param instanceState state of the instance to add
+     * @throws StateStoreException if the instance cannot be added
+     */
+    void putExecutionInstance(InstanceState instanceState) throws StateStoreException;
+
+    /**
+     * @param instanceId ID of the instance
+     * @return Execution instance state corresponding to the given ID.
+     * @throws StateStoreException - When instance does not exist
+     */
+    InstanceState getExecutionInstance(ID instanceId) throws StateStoreException;
+
+    /**
+     * Updates an execution instance in the store.
+     *
+     * @param instanceState new state of the instance
+     * @throws StateStoreException - if the instance does not exist.
+     */
+    void updateExecutionInstance(InstanceState instanceState) throws StateStoreException;
+
+    /**
+     * @param entity entity whose instances are requested
+     * @param cluster name of the cluster the instances run on
+     * @return - All execution instances for the given entity and cluster.
+     * @throws StateStoreException
+     */
+    Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster) throws StateStoreException;
+
+    /**
+     * @param entity entity whose instances are requested
+     * @param cluster name of the cluster the instances run on
+     * @param states only instances currently in one of these states are returned
+     * @return - All execution instances for the given entity and cluster and states
+     * @throws StateStoreException
+     */
+    Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+            Collection<InstanceState.STATE> states) throws StateStoreException;
+
+    /**
+     * @param entity entity whose instances are requested
+     * @param cluster name of the cluster the instances run on
+     * @param states only instances currently in one of these states are returned
+     * @param start start of the time window
+     * @param end end of the time window
+     * @return - Execution instances for the given entity, cluster and states, further restricted to
+     *           the time window [start, end] (how an instance is matched against the window is up to
+     *           the implementation)
+     * @throws StateStoreException
+     */
+    Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+            Collection<InstanceState.STATE> states,
+            DateTime start, DateTime end) throws StateStoreException;
+
+    /**
+     * @param entityId ID of the entity (including the cluster name)
+     * @param states only instances currently in one of these states are returned
+     * @return - All execution instance for an given entityKey (that includes the cluster name)
+     * @throws StateStoreException
+     */
+    Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
+        throws StateStoreException;
+
+    /**
+     * @param entity entity whose instance is requested
+     * @param cluster name of the cluster the instance runs on
+     * @return - The latest execution instance
+     * @throws StateStoreException
+     */
+    InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException;
+
+    /**
+     * @param instanceId ID of the instance
+     * @return true, if instance exists.
+     */
+    boolean executionInstanceExists(ID instanceId);
+
+    /**
+     * Delete instances of a given entity.
+     *
+     * @param entityId ID of the entity whose instances are removed
+     */
+    void deleteExecutionInstances(ID entityId);
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
new file mode 100644
index 0000000..f595c26
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.service.ConfigurationChangeListener;
+
+/**
+ * Interface that combines the entity and instance store APIs with the configuration change listener.
+ */
+public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore {
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
new file mode 100644
index 0000000..ebc05ec
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.resource.InstancesResult;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Interface for an implementation that executes a DAG.
+ */
+public interface DAGEngine {
+
+    /**
+     * Run an instance for execution.
+     *
+     * @param instance instance to run
+     * @return an identifier for the run (presumably the external job ID - confirm with implementations)
+     * @throws DAGEngineException
+     */
+    String run(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * @param instance instance to check
+     * @return true if an instance is scheduled for execution.
+     * @throws DAGEngineException
+     */
+    boolean isScheduled(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Suspend a running instance.
+     *
+     * @param instance instance to suspend
+     * @throws DAGEngineException
+     */
+    void suspend(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Resume a suspended instance.
+     *
+     * @param instance instance to resume
+     * @throws DAGEngineException
+     */
+    void resume(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Kill a running instance.
+     *
+     * @param instance instance to kill
+     * @throws DAGEngineException
+     */
+    void kill(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Re-run a terminated instance.
+     *
+     * @param instance instance to re-run
+     * @throws DAGEngineException
+     */
+    void reRun(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Perform a dry run of an entity.
+     *
+     * @param entity entity to dry-run
+     * @throws DAGEngineException
+     */
+    void submit(Entity entity) throws DAGEngineException;
+
+    /**
+     * Returns info about the Job.
+     *
+     * @param externalID external ID of the job
+     * @return information about the job, as an instance result entry
+     * @throws DAGEngineException
+     */
+    InstancesResult.Instance info(String externalID) throws DAGEngineException;
+
+    /**
+     * @param externalID external ID of the job
+     * @return status of each individual node in the DAG.
+     * @throws DAGEngineException
+     */
+    List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException;
+
+    /**
+     * @return true if DAG Engine is up and running.
+     * @throws DAGEngineException
+     */
+    boolean isAlive() throws DAGEngineException;
+
+    /**
+     * @param externalID external ID of the job
+     * @return Configuration used to run the job.
+     * @throws DAGEngineException
+     */
+    Properties getConfiguration(String externalID) throws DAGEngineException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
new file mode 100644
index 0000000..e400326
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory for providing appropriate dag engine to the Falcon services.
+ */
+public final class DAGEngineFactory {
+    // Key handed to ReflectionUtils to resolve the DAGEngine implementation class.
+    private static final String DAG_ENGINE = "dag.engine.impl";
+
+    // Cache the DAGEngines, to avoid overhead of creation.
+    // Only accessed from the synchronized getDAGEngine(String): a plain HashMap is not safe for a
+    // concurrent check-then-put.
+    private static final Map<String, DAGEngine> DAG_ENGINES = new HashMap<>();
+
+    private DAGEngineFactory() {
+    }
+
+    /**
+     * Returns the (cached) DAG engine for the given cluster.
+     *
+     * @param cluster cluster whose engine is requested
+     * @return the DAG engine for the cluster
+     * @throws FalconException if the engine cannot be instantiated
+     */
+    public static DAGEngine getDAGEngine(Cluster cluster) throws FalconException {
+        return getDAGEngine(cluster.getName());
+    }
+
+    /**
+     * Returns the DAG engine for the named cluster. It is created via reflection on first use and
+     * cached for later calls.
+     *
+     * @param clusterName name of the cluster whose engine is requested
+     * @return the DAG engine for the cluster
+     * @throws FalconException if the engine cannot be instantiated
+     */
+    public static synchronized DAGEngine getDAGEngine(String clusterName) throws FalconException {
+        // Cache the DAGEngines for every cluster.
+        DAGEngine engine = DAG_ENGINES.get(clusterName);
+        if (engine == null) {
+            engine = (DAGEngine) ReflectionUtils.getInstance(DAG_ENGINE, String.class, clusterName);
+            DAG_ENGINES.put(clusterName, engine);
+        }
+        return engine;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
new file mode 100644
index 0000000..8dcf3a5
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.falcon.workflow.engine; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.execution.EntityExecutor; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.FalconExecutionService; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.StateStore; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Workflow engine which uses Falcon's native scheduler. 
+ */
+public class FalconWorkflowEngine extends AbstractWorkflowEngine {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconWorkflowEngine.class);
+    private static final FalconExecutionService EXECUTION_SERVICE = FalconExecutionService.get();
+    private static final StateStore STATE_STORE = AbstractStateStore.get();
+    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+    // Property (in the caller-supplied Properties) holding a comma-separated list of clusters to act on.
+    private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
+
+    /** Instance-level actions handled by doJobAction/performAction. */
+    private enum JobAction {
+        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
+    }
+
+    public FalconWorkflowEngine() {
+        // Registering As it cleans up staging paths and not entirely Oozie Specific.
+        registerListener(new OozieHouseKeepingService());
+    }
+
+    @Override
+    public boolean isAlive(Cluster cluster) throws FalconException {
+        return DAGEngineFactory.getDAGEngine(cluster).isAlive();
+    }
+
+    @Override
+    public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException {
+        EXECUTION_SERVICE.schedule(entity);
+    }
+
+    @Override
+    public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
+        DAGEngineFactory.getDAGEngine(clusterName).submit(entity);
+    }
+
+    @Override
+    public boolean isActive(Entity entity) throws FalconException {
+        return STATE_STORE.getEntity(new ID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED;
+    }
+
+    @Override
+    public boolean isSuspended(Entity entity) throws FalconException {
+        return STATE_STORE.getEntity(new ID(entity))
+                .getCurrentState().equals(EntityState.STATE.SUSPENDED);
+    }
+
+    @Override
+    public boolean isCompleted(Entity entity) throws FalconException {
+        throw new FalconException("Not yet implemented.");
+    }
+
+    @Override
+    public String suspend(Entity entity) throws FalconException {
+        EXECUTION_SERVICE.suspend(entity);
+        return "SUCCESS";
+    }
+
+    @Override
+    public String resume(Entity entity) throws FalconException {
+        EXECUTION_SERVICE.resume(entity);
+        return "SUCCESS";
+    }
+
+    @Override
+    public String delete(Entity entity) throws FalconException {
+        if (isActive(entity)) {
+            EXECUTION_SERVICE.delete(entity);
+        }
+        // This should remove it from state store too as state store listens to config store changes.
+        CONFIG_STORE.remove(entity.getEntityType(), entity.getName());
+        return "SUCCESS";
+    }
+
+    @Override
+    public String delete(Entity entity, String cluster) throws FalconException {
+        EXECUTION_SERVICE.getEntityExecutor(entity, cluster).killAll();
+        return "SUCCESS";
+    }
+
+    /**
+     * Lists the entity's instances in running states on every cluster returned by
+     * {@code EntityUtil.getClustersDefinedInColos}.
+     */
+    @Override
+    public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException {
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        List<InstancesResult.Instance> runInstances = new ArrayList<>();
+
+        for (String cluster : clusters) {
+            Collection<InstanceState> instances =
+                    STATE_STORE.getExecutionInstances(entity, cluster, InstanceState.getRunningStates());
+            for (InstanceState state : instances) {
+                ExecutionInstance executionInstance = state.getInstance();
+                // Same instance-time format as doJobAction, so both APIs report instances alike.
+                String instanceTimeStr = SchemaHelper.formatDateUTC(executionInstance.getInstanceTime().toDate());
+                InstancesResult.Instance instance = new InstancesResult.Instance(cluster, instanceTimeStr,
+                        InstancesResult.WorkflowStatus.RUNNING);
+                // Avoid an NPE for an instance whose actual start has not been recorded.
+                if (executionInstance.getActualStart() != null) {
+                    instance.startTime = executionInstance.getActualStart().toDate();
+                }
+                runInstances.add(instance);
+            }
+        }
+        InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances");
+        result.setInstances(runInstances.toArray(new InstancesResult.Instance[runInstances.size()]));
+        return result;
+    }
+
+    /**
+     * Applies {@code action} to every matching instance of the entity and collects per-instance results.
+     *
+     * Instances are selected per cluster by:
+     * - the cluster filter from {@code props} (an empty filter means all clusters);
+     * - the states relevant to the action;
+     * - the [start, end] window.
+     * A failure on one instance is recorded on that instance and makes the overall status PARTIAL.
+     * The overall status becomes FAILED when fewer than two instances were acted upon.
+     *
+     * @throws IllegalArgumentException for actions not handled here (RERUN, SUMMARY)
+     */
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
+                                        Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
+        APIResult.Status overallStatus = APIResult.Status.SUCCEEDED;
+        int instanceCount = 0;
+
+        Collection<InstanceState.STATE> states;
+        switch (action) {
+            case KILL:
+            case SUSPEND:
+                states = InstanceState.getActiveStates();
+                break;
+            case RESUME:
+                states = new ArrayList<>();
+                states.add(InstanceState.STATE.SUSPENDED);
+                break;
+            case STATUS:
+            case PARAMS:
+                // Applicable only for running and finished jobs.
+                // Copy before adding so the collection returned by getRunningStates() is never modified.
+                states = new ArrayList<>(InstanceState.getRunningStates());
+                states.addAll(InstanceState.getTerminalStates());
+                states.add(InstanceState.STATE.SUSPENDED);
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled action " + action);
+        }
+
+        List<ExecutionInstance> instancesToActOn = new ArrayList<>();
+        for (String cluster : clusters) {
+            if (clusterList.size() != 0 && !clusterList.contains(cluster)) {
+                continue;
+            }
+            LOG.debug("Retrieving instances for cluster : {} for action {}" , cluster, action);
+            Collection<InstanceState> instances =
+                    STATE_STORE.getExecutionInstances(entity, cluster, states, new DateTime(start), new DateTime(end));
+            for (InstanceState state : instances) {
+                instancesToActOn.add(state.getInstance());
+            }
+        }
+
+        List<InstancesResult.Instance> instances = new ArrayList<>();
+        for (ExecutionInstance ins : instancesToActOn) {
+            instanceCount++;
+            String instanceTimeStr = SchemaHelper.formatDateUTC(ins.getInstanceTime().toDate());
+
+            InstancesResult.Instance instance = null;
+            try {
+                instance = performAction(ins.getCluster(), entity, action, ins);
+                instance.instance = instanceTimeStr;
+            } catch (FalconException e) {
+                LOG.warn("Unable to perform action {} on cluster {}", action, ins.getCluster(), e);
+                instance = new InstancesResult.Instance(ins.getCluster(), instanceTimeStr, null);
+                instance.status = InstancesResult.WorkflowStatus.ERROR;
+                instance.details = e.getMessage();
+                overallStatus = APIResult.Status.PARTIAL;
+            }
+            instances.add(instance);
+        }
+        if (instanceCount < 2 && overallStatus == APIResult.Status.PARTIAL) {
+            overallStatus = APIResult.Status.FAILED;
+        }
+        InstancesResult instancesResult = new InstancesResult(overallStatus, action.name());
+        instancesResult.setInstances(instances.toArray(new InstancesResult.Instance[instances.size()]));
+        return instancesResult;
+    }
+
+    /**
+     * Reads a comma-separated cluster list from {@code props}. Entries are trimmed and empty entries
+     * are dropped; a null {@code props} or a missing property yields an empty list.
+     */
+    private List<String> getIncludedClusters(Properties props, String clustersType) {
+        String clusters = props == null ? "" : props.getProperty(clustersType, "");
+        List<String> clusterList = new ArrayList<>();
+        for (String cluster : clusters.split(",")) {
+            if (StringUtils.isNotEmpty(cluster)) {
+                clusterList.add(cluster.trim());
+            }
+        }
+        return clusterList;
+    }
+
+    /**
+     * Performs {@code action} on one instance and returns its result entry.
+     * The work goes through the cluster's entity executor and DAG engine.
+     * If the instance has an external ID, the entry starts from the DAG engine's info for that job;
+     * otherwise it starts empty.
+     *
+     * @throws IllegalArgumentException for actions not handled here (SUMMARY)
+     */
+    private InstancesResult.Instance performAction(String cluster, Entity entity, JobAction action,
+                                                   ExecutionInstance instance) throws FalconException {
+        EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(entity, cluster);
+        InstancesResult.Instance instanceInfo = null;
+        LOG.debug("Retrieving information for {} for action {}", instance.getId(), action);
+        if (StringUtils.isNotEmpty(instance.getExternalID())) {
+            instanceInfo = DAGEngineFactory.getDAGEngine(cluster).info(instance.getExternalID());
+        } else {
+            instanceInfo = new InstancesResult.Instance();
+        }
+        switch (action) {
+            case KILL:
+                executor.kill(instance);
+                instanceInfo.status = InstancesResult.WorkflowStatus.KILLED;
+                break;
+            case SUSPEND:
+                executor.suspend(instance);
+                instanceInfo.status = InstancesResult.WorkflowStatus.SUSPENDED;
+                break;
+            case RESUME:
+                executor.resume(instance);
+                instanceInfo.status =
+                        InstancesResult.WorkflowStatus.valueOf(STATE_STORE
+                                .getExecutionInstance(instance.getId()).getCurrentState().name());
+                break;
+            case RERUN:
+                break;
+            case STATUS:
+                if (StringUtils.isNotEmpty(instance.getExternalID())) {
+                    List<InstancesResult.InstanceAction> instanceActions =
+                            DAGEngineFactory.getDAGEngine(cluster).getJobDetails(instance.getExternalID());
+                    instanceInfo.actions = instanceActions
+                            .toArray(new InstancesResult.InstanceAction[instanceActions.size()]);
+                }
+                break;
+
+            case PARAMS:
+                // Mask details, log
+                instanceInfo.details = null;
+                instanceInfo.logFile = null;
+                // Without an external ID there is no job whose configuration could be fetched.
+                if (StringUtils.isNotEmpty(instance.getExternalID())) {
+                    Properties props = DAGEngineFactory.getDAGEngine(cluster).getConfiguration(instance.getExternalID());
+                    // Size the array from the names actually iterated; props.size() can differ from
+                    // stringPropertyNames() (defaults, non-String entries).
+                    Set<String> names = props.stringPropertyNames();
+                    InstancesResult.KeyValuePair[] keyValuePairs = new InstancesResult.KeyValuePair[names.size()];
+                    int i = 0;
+                    for (String name : names) {
+                        keyValuePairs[i++] = new InstancesResult.KeyValuePair(name, props.getProperty(name));
+                    }
+                    // Attach the parameters to the response; otherwise PARAMS returns none.
+                    instanceInfo.wfParams = keyValuePairs;
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled action " + action);
+        }
+        return instanceInfo;
+    }
+
+    @Override
+    public InstancesResult killInstances(Entity entity, Date start, Date end,
+                                         Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.KILL, entity, start, end, props, lifeCycles);
+    }
+
+    @Override
+    public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
+                                          List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    @Override
+    public InstancesResult suspendInstances(Entity entity, Date start, Date end,
+                                            Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.SUSPEND, entity, start, end, props, lifeCycles);
+    }
+
+    @Override
+    public InstancesResult resumeInstances(Entity entity, Date start, Date end,
+                                           Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.RESUME, entity, start, end, props, lifeCycles);
+    }
+
+    @Override
+    public InstancesResult getStatus(Entity entity, Date start, Date end,
+                                     List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.STATUS, entity, start, end, null, lifeCycles);
+    }
+
+    @Override
+    public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
+                                             List<LifeCycle> lifeCycles) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    @Override
+    public InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+                                             List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
+    }
+
+    @Override
+    public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException {
+        return true;
+    }
+
+    @Override
+    public String update(Entity oldEntity, Entity newEntity, String cluster, Boolean skipDryRun)
+        throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    @Override
+    public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    @Override
+    public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    @Override
+    public String getWorkflowStatus(String cluster, String jobId) throws FalconException {
+        return DAGEngineFactory.getDAGEngine(cluster).info(jobId).getStatus().name();
+    }
+
+    @Override
+    public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException {
+        return DAGEngineFactory.getDAGEngine(cluster).getConfiguration(jobId);
+    }
+
+    @Override
+    public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException {
+        InstancesResult.Instance[] instances = new InstancesResult.Instance[1];
+        InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
+                "Instance for workflow id:" + jobId);
+        instances[0] = DAGEngineFactory.getDAGEngine(cluster).info(jobId);
+        result.setInstances(instances);
+        return result;
+    }
+}
+
