Repository: flume
Updated Branches:
  refs/heads/trunk 5bf1d9b7b -> 14fb4d84f


FLUME-2857. Make Kafka Source/Channel/Sink restore default values when live 
updating config

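This commit changes Kafka Channel, Sink and Source to fix an error where the
Kafka sub-configurations (producer/consumer properties) were not reset when
configure() was called more than once, as happens during a Live Config Update.
Properties removed from the new configuration kept their previous values
instead of being dropped; the property sets are now cleared and rebuilt from
the defaults on every configure() call.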

Reviewers: Denes Arvay, Attila Simon, Bessenyei Balázs Donát

(Tristan Stevens via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/14fb4d84
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/14fb4d84
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/14fb4d84

Branch: refs/heads/trunk
Commit: 14fb4d84fd0e100253ca947bc96810c242e7a82b
Parents: 5bf1d9b
Author: Tristan Stevens <[email protected]>
Authored: Fri Oct 28 14:24:02 2016 +0200
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Fri Oct 28 14:24:02 2016 +0200

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       |  2 ++
 .../flume/channel/kafka/TestKafkaChannel.java   | 34 ++++++++++++++++++++
 .../org/apache/flume/sink/kafka/KafkaSink.java  |  1 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 28 ++++++++++++++++
 .../apache/flume/source/kafka/KafkaSource.java  |  1 +
 .../flume/source/kafka/TestKafkaSource.java     | 28 ++++++++++++++++
 6 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 47c0634..cc7bb48 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -261,6 +261,7 @@ public class KafkaChannel extends BasicChannelSemantics {
 
 
   private void setProducerProps(Context ctx, String bootStrapServers) {
+    producerProps.clear();
     producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
@@ -274,6 +275,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   }
 
   private void setConsumerProps(Context ctx, String bootStrapServers) {
+    consumerProps.clear();
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);

http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 276fee1..5e5f2d0 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -288,6 +288,40 @@ public class TestKafkaChannel {
     doPartitionErrors(PartitionOption.NOTANUMBER);
   }
 
+  /**
+   * Tests that sub-properties get set correctly if you run the configure() method twice
+   * (fix for FLUME-2857)
+   * @throws Exception
+   */
+  @Test
+  public void testDefaultSettingsOnReConfigure() throws Exception {
+    String sampleProducerProp = "compression.type";
+    String sampleProducerVal = "snappy";
+
+    String sampleConsumerProp = "fetch.min.bytes";
+    String sampleConsumerVal = "99";
+
+    Context context = prepareDefaultContext(false);
+    context.put(KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX + sampleProducerProp,
+        sampleProducerVal);
+    context.put(KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX + sampleConsumerProp,
+        sampleConsumerVal);
+
+    final KafkaChannel channel = createChannel(context);
+
+    Assert.assertEquals(sampleProducerVal,
+        channel.getProducerProps().getProperty(sampleProducerProp));
+    Assert.assertEquals(sampleConsumerVal,
+        channel.getConsumerProps().getProperty(sampleConsumerProp));
+
+    context = prepareDefaultContext(false);
+    channel.configure(context);
+
+    Assert.assertNull(channel.getProducerProps().getProperty(sampleProducerProp));
+    Assert.assertNull(channel.getConsumerProps().getProperty(sampleConsumerProp));
+
+  }
+
   public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
                                             String group) throws Exception {
     // create a topic with 1 partition for simplicity

http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index dd40224..241e900 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -393,6 +393,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
   }
 
   private void setProducerProps(Context context, String bootStrapServers) {
+    kafkaProps.clear();
     kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
     //Defaults overridden based on config
     kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);

http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 7eccf76..7c66420 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -46,6 +46,7 @@ import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -345,6 +346,33 @@ public class TestKafkaSink {
   }
 
   /**
+   * Tests that sub-properties (kafka.producer.*) apply correctly across multiple invocations
+   * of configure() (fix for FLUME-2857).
+   * @throws Exception
+   */
+  @Test
+  public void testDefaultSettingsOnReConfigure() throws Exception {
+    String sampleProducerProp = "compression.type";
+    String sampleProducerVal = "snappy";
+
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX + sampleProducerProp, sampleProducerVal);
+
+    KafkaSink kafkaSink = new KafkaSink();
+
+    Configurables.configure(kafkaSink, context);
+
+    Assert.assertEquals(sampleProducerVal,
+        kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
+
+    context = prepareDefaultContext();
+    Configurables.configure(kafkaSink, context);
+
+    Assert.assertNull(kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
+
+  }
+
+  /**
    * This function tests three scenarios:
    * 1. PartitionOption.VALIDBUTOUTOFRANGE: An integer partition is provided,
    *    however it exceeds the number of partitions available on the topic.

http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 195eca3..d381850 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -431,6 +431,7 @@ public class KafkaSource extends AbstractPollableSource
   }
 
   private void setConsumerProps(Context ctx) {
+    kafkaProps.clear();
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

http://git-wip-us.apache.org/repos/asf/flume/blob/14fb4d84/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 9554201..d1daceb 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -627,6 +627,34 @@ public class TestKafkaSource {
     doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
   }
 
+  /**
+   * Tests that sub-properties (kafka.consumer.*) apply correctly across multiple invocations
+   * of configure() (fix for FLUME-2857).
+   * @throws Exception
+   */
+  @Test
+  public void testDefaultSettingsOnReConfigure() throws Exception {
+    String sampleConsumerProp = "auto.offset.reset";
+    String sampleConsumerVal = "earliest";
+    String group = "group";
+
+    Context context = prepareDefaultContext(group);
+    context.put(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + sampleConsumerProp,
+        sampleConsumerVal);
+    context.put(TOPIC, "random-topic");
+
+    kafkaSource.configure(context);
+
+    Assert.assertEquals(sampleConsumerVal,
+        kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
+
+    context = prepareDefaultContext(group);
+    context.put(TOPIC, "random-topic");
+
+    kafkaSource.configure(context);
+    Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
+  }
+
   public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
                                             String group) throws Exception {
     // create a topic with 1 partition for simplicity

Reply via email to