FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (By Pallavi Rao)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f0893f7f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f0893f7f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f0893f7f Branch: refs/heads/0.9 Commit: f0893f7fb26feb1af40d3ceda04c9d388a361d19 Parents: 39073ff Author: Pallavi Rao <[email protected]> Authored: Tue Jan 19 21:48:47 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Jan 19 21:48:47 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/execution/ProcessExecutor.java | 39 +++++++++++++------- .../org/apache/falcon/state/EntityState.java | 3 +- 3 files changed, 29 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f0893f7f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1916c9a..531c4a5 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -113,6 +113,8 @@ Proposed Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (Pallavi Rao) + FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao) FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/f0893f7f/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index 40fe1b3..188cec2 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; import java.util.Properties; -import java.util.TimeZone; /** * This class is responsible for managing execution instances of a process. @@ -153,8 +152,7 @@ public class ProcessExecutor extends EntityExecutor { suspend(instance); } catch (FalconException e) { // Proceed with next - errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage()); - LOG.error("Instance suspend failed for : " + instance.getId(), e); + errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND)); } } for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, @@ -163,8 +161,7 @@ public class ProcessExecutor extends EntityExecutor { try { suspend(instance); } catch (FalconException e) { - errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage()); - LOG.error("Instance suspend failed for : " + instance.getId(), e); + errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND)); } } // Some errors @@ -173,6 +170,24 @@ public class ProcessExecutor extends EntityExecutor { } } + // Error handling for an operation. + private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action) + throws StateStoreException { + try { + // If the instance terminated while a kill/suspend operation was in progress, ignore the exception. + InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState(); + if (InstanceState.getTerminalStates().contains(currentState)) { + return ""; + } + } catch (StateStoreException sse) { + throw sse; + } + + String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage(); + LOG.error(errMsg, e); + return errMsg; + } + // Returns last materialized instance's time. private Date getLastInstanceTime() throws StateStoreException { InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster); @@ -198,8 +213,8 @@ public class ProcessExecutor extends EntityExecutor { try { resume(instance); } catch (FalconException e) { - errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage()); - LOG.error("Instance suspend failed for : " + instance.getId(), e); + errMsg.append("Instance resume failed for : " + instance.getId() + " due to " + e.getMessage()); + LOG.error("Instance resume failed for : " + instance.getId(), e); } } registerForNotifications(getLastInstanceTime()); @@ -219,8 +234,7 @@ public class ProcessExecutor extends EntityExecutor { kill(instance); } catch (FalconException e) { // Proceed with next - errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage()); - LOG.error("Instance kill failed for : " + instance.getId(), e); + errMsg.append(handleError(instance, e, EntityState.EVENT.KILL)); } } for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, @@ -229,8 +243,7 @@ public class ProcessExecutor extends EntityExecutor { try { kill(instance); } catch (FalconException e) { - errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage()); - LOG.error("Instance kill failed for : " + instance.getId(), e); + errMsg.append(handleError(instance, e, EntityState.EVENT.KILL)); } } // Some errors @@ -248,12 +261,10 @@ public class ProcessExecutor extends EntityExecutor { LOG.error("Suspend failed for instance id : " + instance.getId(), e); throw new FalconException("Suspend failed for instance : " + instance.getId(), e); } - } @Override public void resume(ExecutionInstance instance) throws FalconException { - try { instance.resume(); if (((ProcessExecutionInstance) instance).isScheduled()) { @@ -452,7 +463,7 @@ public class ProcessExecutor extends EntityExecutor { requestBuilder.setFrequency(process.getFrequency()) .setStartTime(new DateTime(startTime)) .setEndTime(new DateTime(endTime)) - .setTimeZone(TimeZone.getTimeZone("UTC")); + .setTimeZone(EntityUtil.getTimeZone(process)); NotificationServicesRegistry.register(requestBuilder.build()); LOG.info("Registered for a time based notification for process {} with frequency: {}, " + "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime); http://git-wip-us.apache.org/repos/asf/falcon/blob/f0893f7f/scheduler/src/main/java/org/apache/falcon/state/EntityState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java index f44f174..ae57fa1 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java @@ -81,7 +81,8 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState. SUBMIT, SCHEDULE, SUSPEND, - RESUME + RESUME, + KILL } /**
