Repository: flume
Updated Branches:
  refs/heads/trunk 965e13264 -> 493b53b64


FLUME-3278 Handling -D keystore parameters in Kafka components

Kafka client does not handle -D keystore parameters directly so Flume has to
pass them explicitly in Kafka properties (like ssl.keystore.location, etc).
Also using the same method for the truststore (in order to handle
keystore/truststore in the same way).

This closes #231

Reviewers: Denes Arvay

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/493b53b6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/493b53b6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/493b53b6

Branch: refs/heads/trunk
Commit: 493b53b6430c687f88c438e5883ef9120d7aedb2
Parents: 965e132
Author: Peter Turcsanyi <[email protected]>
Authored: Mon Oct 29 14:28:53 2018 +0100
Committer: Ferenc Szabo <[email protected]>
Committed: Mon Oct 29 14:28:53 2018 +0100

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |   4 +
 .../flume/channel/kafka/KafkaChannel.java       |   5 +
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   5 +
 .../org/apache/flume/sink/kafka/KafkaSink.java  |   3 +
 flume-ng-sources/flume-kafka-source/pom.xml     |   4 +
 .../apache/flume/source/kafka/KafkaSource.java  |   3 +
 flume-shared/flume-shared-kafka/pom.xml         |  51 ++++++
 .../apache/flume/shared/kafka/KafkaSSLUtil.java |  73 ++++++++
 .../flume/shared/kafka/KafkaSSLUtilTest.java    | 174 +++++++++++++++++++
 flume-shared/pom.xml                            |   1 +
 pom.xml                                         |   6 +
 11 files changed, 329 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml 
b/flume-ng-channels/flume-kafka-channel/pom.xml
index 1f4a08a..390caf6 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -39,6 +39,10 @@ limitations under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-shared</groupId>
+      <artifactId>flume-shared-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume.flume-shared</groupId>
       <artifactId>flume-shared-kafka-test</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 5bd9be0..d2ea7ae 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -38,6 +38,7 @@ import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
+import org.apache.flume.shared.kafka.KafkaSSLUtil;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -268,6 +269,8 @@ public class KafkaChannel extends BasicChannelSemantics {
     // Defaults overridden based on config
     producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootStrapServers);
+
+    KafkaSSLUtil.addGlobalSSLParameters(producerProps);
   }
 
   protected Properties getProducerProps() {
@@ -285,6 +288,8 @@ public class KafkaChannel extends BasicChannelSemantics {
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootStrapServers);
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    KafkaSSLUtil.addGlobalSSLParameters(consumerProps);
   }
 
   protected Properties getConsumerProps() {

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml 
b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 86a8a18..2a24bc1 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -60,6 +60,11 @@
 
     <dependency>
       <groupId>org.apache.flume.flume-shared</groupId>
+      <artifactId>flume-shared-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume.flume-shared</groupId>
       <artifactId>flume-shared-kafka-test</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index eaabd6e..3d56caa 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -35,6 +35,7 @@ import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
+import org.apache.flume.shared.kafka.KafkaSSLUtil;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.producer.Callback;
@@ -420,6 +421,8 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
DEFAULT_VALUE_SERIAIZER);
     kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
     kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
   }
 
   protected Properties getKafkaProps() {

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sources/flume-kafka-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml 
b/flume-ng-sources/flume-kafka-source/pom.xml
index b653fbd..cf3fdff 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -50,6 +50,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-shared</groupId>
+      <artifactId>flume-shared-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume.flume-shared</groupId>
       <artifactId>flume-shared-kafka-test</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index da4ec1a..10b2cfb 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -47,6 +47,7 @@ import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
+import org.apache.flume.shared.kafka.KafkaSSLUtil;
 import org.apache.flume.source.AbstractPollableSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -461,6 +462,8 @@ public class KafkaSource extends AbstractPollableSource
     }
     kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                    KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/flume-shared-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flume-shared/flume-shared-kafka/pom.xml 
b/flume-shared/flume-shared-kafka/pom.xml
new file mode 100644
index 0000000..deb6a35
--- /dev/null
+++ b/flume-shared/flume-shared-kafka/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-shared</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.9.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-shared</groupId>
+  <artifactId>flume-shared-kafka</artifactId>
+  <name>Flume Shared Kafka</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
 
