Repository: falcon Updated Branches: refs/heads/master 46ecceba8 -> bd147972b
FALCON-1758 APIs fail when oozie workflow entries are deleted (Pavan Kolamuri) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd147972 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd147972 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd147972 Branch: refs/heads/master Commit: bd147972b9336c108d0660d9ba4dd91460963102 Parents: 46ecceb Author: Pallavi Rao <[email protected]> Authored: Mon Jan 25 11:41:49 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Jan 25 11:41:49 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../execution/FalconExecutionService.java | 26 ++++++++++------- .../falcon/execution/ProcessExecutor.java | 30 ++++++++++---------- .../org/apache/falcon/state/EntityState.java | 23 +++++++++++++-- .../falcon/state/EntityStateChangeHandler.java | 9 +++++- .../org/apache/falcon/state/StateService.java | 13 ++++++--- .../falcon/state/store/AbstractStateStore.java | 4 +++ 7 files changed, 74 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a86633f..d87ce26 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -120,6 +120,8 @@ Proposed Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1758 APIs fail when oozie workflow entries are deleted (Pavan Kolamuri via Pallavi Rao) + FALCON-1754 JobCompletionService throws FalconException (Pallavi Rao) FALCON-1716 API fails with CommunicationsException when mysql interaction time is longer than 53,434,939 milliseconds (Pavan Kolamuri via Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index da1d7cc..93c894d 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -185,6 +185,20 @@ public final class FalconExecutionService implements FalconService, EntityStateC } } + @Override + public void onKill(Entity entity) throws FalconException { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityClusterID id = new EntityClusterID(entity, cluster); + if (!executors.containsKey(id)) { + LOG.info("Entity {} is already deleted on cluster {}.", id, cluster); + continue; + } + EntityExecutor executor = getEntityExecutor(entity, cluster); + executor.killAll(); + executors.remove(executor.getId()); + } + } + /** * Schedules an entity. * @@ -222,18 +236,10 @@ public final class FalconExecutionService implements FalconService, EntityStateC * @throws FalconException */ public void delete(Entity entity) throws FalconException { - for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { - EntityClusterID id = new EntityClusterID(entity, cluster); - if (!executors.containsKey(id)) { - LOG.info("Entity {} is already deleted on cluster {}.", id, cluster); - continue; - } - EntityExecutor executor = getEntityExecutor(entity, cluster); - executor.killAll(); - executors.remove(executor.getId()); - } + StateService.get().handleStateChange(entity, EntityState.EVENT.KILL, this); } + /** * Returns the instance of {@link EntityExecutor} for a given entity and cluster. * http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index 188cec2..745d2ea 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -20,6 +20,8 @@ package org.apache.falcon.execution; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; + +import java.util.Collection; import java.util.concurrent.ExecutionException; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; @@ -173,14 +175,10 @@ public class ProcessExecutor extends EntityExecutor { // Error handling for an operation. private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action) throws StateStoreException { - try { - // If the instance terminated while a kill/suspend operation was in progress, ignore the exception. - InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState(); - if (InstanceState.getTerminalStates().contains(currentState)) { - return ""; - } - } catch (StateStoreException sse) { - throw sse; + // If the instance terminated while a kill/suspend operation was in progress, ignore the exception. + InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState(); + if (InstanceState.getTerminalStates().contains(currentState)) { + return ""; } String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage(); @@ -226,23 +224,24 @@ public class ProcessExecutor extends EntityExecutor { @Override public void killAll() throws FalconException { - NotificationServicesRegistry.unregister(executionService, getId()); StringBuffer errMsg = new StringBuffer(); - // Only active instances are in memory. Kill them first. - for (ExecutionInstance instance : instances.asMap().values()) { + // Kill workflows in oozie. + for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, + InstanceState.getActiveStates())) { + ExecutionInstance instance = instanceState.getInstance(); try { kill(instance); } catch (FalconException e) { - // Proceed with next errMsg.append(handleError(instance, e, EntityState.EVENT.KILL)); } } - for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, - InstanceState.getActiveStates())) { - ExecutionInstance instance = instanceState.getInstance(); + // Kill active instances in memory. + Collection<ProcessExecutionInstance> execInstances = instances.asMap().values(); + for (ExecutionInstance instance : execInstances) { try { kill(instance); } catch (FalconException e) { + // Proceed with next errMsg.append(handleError(instance, e, EntityState.EVENT.KILL)); } } @@ -250,6 +249,7 @@ public class ProcessExecutor extends EntityExecutor { if (errMsg.length() != 0) { throw new FalconException("Some instances failed to kill : " + errMsg.toString()); } + NotificationServicesRegistry.unregister(executionService, getId()); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/EntityState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java index ae57fa1..38479a4 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java @@ -41,8 +41,10 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState. return STATE.SCHEDULED; case SUBMIT: return this; + case KILL: + return STATE.KILLED; default: - throw new InvalidStateTransitionException("Submitted entities can only be scheduled."); + throw new InvalidStateTransitionException("Submitted entities can only be scheduled or killed."); } } }, @@ -54,8 +56,10 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState. return STATE.SUSPENDED; case SCHEDULE: return this; + case KILL: + return STATE.KILLED; default: - throw new InvalidStateTransitionException("Scheduled entities can only be suspended."); + throw new InvalidStateTransitionException("Scheduled entities can only be suspended or killed."); } } }, @@ -67,8 +71,21 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState. return STATE.SCHEDULED; case SUSPEND: return this; + case KILL: + return STATE.KILLED; default: - throw new InvalidStateTransitionException("Suspended entities can only be resumed."); + throw new InvalidStateTransitionException("Suspended entities can only be resumed or killed."); + } + } + }, + KILLED { + @Override + public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { + switch (event) { + case KILL: + return STATE.KILLED; + default: + throw new InvalidStateTransitionException("Partially killed entities can only be killed."); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java index 44ec3fc..79c6abd 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java @@ -50,10 +50,17 @@ public interface EntityStateChangeHandler { void onSuspend(Entity entity) throws FalconException; /** - * Invoked when the an intity is resumed. + * Invoked when the an entity is resumed. * * @param entity * @throws FalconException */ void onResume(Entity entity) throws FalconException; + + /** + * Invoked when and entity is killed/deleted. + * @param entity + * @throws FalconException + */ + void onKill(Entity entity) throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/StateService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java index 9266354..638bb6e 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java @@ -81,10 +81,12 @@ public final class StateService { EntityState entityState = stateStore.getEntity(id); EntityState.STATE newState = entityState.nextTransition(event); callbackHandler(entity, event, handler); - entityState.setCurrentState(newState); - stateStore.updateEntity(entityState); - LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id, - entityState.getCurrentState(), event.name()); + if (newState != entityState.getCurrentState()) { + entityState.setCurrentState(newState); + stateStore.updateEntity(entityState); + LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id, + entityState.getCurrentState(), event.name()); + } } } @@ -107,6 +109,9 @@ public final class StateService { case RESUME: handler.onResume(entity); break; + case KILL: + handler.onKill(entity); + break; default: // Do nothing, only propagate events that originate from user } } http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java index 84d12f8..82cb659 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java @@ -38,6 +38,10 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha @Override public void onAdd(Entity entity) throws FalconException { if (entity.getEntityType() != EntityType.CLUSTER) { + EntityID entityID = new EntityID(entity); + if (entityExists(entityID)) { + deleteEntity(entityID); + } putEntity(new EntityState(entity)); } }
