Updated Branches: refs/heads/flume-1.4 c9de68584 -> fc53e9ae2
FLUME-1296: Lifecycle supervisor should check if the monitor service is still running before supervising (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fc53e9ae Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fc53e9ae Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fc53e9ae Branch: refs/heads/flume-1.4 Commit: fc53e9ae2882bdd234f0f75d3ac33fffc385f671 Parents: c9de685 Author: Brock Noland <[email protected]> Authored: Mon Dec 10 14:38:13 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Mon Dec 10 14:38:25 2012 -0600 ---------------------------------------------------------------------- .../flume/lifecycle/LifecycleSupervisor.java | 25 ++++++++++++--- 1 files changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/fc53e9ae/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java index 78eda05..59d780a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.flume.FlumeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,13 +81,19 @@ public class LifecycleSupervisor implements LifecycleAware { if (monitorService != null) { monitorService.shutdown(); - - while (!monitorService.isTerminated()) { + try{ + monitorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for monitor service to stop"); + } + if(!monitorService.isTerminated()) { + monitorService.shutdownNow(); try { - monitorService.awaitTermination(500, TimeUnit.MILLISECONDS); + while(!monitorService.isTerminated()) { + monitorService.awaitTermination(10, TimeUnit.SECONDS); + } } catch (InterruptedException e) { - logger.debug("Interrupted while waiting for monitor service to stop"); - monitorService.shutdownNow(); + logger.error("Interrupted while waiting for monitor service to stop"); } } } @@ -95,6 +102,7 @@ public class LifecycleSupervisor implements LifecycleAware { .entrySet()) { if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) { + entry.getValue().status.desiredState = LifecycleState.STOP; entry.getKey().stop(); } } @@ -114,6 +122,13 @@ public class LifecycleSupervisor implements LifecycleAware { public synchronized void supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) { + if(this.monitorService.isShutdown() + || this.monitorService.isTerminated() + || this.monitorService.isTerminating()){ + throw new FlumeException("Supervise called on " + lifecycleAware + " " + + "after shutdown has been initiated. " + lifecycleAware + " will not" + + " be started"); + } Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware), "Refusing to supervise " + lifecycleAware + " more than once");
