Repository: flume Updated Branches: refs/heads/trunk a619cc157 -> f46bee03e
FLUME-2920: Kafka Channel Should Not Commit Offsets When Stopping (Kevin Conaway via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f46bee03 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f46bee03 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f46bee03 Branch: refs/heads/trunk Commit: f46bee03eb2c9094aa192dd6b82a494cb26eef35 Parents: a619cc1 Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Jun 10 15:52:20 2016 +0200 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri Jun 10 15:52:20 2016 +0200 ---------------------------------------------------------------------- .../flume/channel/kafka/KafkaChannel.java | 4 - .../flume/channel/kafka/TestKafkaChannel.java | 82 +++++++++++++++++--- 2 files changed, 70 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f46bee03/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 09d3f9d..dfc95bc 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -267,10 +267,6 @@ public class KafkaChannel extends BasicChannelSemantics { } private void decommissionConsumerAndRecords(ConsumerAndRecords c) { - if (c.failedEvents.isEmpty()) { - c.commitOffsets(); - } - c.failedEvents.clear(); c.consumer.close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/f46bee03/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index 13e073b..d01346a 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -32,26 +32,42 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.*; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG; public class TestKafkaChannel { - private final static Logger LOGGER = - LoggerFactory.getLogger(TestKafkaChannel.class); - private static TestUtil testUtil = TestUtil.getInstance(); private String topic = null; private final Set<String> usedTopics = new HashSet<String>(); - private CountDownLatch latch = null; @BeforeClass public static void setupClass() throws Exception { @@ -74,7 +90,6 @@ public class TestKafkaChannel { } catch (Exception e) { } Thread.sleep(2500); - latch = new CountDownLatch(5); } @AfterClass @@ -191,6 +206,49 @@ public class TestKafkaChannel { doTestNullKeyNoHeader(); } + @Test + public void testOffsetsNotCommittedOnStop() throws Exception { + String message = "testOffsetsNotCommittedOnStop-" + System.nanoTime(); + + KafkaChannel channel = startChannel(false); + + KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(channel.getProducerProps()); + ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes()); + producer.send(data).get(); + producer.flush(); + producer.close(); + + Event event = takeEventWithoutCommittingTxn(channel); + Assert.assertNotNull(event); + Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody())); + + // Stop the channel without committing the transaction + channel.stop(); + + channel = startChannel(false); + + // Message should still be available + event = takeEventWithoutCommittingTxn(channel); + Assert.assertNotNull(event); + Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody())); + } + + private Event takeEventWithoutCommittingTxn(KafkaChannel channel) { + for (int i=0; i < 5; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + + Event event = channel.take(); + if (event != null) { + return event; + } else { + txn.commit(); + txn.close(); + } + } + return null; + } + private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception { final KafkaChannel channel = startChannel(false); Properties props = channel.getProducerProps();
