Repository: flume
Updated Branches:
  refs/heads/trunk 87d4c2c13 -> 94f1fab85


Fix startup order for ExecSource

This patch starts the SourceCounter in the ExecSource before starting
the runner thread that processes input from the forked command.

Starting the SourceCounter before calling its methods is required to get
correct metric counts because the metrics are all reset to 0 when
SourceCounter.start() is called.

A follow-up patch will attempt to enforce that a SourceCounter or any
other MonitoredCounterGroup is running at the time that calls to modify
the metrics occur.

After applying this patch, TestExecSource.testMonitoredCounterGroup no
longer fails.

This closes #72

Reviewers: Attila Simon, Bessenyei Balázs Donát

(Mike Percy via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/94f1fab8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/94f1fab8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/94f1fab8

Branch: refs/heads/trunk
Commit: 94f1fab85ab772ecb5af6cba0d4b15ffdb2f6315
Parents: 87d4c2c
Author: Mike Percy <mpe...@apache.org>
Authored: Fri Oct 14 01:24:37 2016 +0200
Committer: Bessenyei Balázs Donát <bes...@apache.org>
Committed: Tue Oct 18 16:18:07 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flume/source/ExecSource.java     | 25 +++++++++-----------
 1 file changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/94f1fab8/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 52ea808..eaafbd6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -165,22 +165,19 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con
 
   @Override
   public void start() {
-    logger.info("Exec source starting with command:{}", command);
+    logger.info("Exec source starting with command: {}", command);
 
-    executor = Executors.newSingleThreadExecutor();
+    // Start the counter before starting any threads that may access it.
+    sourceCounter.start();
 
-    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
-        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
+    executor = Executors.newSingleThreadExecutor();
+    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart,
+                              restartThrottle, logStderr, bufferCount, batchTimeout, charset);
 
-    // FIXME: Use a callback-like executor / future to signal us upon failure.
+    // Start the runner thread.
     runnerFuture = executor.submit(runner);
 
-    /*
-     * NB: This comes at the end rather than the beginning of the method because
-     * it sets our state to running. We want to make sure the executor is alive
-     * and well first.
-     */
-    sourceCounter.start();
+    // Mark the Source as RUNNING.
     super.start();
 
     logger.debug("Exec source started");
@@ -188,7 +185,7 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con
 
   @Override
   public void stop() {
-    logger.info("Stopping exec source with command:{}", command);
+    logger.info("Stopping exec source with command: {}", command);
     if (runner != null) {
       runner.setRestart(false);
       runner.kill();
@@ -324,7 +321,7 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con
                     }
                   }
                 } catch (Exception e) {
-                  logger.error("Exception occured when processing event batch", e);
+                  logger.error("Exception occurred when processing event batch", e);
                   if (e instanceof InterruptedException) {
                     Thread.currentThread().interrupt();
                   }
@@ -334,8 +331,8 @@ public class ExecSource extends AbstractSource implements EventDrivenSource, Con
           batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
 
           while ((line = reader.readLine()) != null) {
+            sourceCounter.incrementEventReceivedCount();
             synchronized (eventList) {
-              sourceCounter.incrementEventReceivedCount();
               eventList.add(EventBuilder.withBody(line.getBytes(charset)));
               if (eventList.size() >= bufferCount || timeout()) {
                 flushEventBatch(eventList);

Reply via email to