b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
new file mode 100644
index 0000000..b4adcd3
--- /dev/null
+++ 
b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.shared.kafka;
+
+import org.apache.flume.util.SSLUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSSLUtil {
+
+  private KafkaSSLUtil() {
+  }
+
+  /**
+   * Adds the global keystore/truststore SSL parameters to Kafka properties
+   * if SSL is enabled but the keystore/truststore SSL parameters
+   * are not defined explicitly in Kafka properties.
+   *
+   * @param kafkaProps Kafka properties
+   */
+  public static void addGlobalSSLParameters(Properties kafkaProps) {
+    if (isSSLEnabled(kafkaProps)) {
+      addGlobalSSLParameter(kafkaProps,
+          SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
SSLUtil.getGlobalKeystorePath());
+      addGlobalSSLParameter(kafkaProps,
+          SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
SSLUtil.getGlobalKeystorePassword());
+      addGlobalSSLParameter(kafkaProps,
+          SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
SSLUtil.getGlobalKeystoreType(null));
+      addGlobalSSLParameter(kafkaProps,
+          SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
SSLUtil.getGlobalTruststorePath());
+      addGlobalSSLParameter(kafkaProps,
+          SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
SSLUtil.getGlobalTruststorePassword());
+      addGlobalSSLParameter(kafkaProps,
+          SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
SSLUtil.getGlobalTruststoreType(null));
+    }
+  }
+
+  private static void addGlobalSSLParameter(Properties kafkaProps,
+                                            String propName, String 
globalValue) {
+    if (!kafkaProps.containsKey(propName) && globalValue != null) {
+      kafkaProps.put(propName, globalValue);
+    }
+  }
+
+  private static boolean isSSLEnabled(Properties kafkaProps) {
+    String securityProtocol =
+        kafkaProps.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+
+    return securityProtocol != null &&
+        (securityProtocol.equals(SecurityProtocol.SSL.name) ||
+            securityProtocol.equals(SecurityProtocol.SASL_SSL.name));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java
 
b/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java
new file mode 100644
index 0000000..6096bcf
--- /dev/null
+++ 
b/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.shared.kafka;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class KafkaSSLUtilTest {
+
+  @Before
+  public void initSystemProperties() {
+    System.setProperty("javax.net.ssl.keyStore", "global-keystore-path");
+    System.setProperty("javax.net.ssl.keyStorePassword", 
"global-keystore-password");
+    System.setProperty("javax.net.ssl.keyStoreType", "global-keystore-type");
+    System.setProperty("javax.net.ssl.trustStore", "global-truststore-path");
+    System.setProperty("javax.net.ssl.trustStorePassword", 
"global-truststore-password");
+    System.setProperty("javax.net.ssl.trustStoreType", 
"global-truststore-type");
+  }
+
+  @After
+  public void clearSystemProperties() {
+    System.clearProperty("javax.net.ssl.keyStore");
+    System.clearProperty("javax.net.ssl.keyStorePassword");
+    System.clearProperty("javax.net.ssl.keyStoreType");
+    System.clearProperty("javax.net.ssl.trustStore");
+    System.clearProperty("javax.net.ssl.trustStorePassword");
+    System.clearProperty("javax.net.ssl.trustStoreType");
+  }
+
+  @Test
+  public void testSecurityProtocol_PLAINTEXT() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.PLAINTEXT.name);
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
+
+    assertNoSSLParameters(kafkaProps);
+  }
+
+  @Test
+  public void testSecurityProtocol_SASL_PLAINTEXT() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_PLAINTEXT.name);
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
+
+    assertNoSSLParameters(kafkaProps);
+  }
+
+  @Test
+  public void testSecurityProtocol_SSL() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SSL.name);
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
+
+    assertGlobalSSLParameters(kafkaProps);
+  }
+
+  @Test
+  public void testSecurityProtocol_SASL_SSL() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_SSL.name);
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
+
+    assertGlobalSSLParameters(kafkaProps);
+  }
+
+  @Test
+  public void testComponentParametersNotOverridden() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SSL.name);
+
+    kafkaProps.put(
+        SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "component-keystore-path");
+    kafkaProps.put(
+        SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
"component-keystore-password");
+    kafkaProps.put(
+        SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "component-keystore-type");
+    kafkaProps.put(
+        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
"component-truststore-path");
+    kafkaProps.put(
+        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
"component-truststore-password");
+    kafkaProps.put(
+        SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "component-truststore-type");
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
+
+    assertComponentSSLParameters(kafkaProps);
+  }
+
+  @Test
+  public void testEmptyGlobalParametersNotAdded() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put(
+        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SSL.name);
+
+    clearSystemProperties();
+
+    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
+
+    assertNoSSLParameters(kafkaProps);
+  }
+
+  private void assertNoSSLParameters(Properties kafkaProps) {
+    
assertFalse(kafkaProps.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+    
assertFalse(kafkaProps.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
+    assertFalse(kafkaProps.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
+    
assertFalse(kafkaProps.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+    
assertFalse(kafkaProps.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+    assertFalse(kafkaProps.containsKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+  }
+
+  private void assertGlobalSSLParameters(Properties kafkaProps) {
+    assertEquals("global-keystore-path",
+        kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+    assertEquals("global-keystore-password",
+        kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
+    assertEquals("global-keystore-type",
+        kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
+    assertEquals("global-truststore-path",
+        kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+    assertEquals("global-truststore-password",
+        kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+    assertEquals("global-truststore-type",
+        kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+  }
+
+  private void assertComponentSSLParameters(Properties kafkaProps) {
+    assertEquals("component-keystore-path",
+        kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+    assertEquals("component-keystore-password",
+        kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
+    assertEquals("component-keystore-type",
+        kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
+    assertEquals("component-truststore-path",
+        kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+    assertEquals("component-truststore-password",
+        kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+    assertEquals("component-truststore-type",
+        kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/pom.xml
----------------------------------------------------------------------
diff --git a/flume-shared/pom.xml b/flume-shared/pom.xml
index adb751b..0b77424 100644
--- a/flume-shared/pom.xml
+++ b/flume-shared/pom.xml
@@ -41,6 +41,7 @@ limitations under the License.
   </build>
 
   <modules>
+    <module>flume-shared-kafka</module>
     <module>flume-shared-kafka-test</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8cda41a..d52cd37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1448,6 +1448,12 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.flume.flume-shared</groupId>
+        <artifactId>flume-shared-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume.flume-shared</groupId>
         <artifactId>flume-shared-kafka-test</artifactId>
         <version>${project.version}</version>
         <scope>test</scope>

Reply via email to