Repository: flume
Updated Branches:
  refs/heads/flume-1.5 6d3d0cd8e -> c9598caa3


FLUME-2323: Morphline sink must increment eventDrainAttemptCount when it takes an event from the channel

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c9598caa
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c9598caa
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c9598caa

Branch: refs/heads/flume-1.5
Commit: c9598caa322532bf8a2f484ceb04899d43a26129
Parents: 6d3d0cd
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Feb 28 15:37:25 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Feb 28 15:38:16 2014 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/solr/morphline/MorphlineSink.java   | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c9598caa/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 7c784c4..9c4dc25 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -134,8 +134,9 @@ public class MorphlineSink extends AbstractSink implements Configurable {
         if (event == null) {
           break;
         }
+        sinkCounter.incrementEventDrainAttemptCount();
         numEventsTaken++;
-        LOGGER.debug("Flume event: {}", event);      
+        LOGGER.debug("Flume event: {}", event);
         //StreamEvent streamEvent = createStreamEvent(event);
         handler.process(event);
         if (System.currentTimeMillis() >= batchEndTime) {
@@ -152,12 +153,10 @@ public class MorphlineSink extends AbstractSink implements Configurable {
       } else {
         sinkCounter.incrementBatchCompleteCount();
       }
-      sinkCounter.addToEventDrainAttemptCount(numEventsTaken);
-      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
-
       handler.commitTransaction();
       isMorphlineTransactionCommitted = true;
       txn.commit();
+      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
       return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
     } catch (Throwable t) {
       // Ooops - need to rollback and back off

Reply via email to