Repository: flume
Updated Branches:
  refs/heads/flume-1.6 56c1b8662 -> 94d8ea40a


FLUME-2470. Kafka Sink and Source config updates. Missed a couple of files in
the last commit; adding them.

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/94d8ea40
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/94d8ea40
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/94d8ea40

Branch: refs/heads/flume-1.6
Commit: 94d8ea40a6e08b53ae04b3f056944b5a99cf66d2
Parents: 56c1b86
Author: Hari Shreedharan <[email protected]>
Authored: Tue Sep 23 23:17:40 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Sep 23 23:17:40 2014 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/kafka/KafkaSinkUtil.java  | 103 +++++++++++++++++++
 .../flume/sink/kafka/KafkaSinkUtilTest.java     |  55 ++++++++++
 2 files changed, 158 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/94d8ea40/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
new file mode 100644
index 0000000..66bde85
--- /dev/null
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.kafka;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.util.PropertiesTrait;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaSinkUtil {
+
+  private static final Logger log =
+          LoggerFactory.getLogger(KafkaSinkUtil.class);
+
+  /**
+   * Builds the producer properties: defaults first, then the sub-properties
+   * under PROPERTY_PREFIX, then the documented Flume-style keys on top.
+   *
+   * @throws ConfigurationException if the broker list is missing or blank
+   */
+  public static Properties getKafkaProperties(Context context) {
+    log.info("context={}", context);
+    Properties props = generateDefaultKafkaProps();
+    setKafkaProps(context, props);
+    addDocumentedKafkaProps(context, props);
+    return props;
+  }
+
+  /**
+   * Some of the producer properties are especially important.
+   * We documented them and gave them a camel-case name to match Flume config.
+   * If the user sets these, they override any existing parameters.
+   * Knowledge of which properties are documented is maintained here for now;
+   * if this becomes a maintenance issue we'll use a proper data structure.
+   *
+   * @throws ConfigurationException if the broker list is missing or blank
+   */
+  private static void addDocumentedKafkaProps(Context context,
+                                              Properties kafkaProps)
+          throws ConfigurationException {
+    String brokerList = context.getString(KafkaSinkConstants
+            .BROKER_LIST_FLUME_KEY);
+    // A blank value names no broker, so reject it here with the same error.
+    if (brokerList == null || brokerList.trim().isEmpty()) {
+      throw new ConfigurationException("brokerList must contain at least " +
+              "one Kafka broker");
+    }
+    kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
+
+    String requiredAcks = context.getString(
+            KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
+    if (requiredAcks != null) {
+      kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredAcks);
+    }
+  }
+
+  /**
+   * Generate producer properties object with some defaults.
+   *
+   * @return properties holding the default serializers and required acks
+   */
+  private static Properties generateDefaultKafkaProps() {
+    Properties props = new Properties();
+    props.put(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY,
+            KafkaSinkConstants.DEFAULT_MESSAGE_SERIALIZER);
+    props.put(KafkaSinkConstants.KEY_SERIALIZER_KEY,
+            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
+    props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY,
+            KafkaSinkConstants.DEFAULT_REQUIRED_ACKS);
+    return props;
+  }
+
+  /**
+   * Add every parameter under KafkaSinkConstants.PROPERTY_PREFIX to props.
+   */
+  private static void setKafkaProps(Context context, Properties kafkaProps) {
+    Map<String,String> kafkaProperties =
+            context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX);
+    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
+      kafkaProps.put(prop.getKey(), prop.getValue());
+      log.debug("Reading a Kafka Producer Property: key: {}, value: {}",
+              prop.getKey(), prop.getValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/94d8ea40/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
new file mode 100644
index 0000000..84d213c
--- /dev/null
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import junit.framework.TestCase;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaSinkUtilTest extends TestCase {
+  // Verifies defaults, prefixed overrides, and documented-key precedence.
+  @Test
+  public void testGetKafkaProperties() {
+    Context context = new Context();
+    context.put("kafka.serializer.class", "override.default.serializer");
+    context.put("kafka.fake.property", "kafka.property.value");
+    context.put("kafka.metadata.broker.list","bad-broker-list");
+    context.put("brokerList","real-broker-list");
+    Properties kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
+    // JUnit's assertEquals takes (expected, actual), in that order.
+    //check that we have defaults set
+    assertEquals(
+            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER,
+            kafkaProps.getProperty(KafkaSinkConstants.KEY_SERIALIZER_KEY));
+    //check that kafka properties override the default and get correct name
+    assertEquals(
+            "override.default.serializer",
+            kafkaProps.getProperty(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY));
+    //check that any kafka property gets in
+    assertEquals("kafka.property.value",
+            kafkaProps.getProperty("fake.property"));
+    //check that documented brokerList wins over kafka.metadata.broker.list
+    assertEquals("real-broker-list",
+            kafkaProps.getProperty("metadata.broker.list"));
+  }
+}
\ No newline at end of file

Reply via email to