Repository: flume Updated Branches: refs/heads/trunk 87d4c2c13 -> 94f1fab85
Fix startup order for ExecSource This patch starts the SourceCounter in the ExecSource before starting the runner thread that processes input from the forked command. Starting the SourceCounter before calling its methods is required to get correct metric counts because the metrics are all reset to 0 when SourceCounter.start() is called. A follow-up patch will attempt to enforce that a SourceCounter or any other MonitoredCounterGroup is running at the time that calls to modify the metrics occur. After applying this patch, TestExecSource.testMonitoredCounterGroup no longer fails. This closes #72 Reviewers: Attila Simon, Bessenyei Balázs Donát (Mike Percy via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/94f1fab8 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/94f1fab8 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/94f1fab8 Branch: refs/heads/trunk Commit: 94f1fab85ab772ecb5af6cba0d4b15ffdb2f6315 Parents: 87d4c2c Author: Mike Percy <[email protected]> Authored: Fri Oct 14 01:24:37 2016 +0200 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Tue Oct 18 16:18:07 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flume/source/ExecSource.java | 25 +++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/94f1fab8/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 52ea808..eaafbd6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -165,22 +165,19 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con @Override public void start() { - logger.info("Exec source starting with command:{}", command); + logger.info("Exec source starting with command: {}", command); - executor = Executors.newSingleThreadExecutor(); + // Start the counter before starting any threads that may access it. + sourceCounter.start(); - runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, - restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset); + executor = Executors.newSingleThreadExecutor(); + runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, + restartThrottle, logStderr, bufferCount, batchTimeout, charset); - // FIXME: Use a callback-like executor / future to signal us upon failure. + // Start the runner thread. runnerFuture = executor.submit(runner); - /* - * NB: This comes at the end rather than the beginning of the method because - * it sets our state to running. We want to make sure the executor is alive - * and well first. - */ - sourceCounter.start(); + // Mark the Source as RUNNING. super.start(); logger.debug("Exec source started"); @@ -188,7 +185,7 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con @Override public void stop() { - logger.info("Stopping exec source with command:{}", command); + logger.info("Stopping exec source with command: {}", command); if (runner != null) { runner.setRestart(false); runner.kill(); @@ -324,7 +321,7 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con } } } catch (Exception e) { - logger.error("Exception occured when processing event batch", e); + logger.error("Exception occurred when processing event batch", e); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } @@ -334,8 +331,8 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); while ((line = reader.readLine()) != null) { + sourceCounter.incrementEventReceivedCount(); synchronized (eventList) { - sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); if (eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList);
