Repository: flume Updated Branches: refs/heads/trunk 9b2c28d80 -> 9f0abea02
FLUME-2495. Kafka Source may miss events when channel is not available (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9f0abea0 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9f0abea0 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9f0abea0 Branch: refs/heads/trunk Commit: 9f0abea027c408669977aa34cd32f5e525bb508b Parents: 9b2c28d Author: Hari Shreedharan <[email protected]> Authored: Tue Oct 7 14:12:02 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Oct 7 14:12:02 2014 -0700 ---------------------------------------------------------------------- .../apache/flume/source/kafka/KafkaSource.java | 21 ++- .../flume/source/kafka/TestKafkaSource.java | 147 +++++++++++++++++-- 2 files changed, 143 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9f0abea0/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 8cdc967..9d77b47 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -71,17 +71,15 @@ public class KafkaSource extends AbstractSource private final List<Event> eventList = new ArrayList<Event>(); public Status process() throws EventDeliveryException { - eventList.clear(); + byte[] bytes; Event event; Map<String, String> headers; long batchStartTime = System.currentTimeMillis(); long batchEndTime = System.currentTimeMillis() + timeUpperLimit; try { - int eventCounter = 0; - int timeWaited = 0; boolean iterStatus = false; - while (eventCounter < batchUpperLimit && + while (eventList.size() < batchUpperLimit && System.currentTimeMillis() < batchEndTime) { iterStatus = hasNext(); if (iterStatus) { @@ -97,17 +95,21 @@ public class KafkaSource extends AbstractSource } event = EventBuilder.withBody(bytes, headers); eventList.add(event); - eventCounter++; } if (log.isDebugEnabled()) { log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime); - log.debug("Event #: {}", eventCounter); + log.debug("Event #: {}", eventList.size()); } } // If we have events, send events to channel + // clear the event list // and commit if Kafka doesn't auto-commit - if (eventCounter > 0) { + if (eventList.size() > 0) { getChannelProcessor().processEventBatch(eventList); + eventList.clear(); + if (log.isDebugEnabled()) { + log.debug("Wrote {} events to channel", eventList.size()); + } if (!kafkaAutoCommitEnabled) { // commit the read transactions to Kafka to avoid duplicates consumer.commitOffsets(); @@ -203,11 +205,6 @@ public class KafkaSource extends AbstractSource super.stop(); } - - - - - /** * Check if there are messages waiting in Kafka, * waiting until timeout (10ms by default) for messages to arrive. http://git-wip-us.apache.org/repos/asf/flume/blob/9f0abea0/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 3695860..72eec77 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -75,19 +75,7 @@ public class TestKafkaSource { context.put(KafkaSourceConstants.TOPIC,topicName); context.put("kafka.consumer.timeout.ms","100"); - - ChannelProcessor channelProcessor = mock(ChannelProcessor.class); - - events = Lists.newArrayList(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - events.addAll((List<Event>)invocation.getArguments()[0]); - return null; - } - }).when(channelProcessor).processEventBatch(any(List.class)); - kafkaSource.setChannelProcessor(channelProcessor); + kafkaSource.setChannelProcessor(createGoodChannel()); } @After @@ -210,4 +198,137 @@ public class TestKafkaSource { ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) + context.getLong("kafka.consumer.timeout.ms")) ); } + + // Consume event, stop source, start again and make sure we are not + // consuming same event again + @Test + public void testCommit() throws InterruptedException, EventDeliveryException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topicName, "", "hello, world"); + + Thread.sleep(500L); + + Assert.assertEquals(Status.READY, kafkaSource.process()); + kafkaSource.stop(); + Thread.sleep(500L); + kafkaSource.start(); + Thread.sleep(500L); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + + } + + // Remove channel processor and test if we can consume events again + @Test + public void testNonCommit() throws EventDeliveryException, + InterruptedException { + + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000"); + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + + kafkaServer.produce(topicName, "", "hello, world"); + Thread.sleep(500L); + + kafkaSource.setChannelProcessor(createBadChannel()); + log.debug("processing from kafka to bad channel"); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + + log.debug("repairing channel"); + kafkaSource.setChannelProcessor(createGoodChannel()); + + log.debug("re-process to good channel - this should work"); + kafkaSource.process(); + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), + Charsets.UTF_8)); + + + } + + @Test + public void testTwoBatches() throws InterruptedException, + EventDeliveryException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000"); + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + + kafkaServer.produce(topicName, "", "event 1"); + Thread.sleep(500L); + + kafkaSource.process(); + Assert.assertEquals("event 1", new String(events.get(0).getBody(), + Charsets.UTF_8)); + events.clear(); + + kafkaServer.produce(topicName, "", "event 2"); + Thread.sleep(500L); + kafkaSource.process(); + Assert.assertEquals("event 2", new String(events.get(0).getBody(), + Charsets.UTF_8)); + } + + @Test + public void testTwoBatchesWithAutocommit() throws InterruptedException, + EventDeliveryException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000"); + context.put("kafka.auto.commit.enable","true"); + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + + kafkaServer.produce(topicName, "", "event 1"); + Thread.sleep(500L); + + kafkaSource.process(); + Assert.assertEquals("event 1", new String(events.get(0).getBody(), + Charsets.UTF_8)); + events.clear(); + + kafkaServer.produce(topicName, "", "event 2"); + Thread.sleep(500L); + kafkaSource.process(); + Assert.assertEquals("event 2", new String(events.get(0).getBody(), + Charsets.UTF_8)); + + } + + ChannelProcessor createGoodChannel() { + + ChannelProcessor channelProcessor = mock(ChannelProcessor.class); + + events = Lists.newArrayList(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + events.addAll((List<Event>)invocation.getArguments()[0]); + return null; + } + }).when(channelProcessor).processEventBatch(any(List.class)); + + return channelProcessor; + + } + + ChannelProcessor createBadChannel() { + ChannelProcessor channelProcessor = mock(ChannelProcessor.class); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + throw new ChannelException("channel intentional broken"); + } + }).when(channelProcessor).processEventBatch(any(List.class)); + + return channelProcessor; + } } \ No newline at end of file
