Repository: camel Updated Branches: refs/heads/master 9584f3851 -> 39742f911
CAMEL-10705 - Allow to use an SSLContextParameters object for Kafka Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39742f91 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39742f91 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39742f91 Branch: refs/heads/master Commit: 39742f911c7cf7aeec71f880712ccd808e51ccbe Parents: 9584f38 Author: Antoine DESSAIGNE <antoine.dessai...@gmail.com> Authored: Fri Jan 13 14:35:42 2017 +0100 Committer: Antoine DESSAIGNE <antoine.dessai...@gmail.com> Committed: Fri Jan 13 14:35:42 2017 +0100 ---------------------------------------------------------------------- .../src/main/docs/kafka-component.adoc | 50 ++++++++++++- .../component/kafka/KafkaConfiguration.java | 78 ++++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/docs/kafka-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index d100a32..8eacb10 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -49,7 +49,7 @@ The Kafka component supports 1 options which are listed below. // endpoint options: START -The Kafka component supports 78 endpoint options which are listed below: +The Kafka component supports 79 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] @@ -120,6 +120,7 @@ The Kafka component supports 78 endpoint options which are listed below: | saslMechanism | security | GSSAPI | String | The Simple Authentication and Security Layer (SASL) Mechanism used. 
For the valid values see http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml | securityProtocol | security | PLAINTEXT | String | Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported. | sslCipherSuites | security | | String | A list of cipher suites. This is a named combination of authentication encryption MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. +| sslContextParameters | security | | SSLContextParameters | SSL configuration using a Camel SSLContextParameters object. If configured it's applied before the other SSL endpoint parameters. | sslEnabledProtocols | security | TLSv1.2,TLSv1.1,TLSv1 | String | The list of protocols enabled for SSL connections. TLSv1.2 TLSv1.1 and TLSv1 are enabled by default. | sslEndpointAlgorithm | security | | String | The endpoint identification algorithm to validate server hostname using server certificate. | sslKeymanagerAlgorithm | security | SunX509 | String | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. @@ -229,6 +230,53 @@ from("direct:start") .to("kafka:localhost:9092?topic=test"); ---------------------------------------------------------------------------- + +#### SSL configuration + +You have 2 different ways to configure the SSL communication on the Kafka component. 
+ +The first way is through the many SSL endpoint parameters +[source,java] +------------------------------------------------------------- +from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC + + "&groupId=A" + + "&sslKeystoreLocation=/path/to/keystore.jks" + + "&sslKeystorePassword=changeit" + + "&sslKeyPassword=changeit") + .to("mock:result"); +------------------------------------------------------------- + +The second way is to use the `sslContextParameters` endpoint parameter. +[source,java] +-------------------------------------------------------------------------------------------------- +// Configure the SSLContextParameters object +KeyStoreParameters ksp = new KeyStoreParameters(); +ksp.setResource("/path/to/keystore.jks"); +ksp.setPassword("changeit"); +KeyManagersParameters kmp = new KeyManagersParameters(); +kmp.setKeyStore(ksp); +kmp.setKeyPassword("changeit"); +SSLContextParameters scp = new SSLContextParameters(); +scp.setKeyManagers(kmp); + +// Bind this SSLContextParameters into the Camel registry +JndiRegistry registry = new JndiRegistry(); +registry.bind("ssl", scp); + +// Configure the camel context +DefaultCamelContext camelContext = new DefaultCamelContext(registry); +camelContext.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC + // + "&groupId=A" + // + "&sslContextParameters=#ssl") // Reference the SSL configuration + .to("mock:result"); + } +}); +-------------------------------------------------------------------------------------------------- + + ### Endpoints Camel supports the link:message-endpoint.html[Message Endpoint] pattern http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 214fd2f..a10ace4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import org.apache.camel.Exchange; import org.apache.camel.spi.Metadata; @@ -27,6 +28,12 @@ import org.apache.camel.spi.StateRepository; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; +import org.apache.camel.util.jsse.CipherSuitesParameters; +import org.apache.camel.util.jsse.KeyManagersParameters; +import org.apache.camel.util.jsse.KeyStoreParameters; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.apache.camel.util.jsse.SecureSocketProtocolsParameters; +import org.apache.camel.util.jsse.TrustManagersParameters; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -180,6 +187,10 @@ public class KafkaConfiguration { //reconnect.backoff.ms @UriParam(label = "producer", defaultValue = "50") private Integer reconnectBackoffMs = 50; + + // SSL + @UriParam(label = "common,security") + private SSLContextParameters sslContextParameters; // SSL // ssl.key.password @UriParam(label = "producer,security", secret = true) @@ -264,6 +275,7 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec()); addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries()); // SSL + applySslConfiguration(props, 
getSslContextParameters()); addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword()); addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation()); addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword()); @@ -322,6 +334,7 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs()); addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords()); // SSL + applySslConfiguration(props, getSslContextParameters()); addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword()); addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation()); addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword()); @@ -368,6 +381,54 @@ public class KafkaConfiguration { return props; } + /** + * Uses the standard camel {@link SSLContextParameters} object to fill the Kafka SSL properties + * + * @param props Kafka properties + * @param sslContextParameters SSL configuration + */ + private void applySslConfiguration(Properties props, SSLContextParameters sslContextParameters) { + if (sslContextParameters != null) { + addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, sslContextParameters.getSecureSocketProtocol()); + addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, sslContextParameters.getProvider()); + + CipherSuitesParameters cipherSuites = sslContextParameters.getCipherSuites(); + if (cipherSuites != null) { + addCommaSeparatedList(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, cipherSuites.getCipherSuite()); + } + + SecureSocketProtocolsParameters secureSocketProtocols = sslContextParameters.getSecureSocketProtocols(); + if (secureSocketProtocols != null) { + addCommaSeparatedList(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, secureSocketProtocols.getSecureSocketProtocol()); + } + + 
KeyManagersParameters keyManagers = sslContextParameters.getKeyManagers(); + if (keyManagers != null) { + addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagers.getAlgorithm()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyManagers.getKeyPassword()); + + KeyStoreParameters keyStore = keyManagers.getKeyStore(); + if (keyStore != null) { + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keyStore.getType()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getResource()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword()); + } + } + + TrustManagersParameters trustManagers = sslContextParameters.getTrustManagers(); + if (trustManagers != null) { + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagers.getAlgorithm()); + + KeyStoreParameters keyStore = trustManagers.getKeyStore(); + if (keyStore != null) { + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, keyStore.getType()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, keyStore.getResource()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, keyStore.getPassword()); + } + } + } + } + private static <T> void addPropertyIfNotNull(Properties props, String key, T value) { if (value != null) { // Kafka expects all properties as String @@ -384,6 +445,12 @@ public class KafkaConfiguration { } } + private static void addCommaSeparatedList(Properties props, String key, List<String> values) { + if (values != null && !values.isEmpty()) { + props.put(key, values.stream().collect(Collectors.joining(","))); + } + } + public String getGroupId() { return groupId; } @@ -837,6 +904,17 @@ public class KafkaConfiguration { this.securityProtocol = securityProtocol; } + public SSLContextParameters getSslContextParameters() { + return sslContextParameters; + } + + /** + * SSL configuration using a 
Camel {@link SSLContextParameters} object. If configured it's applied before the other SSL endpoint parameters. + */ + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + this.sslContextParameters = sslContextParameters; + } + public String getSslKeyPassword() { return sslKeyPassword; }