Repository: flume Updated Branches: refs/heads/trunk 965e13264 -> 493b53b64
FLUME-3278 Handling -D keystore parameters in Kafka components Kafka client does not handle -D keystore parameters directly so Flume has to pass them explicitly in Kafka properties (like ssl.keystore.location, etc). Also using the same method for the truststore (in order to handle keystore/truststore in the same way). This closes #231 Reviewers: Denes Arvay (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/493b53b6 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/493b53b6 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/493b53b6 Branch: refs/heads/trunk Commit: 493b53b6430c687f88c438e5883ef9120d7aedb2 Parents: 965e132 Author: Peter Turcsanyi <[email protected]> Authored: Mon Oct 29 14:28:53 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Mon Oct 29 14:28:53 2018 +0100 ---------------------------------------------------------------------- flume-ng-channels/flume-kafka-channel/pom.xml | 4 + .../flume/channel/kafka/KafkaChannel.java | 5 + flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 5 + .../org/apache/flume/sink/kafka/KafkaSink.java | 3 + flume-ng-sources/flume-kafka-source/pom.xml | 4 + .../apache/flume/source/kafka/KafkaSource.java | 3 + flume-shared/flume-shared-kafka/pom.xml | 51 ++++++ .../apache/flume/shared/kafka/KafkaSSLUtil.java | 73 ++++++++ .../flume/shared/kafka/KafkaSSLUtilTest.java | 174 +++++++++++++++++++ flume-shared/pom.xml | 1 + pom.xml | 6 + 11 files changed, 329 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-channels/flume-kafka-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml index 1f4a08a..390caf6 100644 --- a/flume-ng-channels/flume-kafka-channel/pom.xml +++ b/flume-ng-channels/flume-kafka-channel/pom.xml @@ -39,6 +39,10 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.flume.flume-shared</groupId> + <artifactId>flume-shared-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-shared</groupId> <artifactId>flume-shared-kafka-test</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 5bd9be0..d2ea7ae 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -38,6 +38,7 @@ import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.kafka.KafkaChannelCounter; +import org.apache.flume.shared.kafka.KafkaSSLUtil; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -268,6 +269,8 @@ public class KafkaChannel extends BasicChannelSemantics { // Defaults overridden based on config producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX)); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + + KafkaSSLUtil.addGlobalSSLParameters(producerProps); } protected Properties getProducerProps() { @@ -285,6 +288,8 @@ public class KafkaChannel extends BasicChannelSemantics { consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + KafkaSSLUtil.addGlobalSSLParameters(consumerProps); } protected Properties getConsumerProps() { http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sinks/flume-ng-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml index 86a8a18..2a24bc1 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml @@ -60,6 +60,11 @@ <dependency> <groupId>org.apache.flume.flume-shared</groupId> + <artifactId>flume-shared-kafka</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume.flume-shared</groupId> <artifactId>flume-shared-kafka-test</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index eaabd6e..3d56caa 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -35,6 +35,7 @@ import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.instrumentation.kafka.KafkaSinkCounter; +import org.apache.flume.shared.kafka.KafkaSSLUtil; import org.apache.flume.sink.AbstractSink; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.producer.Callback; @@ -420,6 +421,8 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER); kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX)); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); } protected Properties getKafkaProps() { http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sources/flume-kafka-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml index b653fbd..cf3fdff 100644 --- a/flume-ng-sources/flume-kafka-source/pom.xml +++ b/flume-ng-sources/flume-kafka-source/pom.xml @@ -50,6 +50,10 @@ </dependency> <dependency> <groupId>org.apache.flume.flume-shared</groupId> + <artifactId>flume-shared-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-shared</groupId> <artifactId>flume-shared-kafka-test</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index da4ec1a..10b2cfb 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -47,6 +47,7 @@ import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.kafka.KafkaSourceCounter; +import org.apache.flume.shared.kafka.KafkaSSLUtil; import org.apache.flume.source.AbstractPollableSource; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.CommonClientConfigs; @@ -461,6 +462,8 @@ public class KafkaSource extends AbstractPollableSource } kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); } /** http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/flume-shared-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/flume-shared/flume-shared-kafka/pom.xml b/flume-shared/flume-shared-kafka/pom.xml new file mode 100644 index 0000000..deb6a35 --- /dev/null +++ b/flume-shared/flume-shared-kafka/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-shared</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume.flume-shared</groupId> + <artifactId>flume-shared-kafka</artifactId> + <name>Flume Shared Kafka</name> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java ---------------------------------------------------------------------- diff --git a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java new file mode 100644 index 0000000..b4adcd3 --- /dev/null +++ b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.shared.kafka; + +import org.apache.flume.util.SSLUtil; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.protocol.SecurityProtocol; + +import java.util.Properties; + +public class KafkaSSLUtil { + + private KafkaSSLUtil() { + } + + /** + * Adds the global keystore/truststore SSL parameters to Kafka properties + * if SSL is enabled but the keystore/truststore SSL parameters + * are not defined explicitly in Kafka properties. + * + * @param kafkaProps Kafka properties + */ + public static void addGlobalSSLParameters(Properties kafkaProps) { + if (isSSLEnabled(kafkaProps)) { + addGlobalSSLParameter(kafkaProps, + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SSLUtil.getGlobalKeystorePath()); + addGlobalSSLParameter(kafkaProps, + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSLUtil.getGlobalKeystorePassword()); + addGlobalSSLParameter(kafkaProps, + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SSLUtil.getGlobalKeystoreType(null)); + addGlobalSSLParameter(kafkaProps, + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSLUtil.getGlobalTruststorePath()); + addGlobalSSLParameter(kafkaProps, + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSLUtil.getGlobalTruststorePassword()); + addGlobalSSLParameter(kafkaProps, + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SSLUtil.getGlobalTruststoreType(null)); + } + } + + private static void addGlobalSSLParameter(Properties kafkaProps, + String propName, String globalValue) { + if (!kafkaProps.containsKey(propName) && globalValue != null) { + kafkaProps.put(propName, globalValue); + } + } + + private static boolean isSSLEnabled(Properties kafkaProps) { + String securityProtocol = + kafkaProps.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + + return securityProtocol != null && + (securityProtocol.equals(SecurityProtocol.SSL.name) || + securityProtocol.equals(SecurityProtocol.SASL_SSL.name)); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java ---------------------------------------------------------------------- diff --git a/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java b/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java new file mode 100644 index 0000000..6096bcf --- /dev/null +++ b/flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.shared.kafka; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class KafkaSSLUtilTest { + + @Before + public void initSystemProperties() { + System.setProperty("javax.net.ssl.keyStore", "global-keystore-path"); + System.setProperty("javax.net.ssl.keyStorePassword", "global-keystore-password"); + System.setProperty("javax.net.ssl.keyStoreType", "global-keystore-type"); + System.setProperty("javax.net.ssl.trustStore", "global-truststore-path"); + System.setProperty("javax.net.ssl.trustStorePassword", "global-truststore-password"); + System.setProperty("javax.net.ssl.trustStoreType", "global-truststore-type"); + } + + @After + public void clearSystemProperties() { + System.clearProperty("javax.net.ssl.keyStore"); + System.clearProperty("javax.net.ssl.keyStorePassword"); + System.clearProperty("javax.net.ssl.keyStoreType"); + System.clearProperty("javax.net.ssl.trustStore"); + System.clearProperty("javax.net.ssl.trustStorePassword"); + System.clearProperty("javax.net.ssl.trustStoreType"); + } + + @Test + public void testSecurityProtocol_PLAINTEXT() { + Properties kafkaProps = new Properties(); + kafkaProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); + + assertNoSSLParameters(kafkaProps); + } + + @Test + public void testSecurityProtocol_SASL_PLAINTEXT() { + Properties kafkaProps = new Properties(); + kafkaProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); + + assertNoSSLParameters(kafkaProps); + } + + @Test + public void testSecurityProtocol_SSL() { + Properties kafkaProps = new Properties(); + kafkaProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); + + assertGlobalSSLParameters(kafkaProps); + } + + @Test + public void testSecurityProtocol_SASL_SSL() { + Properties kafkaProps = new Properties(); + kafkaProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); + + assertGlobalSSLParameters(kafkaProps); + } + + @Test + public void testComponentParametersNotOverridden() { + Properties kafkaProps = new Properties(); + kafkaProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name); + + kafkaProps.put( + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "component-keystore-path"); + kafkaProps.put( + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "component-keystore-password"); + kafkaProps.put( + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "component-keystore-type"); + kafkaProps.put( + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "component-truststore-path"); + kafkaProps.put( + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "component-truststore-password"); + kafkaProps.put( + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "component-truststore-type"); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); + + assertComponentSSLParameters(kafkaProps); + } + + @Test + public void testEmptyGlobalParametersNotAdded() { + Properties kafkaProps = new Properties(); + kafkaProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name); + + clearSystemProperties(); + + KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); + + assertNoSSLParameters(kafkaProps); + } + + private void assertNoSSLParameters(Properties kafkaProps) { + assertFalse(kafkaProps.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + assertFalse(kafkaProps.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); + assertFalse(kafkaProps.containsKey(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); + assertFalse(kafkaProps.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + assertFalse(kafkaProps.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + assertFalse(kafkaProps.containsKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + } + + private void assertGlobalSSLParameters(Properties kafkaProps) { + assertEquals("global-keystore-path", + kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + assertEquals("global-keystore-password", + kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); + assertEquals("global-keystore-type", + kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); + assertEquals("global-truststore-path", + kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + assertEquals("global-truststore-password", + kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + assertEquals("global-truststore-type", + kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + } + + private void assertComponentSSLParameters(Properties kafkaProps) { + assertEquals("component-keystore-path", + kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + assertEquals("component-keystore-password", + kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); + assertEquals("component-keystore-type", + kafkaProps.getProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); + assertEquals("component-truststore-path", + kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + assertEquals("component-truststore-password", + kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + assertEquals("component-truststore-type", + kafkaProps.getProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/flume-shared/pom.xml ---------------------------------------------------------------------- diff --git a/flume-shared/pom.xml b/flume-shared/pom.xml index adb751b..0b77424 100644 --- a/flume-shared/pom.xml +++ b/flume-shared/pom.xml @@ -41,6 +41,7 @@ limitations under the License. </build> <modules> + <module>flume-shared-kafka</module> <module>flume-shared-kafka-test</module> </modules> http://git-wip-us.apache.org/repos/asf/flume/blob/493b53b6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8cda41a..d52cd37 100644 --- a/pom.xml +++ b/pom.xml @@ -1448,6 +1448,12 @@ limitations under the License. <dependency> <groupId>org.apache.flume.flume-shared</groupId> + <artifactId>flume-shared-kafka</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flume.flume-shared</groupId> <artifactId>flume-shared-kafka-test</artifactId> <version>${project.version}</version> <scope>test</scope>
