Repository: flume Updated Branches: refs/heads/trunk 5bf1d9b7b -> 14fb4d84f
FLUME-2857. Make Kafka Source/Channel/Sink restore default values when live updating config This commit changes Kafka Channel, Sink and Source to fix an error where sub-configurations aren't tolerant of the configure() method being called more than once (as happens when a Live Config Update happens). Reviewers: Denes Arvay, Attila Simon, Bessenyei Balázs Donát (Tristan Stevens via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/14fb4d84 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/14fb4d84 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/14fb4d84 Branch: refs/heads/trunk Commit: 14fb4d84fd0e100253ca947bc96810c242e7a82b Parents: 5bf1d9b Author: Tristan Stevens <[email protected]> Authored: Fri Oct 28 14:24:02 2016 +0200 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Fri Oct 28 14:24:02 2016 +0200 ---------------------------------------------------------------------- .../flume/channel/kafka/KafkaChannel.java | 2 ++ .../flume/channel/kafka/TestKafkaChannel.java | 34 ++++++++++++++++++++ .../org/apache/flume/sink/kafka/KafkaSink.java | 1 + .../apache/flume/sink/kafka/TestKafkaSink.java | 28 ++++++++++++++++ .../apache/flume/source/kafka/KafkaSource.java | 1 + .../flume/source/kafka/TestKafkaSource.java | 28 ++++++++++++++++ 6 files changed, 94 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 47c0634..cc7bb48 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -261,6 +261,7 @@ public class KafkaChannel extends BasicChannelSemantics { private void setProducerProps(Context ctx, String bootStrapServers) { + producerProps.clear(); producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER); @@ -274,6 +275,7 @@ public class KafkaChannel extends BasicChannelSemantics { } private void setConsumerProps(Context ctx, String bootStrapServers) { + consumerProps.clear(); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET); http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index 276fee1..5e5f2d0 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -288,6 +288,40 @@ public class TestKafkaChannel { doPartitionErrors(PartitionOption.NOTANUMBER); } + /** + * Tests that sub-properties get set correctly if you run the configure() method twice + * (fix for FLUME-2857) + * @throws Exception + */ + @Test + public void testDefaultSettingsOnReConfigure() throws Exception { + String sampleProducerProp = "compression.type"; + String sampleProducerVal = "snappy"; + + String sampleConsumerProp = "fetch.min.bytes"; + String sampleConsumerVal = "99"; + + Context context = prepareDefaultContext(false); + context.put(KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX + sampleProducerProp, + sampleProducerVal); + context.put(KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX + sampleConsumerProp, + sampleConsumerVal); + + final KafkaChannel channel = createChannel(context); + + Assert.assertEquals(sampleProducerVal, + channel.getProducerProps().getProperty(sampleProducerProp)); + Assert.assertEquals(sampleConsumerVal, + channel.getConsumerProps().getProperty(sampleConsumerProp)); + + context = prepareDefaultContext(false); + channel.configure(context); + + Assert.assertNull(channel.getProducerProps().getProperty(sampleProducerProp)); + Assert.assertNull(channel.getConsumerProps().getProperty(sampleConsumerProp)); + + } + public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets, String group) throws Exception { // create a topic with 1 partition for simplicity http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index dd40224..241e900 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -393,6 +393,7 @@ public class KafkaSink extends AbstractSink implements Configurable { } private void setProducerProps(Context context, String bootStrapServers) { + kafkaProps.clear(); kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS); //Defaults overridden based on config kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 7eccf76..7c66420 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -46,6 +46,7 @@ import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -345,6 +346,33 @@ public class TestKafkaSink { } /** + * Tests that sub-properties (kafka.producer.*) apply correctly across multiple invocations + * of configure() (fix for FLUME-2857). + * @throws Exception + */ + @Test + public void testDefaultSettingsOnReConfigure() throws Exception { + String sampleProducerProp = "compression.type"; + String sampleProducerVal = "snappy"; + + Context context = prepareDefaultContext(); + context.put(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX + sampleProducerProp, sampleProducerVal); + + KafkaSink kafkaSink = new KafkaSink(); + + Configurables.configure(kafkaSink, context); + + Assert.assertEquals(sampleProducerVal, + kafkaSink.getKafkaProps().getProperty(sampleProducerProp)); + + context = prepareDefaultContext(); + Configurables.configure(kafkaSink, context); + + Assert.assertNull(kafkaSink.getKafkaProps().getProperty(sampleProducerProp)); + + } + + /** * This function tests three scenarios: * 1. PartitionOption.VALIDBUTOUTOFRANGE: An integer partition is provided, * however it exceeds the number of partitions available on the topic. http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 195eca3..d381850 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -431,6 +431,7 @@ public class KafkaSource extends AbstractPollableSource } private void setConsumerProps(Context ctx) { + kafkaProps.clear(); kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER); kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 9554201..d1daceb 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -627,6 +627,34 @@ public class TestKafkaSource { doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both"); } + /** + * Tests that sub-properties (kafka.consumer.*) apply correctly across multiple invocations + * of configure() (fix for FLUME-2857). + * @throws Exception + */ + @Test + public void testDefaultSettingsOnReConfigure() throws Exception { + String sampleConsumerProp = "auto.offset.reset"; + String sampleConsumerVal = "earliest"; + String group = "group"; + + Context context = prepareDefaultContext(group); + context.put(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + sampleConsumerProp, + sampleConsumerVal); + context.put(TOPIC, "random-topic"); + + kafkaSource.configure(context); + + Assert.assertEquals(sampleConsumerVal, + kafkaSource.getConsumerProps().getProperty(sampleConsumerProp)); + + context = prepareDefaultContext(group); + context.put(TOPIC, "random-topic"); + + kafkaSource.configure(context); + Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp)); + } + public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets, String group) throws Exception { // create a topic with 1 partition for simplicity
