Repository: flume
Updated Branches:
  refs/heads/flume-1.5 6d3d0cd8e -> c9598caa3
FLUME-2323: Morphline sink must increment eventDrainAttemptCount when it takes event from channel

(Hari Shreedharan via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c9598caa
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c9598caa
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c9598caa

Branch: refs/heads/flume-1.5
Commit: c9598caa322532bf8a2f484ceb04899d43a26129
Parents: 6d3d0cd
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Feb 28 15:37:25 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Feb 28 15:38:16 2014 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/solr/morphline/MorphlineSink.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/c9598caa/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 7c784c4..9c4dc25 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -134,8 +134,9 @@ public class MorphlineSink extends AbstractSink implements Configurable {
         if (event == null) {
           break;
         }
+        sinkCounter.incrementEventDrainAttemptCount();
         numEventsTaken++;
-        LOGGER.debug("Flume event: {}", event);
+        LOGGER.debug("Flume event: {}", event);
         //StreamEvent streamEvent = createStreamEvent(event);
         handler.process(event);
         if (System.currentTimeMillis() >= batchEndTime) {
@@ -152,12 +153,10 @@ public class MorphlineSink extends AbstractSink implements Configurable {
       } else {
         sinkCounter.incrementBatchCompleteCount();
       }
-      sinkCounter.addToEventDrainAttemptCount(numEventsTaken);
-      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
-
       handler.commitTransaction();
       isMorphlineTransactionCommitted = true;
       txn.commit();
+      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
       return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
     } catch (Throwable t) {
       // Ooops - need to rollback and back off
