Repository: flume Updated Branches: refs/heads/trunk c9b531e70 -> e1ec22e4f
FLUME-2323: Morphline sink must increment eventDrainAttemptCount when it takes event from channel (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e1ec22e4 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e1ec22e4 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e1ec22e4 Branch: refs/heads/trunk Commit: e1ec22e4f24600006c20812f7e7362ac80a38656 Parents: c9b531e Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Feb 28 15:37:25 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri Feb 28 15:37:25 2014 -0800 ---------------------------------------------------------------------- .../org/apache/flume/sink/solr/morphline/MorphlineSink.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e1ec22e4/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java index 7c784c4..9c4dc25 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java @@ -134,8 +134,9 @@ public class MorphlineSink extends AbstractSink implements Configurable { if (event == null) { break; } + sinkCounter.incrementEventDrainAttemptCount(); numEventsTaken++; - LOGGER.debug("Flume event: {}", event); + LOGGER.debug("Flume event: {}", event); //StreamEvent streamEvent = createStreamEvent(event); handler.process(event); if (System.currentTimeMillis() >= batchEndTime) { @@ -152,12 +153,10 @@ public class MorphlineSink extends AbstractSink implements Configurable { } else { sinkCounter.incrementBatchCompleteCount(); } - sinkCounter.addToEventDrainAttemptCount(numEventsTaken); - sinkCounter.addToEventDrainSuccessCount(numEventsTaken); - handler.commitTransaction(); isMorphlineTransactionCommitted = true; txn.commit(); + sinkCounter.addToEventDrainSuccessCount(numEventsTaken); return numEventsTaken == 0 ? Status.BACKOFF : Status.READY; } catch (Throwable t) { // Ooops - need to rollback and back off
