Updated Branches: refs/heads/trunk 118752374 -> ab0894c7f
FLUME-1849. Embedded Agent doesn't shutdown supervisor. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ab0894c7 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ab0894c7 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ab0894c7 Branch: refs/heads/trunk Commit: ab0894c7f014769a9b400e6b5cb2f5c055c6b065 Parents: 1187523 Author: Hari Shreedharan <[email protected]> Authored: Wed Jan 16 13:17:09 2013 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Wed Jan 16 13:17:09 2013 -0800 ---------------------------------------------------------------------- .../apache/flume/agent/embedded/EmbeddedAgent.java | 51 +-------------- .../embedded/TestEmbeddedAgentEmbeddedSource.java | 28 -------- 2 files changed, 2 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ab0894c7/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java index 4adbea7..d02f440 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java @@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; /** * EmbeddedAgent gives Flume users the ability to embed simple agents in @@ -136,7 +135,7 @@ public class EmbeddedAgent { if(state != State.STARTED) { throw new IllegalStateException("Cannot be stopped unless started"); } - doStop(); + supervisor.stop(); embeddedSource = null; state = State.STOPPED; } @@ -212,7 +211,6 @@ public class EmbeddedAgent { private void doStart() { boolean error = true; - List<LifecycleAware> supervised = Lists.newArrayList(); try { channel.start(); sinkRunner.start(); @@ -220,28 +218,17 @@ public class EmbeddedAgent { supervisor.supervise(channel, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - supervised.add(channel); supervisor.supervise(sinkRunner, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - supervised.add(sinkRunner); supervisor.supervise(sourceRunner, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - supervised.add(sourceRunner); - error = false; } finally { if(error) { - for(LifecycleAware lifeCycleAware : supervised) { - try { - supervisor.unsupervise(lifeCycleAware); - } catch (Exception e) { - LOGGER.warn("Exception while stopping " + lifeCycleAware + - " due to error on startup", e); - } - } stopLogError(sourceRunner); stopLogError(channel); stopLogError(sinkRunner); + supervisor.stop(); } } } @@ -254,40 +241,6 @@ public class EmbeddedAgent { LOGGER.warn("Exception while stopping " + lifeCycleAware, e); } } - private void doStop() { - Exception exception = null; - // source - try { - if(LifecycleState.START.equals(sourceRunner.getLifecycleState())) { - sourceRunner.stop(); - } - } catch (Exception e) { - exception = e; - LOGGER.error("Caught exception stopping source " + sourceRunner, e); - } - // sink - try { - if(LifecycleState.START.equals(sinkRunner.getLifecycleState())) { - sinkRunner.stop(); - } - } catch (Exception e) { - exception = e; - LOGGER.error("Caught exception stopping sink " + sinkRunner, e); - } - // channel - try { - if(LifecycleState.START.equals(channel.getLifecycleState())) { - channel.stop(); - } - } catch (Exception e) { - exception = e; - LOGGER.error("Caught exception stopping channel " + channel, e); - } - if(exception != null) { - throw new FlumeException("Error stopping one or more components " + - "check the logs for an exhaustive list of errors", exception); - } - } private static enum State { NEW(), http://git-wip-us.apache.org/repos/asf/flume/blob/ab0894c7/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java index b315770..4e94d72 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java @@ -165,34 +165,6 @@ public class TestEmbeddedAgentEmbeddedSource { private static final long serialVersionUID = 116546244849853151L; } @Test - public void testStopSourceThrowsException() { - doThrow(new LocalRuntimeException()).when(sourceRunner).stop(); - stopExpectingLocalRuntimeException(); - } - @Test - public void testStopChannelThrowsException() { - doThrow(new LocalRuntimeException()).when(channel).stop(); - stopExpectingLocalRuntimeException(); - } - @Test - public void testStopSinkThrowsException() { - doThrow(new LocalRuntimeException()).when(sinkRunner).stop(); - stopExpectingLocalRuntimeException(); - } - private void stopExpectingLocalRuntimeException() { - agent.configure(properties); - agent.start(); - try { - agent.stop(); - Assert.fail(); - } catch (FlumeException e) { - Assert.assertTrue(e.getCause() instanceof LocalRuntimeException); - } - verify(sourceRunner, times(1)).stop(); - verify(channel, times(1)).stop(); - verify(sinkRunner, times(1)).stop(); - } - @Test public void testPut() throws EventDeliveryException { Event event = new SimpleEvent(); agent.configure(properties);
