KAFKA-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed by Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e2c683f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e2c683f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e2c683f Branch: refs/heads/trunk Commit: 9e2c683f550b7ae58008d0bcb62238b7a2d89a65 Parents: 503bd36 Author: Sriharsha Chintalapani <[email protected]> Authored: Tue Aug 18 21:51:15 2015 -0700 Committer: Jun Rao <[email protected]> Committed: Tue Aug 18 21:51:15 2015 -0700 ---------------------------------------------------------------------- build.gradle | 6 +- checkstyle/import-control.xml | 52 +- .../org/apache/kafka/clients/ClientUtils.java | 37 +- .../kafka/clients/CommonClientConfigs.java | 18 +- .../org/apache/kafka/clients/NetworkClient.java | 41 +- .../kafka/clients/consumer/ConsumerConfig.java | 26 +- .../kafka/clients/consumer/KafkaConsumer.java | 92 +-- .../kafka/clients/producer/KafkaProducer.java | 7 +- .../kafka/clients/producer/ProducerConfig.java | 18 + .../kafka/common/config/AbstractConfig.java | 10 +- .../apache/kafka/common/config/SSLConfigs.java | 102 +++ .../kafka/common/network/Authenticator.java | 62 ++ .../kafka/common/network/ByteBufferSend.java | 13 +- .../kafka/common/network/ChannelBuilder.java | 44 ++ .../common/network/DefaultAuthenticator.java | 63 ++ .../kafka/common/network/KafkaChannel.java | 166 +++++ .../kafka/common/network/NetworkReceive.java | 5 +- .../common/network/PlaintextChannelBuilder.java | 58 ++ .../common/network/PlaintextTransportLayer.java | 217 ++++++ .../kafka/common/network/SSLChannelBuilder.java | 68 ++ .../kafka/common/network/SSLTransportLayer.java | 690 +++++++++++++++++++ .../apache/kafka/common/network/Selectable.java | 12 +- .../apache/kafka/common/network/Selector.java | 316 +++++---- .../org/apache/kafka/common/network/Send.java | 6 +- .../kafka/common/network/TransportLayer.java | 86 +++ .../kafka/common/protocol/SecurityProtocol.java | 2 + 
.../security/auth/DefaultPrincipalBuilder.java | 43 ++ .../common/security/auth/KafkaPrincipal.java | 58 ++ .../common/security/auth/PrincipalBuilder.java | 51 ++ .../kafka/common/security/ssl/SSLFactory.java | 210 ++++++ .../org/apache/kafka/common/utils/Utils.java | 53 +- .../apache/kafka/clients/ClientUtilsTest.java | 2 +- .../clients/producer/KafkaProducerTest.java | 19 +- .../apache/kafka/common/network/EchoServer.java | 119 ++++ .../kafka/common/network/SSLSelectorTest.java | 276 ++++++++ .../kafka/common/network/SelectorTest.java | 110 ++- .../common/security/ssl/SSLFactoryTest.java | 60 ++ .../apache/kafka/common/utils/UtilsTest.java | 4 +- .../org/apache/kafka/test/MockSelector.java | 8 +- .../org/apache/kafka/test/TestSSLUtils.java | 243 +++++++ .../main/scala/kafka/api/FetchResponse.scala | 24 +- .../main/scala/kafka/network/SocketServer.scala | 175 +++-- .../main/scala/kafka/server/KafkaConfig.scala | 218 ++++-- .../main/scala/kafka/server/KafkaServer.scala | 110 ++- .../kafka/api/ProducerSendTest.scala | 4 +- .../integration/kafka/api/SSLConsumerTest.scala | 251 +++++++ .../kafka/api/SSLProducerSendTest.scala | 240 +++++++ .../unit/kafka/admin/AddPartitionsTest.scala | 2 +- .../integration/UncleanLeaderElectionTest.scala | 6 +- .../unit/kafka/network/SocketServerTest.scala | 86 ++- .../unit/kafka/server/KafkaConfigTest.scala | 42 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 89 ++- 52 files changed, 4121 insertions(+), 599 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 983587f..17fc223 100644 --- a/build.gradle +++ b/build.gradle @@ -256,9 +256,10 @@ project(':core') { testCompile "$junit" testCompile "$easymock" testCompile 'org.objenesis:objenesis:1.2' + testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52' testCompile 
"org.scalatest:scalatest_$baseScalaVersion:2.2.5" - testCompile project(path: ':clients', configuration: 'archives') - + testCompile project(':clients') + testCompile project(':clients').sourceSets.test.output testRuntime "$slf4jlog4j" zinc 'com.typesafe.zinc:zinc:0.3.7' @@ -390,6 +391,7 @@ project(':clients') { compile 'org.xerial.snappy:snappy-java:1.1.1.7' compile 'net.jpountz.lz4:lz4:1.2.0' + testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52' testCompile "$junit" testRuntime "$slf4jlog4j" } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e3f4f84..eb682f4 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -1,6 +1,6 @@ <!DOCTYPE import-control PUBLIC - "-//Puppy Crawl//DTD Import Control 1.1//EN" - "http://www.puppycrawl.com/dtds/import_control_1_1.dtd"> +"-//Puppy Crawl//DTD Import Control 1.1//EN" +"http://www.puppycrawl.com/dtds/import_control_1_1.dtd"> <!-- // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with @@ -8,68 +8,79 @@ // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
---> +--> + <import-control pkg="org.apache.kafka"> - + <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> - + <!-- common library dependencies --> <allow pkg="java" /> <allow pkg="javax.management" /> <allow pkg="org.slf4j" /> <allow pkg="org.junit" /> - + <allow pkg="javax.net.ssl" /> + <!-- no one depends on the server --> <disallow pkg="kafka" /> - + <!-- anyone can use public classes --> <allow pkg="org.apache.kafka.common" exact-match="true" /> <allow pkg="org.apache.kafka.common.utils" /> - + <subpackage name="common"> <disallow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.common" exact-match="true" /> <allow pkg="org.apache.kafka.test" /> - + <subpackage name="config"> <allow pkg="org.apache.kafka.common.config" /> <!-- for testing --> <allow pkg="org.apache.kafka.common.metrics" /> </subpackage> - + <subpackage name="metrics"> <allow pkg="org.apache.kafka.common.metrics" /> </subpackage> - + <subpackage name="network"> + <allow pkg="org.apache.kafka.common.security.auth" /> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.config" /> <allow pkg="org.apache.kafka.common.metrics" /> + <allow pkg="org.apache.kafka.common.security" /> </subpackage> - + + <subpackage name="security"> + <allow pkg="org.apache.kafka.common.network" /> + <allow pkg="org.apache.kafka.common.config" /> + </subpackage> + <subpackage name="protocol"> <allow pkg="org.apache.kafka.common.errors" /> <allow pkg="org.apache.kafka.common.protocol.types" /> </subpackage> - + <subpackage name="record"> <allow pkg="net.jpountz" /> <allow pkg="org.apache.kafka.common.record" /> </subpackage> - + <subpackage name="requests"> <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.network" /> <!-- for testing --> <allow pkg="org.apache.kafka.common.errors" /> </subpackage> - + <subpackage name="serialization"> <allow class="org.apache.kafka.common.errors.SerializationException" /> 
</subpackage> @@ -80,15 +91,15 @@ <allow pkg="org.slf4j" /> <allow pkg="org.apache.kafka.clients" exact-match="true"/> <allow pkg="org.apache.kafka.test" /> - + <subpackage name="consumer"> <allow pkg="org.apache.kafka.clients.consumer" /> </subpackage> - + <subpackage name="producer"> <allow pkg="org.apache.kafka.clients.producer" /> </subpackage> - + <subpackage name="tools"> <allow pkg="org.apache.kafka.clients.producer" /> <allow pkg="org.apache.kafka.clients.consumer" /> @@ -106,6 +117,7 @@ <subpackage name="test"> <allow pkg="org.apache.kafka" /> + <allow pkg="org.bouncycastle" /> </subpackage> <subpackage name="copycat"> http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 0d68bf1..ba3bcbe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -16,8 +16,14 @@ import java.io.Closeable; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.SSLChannelBuilder; +import org.apache.kafka.common.network.PlaintextChannelBuilder; +import org.apache.kafka.common.security.ssl.SSLFactory; import org.apache.kafka.common.config.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +31,7 @@ import org.slf4j.LoggerFactory; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; + public class ClientUtils { private static final Logger log = LoggerFactory.getLogger(ClientUtils.class); @@ -61,4 +68,28 @@ public class ClientUtils { } } } -} \ No newline at end of file + + /** + * @param configs client/server configs + * returns ChannelBuilder configured channelBuilder based on the configs. 
+ */ + public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) { + SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + ChannelBuilder channelBuilder = null; + + switch (securityProtocol) { + case SSL: + channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); + break; + case PLAINTEXT: + channelBuilder = new PlaintextChannelBuilder(); + break; + default: + throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + } + + channelBuilder.configure(configs); + return channelBuilder; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 2c421f4..7d24c6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -17,7 +17,7 @@ package org.apache.kafka.clients; * Some configurations shared by both producer and consumer */ public class CommonClientConfigs { - + /* * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. */ @@ -27,10 +27,10 @@ public class CommonClientConfigs { + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to " + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of " + "servers (you may want more than one, though, in case a server is down)."; - + public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."; - + public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data."; @@ -45,7 +45,7 @@ public class CommonClientConfigs { public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop."; - + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics."; @@ -55,6 +55,10 @@ public class CommonClientConfigs { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. 
Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; + public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported."; + public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config."; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 0e51d7b..b31f7f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -101,7 +101,7 @@ public class NetworkClient implements KafkaClient { /** * Begin connecting to the given node, return true if we are already connected and ready to send to that node. - * + * * @param node The node to check * @param now The current timestamp * @return True if we are ready to send to the given node @@ -122,7 +122,7 @@ public class NetworkClient implements KafkaClient { * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled * connections. - * + * * @param node The node to check * @param now The current timestamp * @return The number of milliseconds to wait. @@ -147,7 +147,7 @@ public class NetworkClient implements KafkaClient { /** * Check if the node with the given id is ready to send more requests. - * + * * @param node The node * @param now The current time in ms * @return true if the node is ready @@ -161,21 +161,21 @@ public class NetworkClient implements KafkaClient { return false; else // otherwise we are ready if we are connected and can send more requests - return isSendable(nodeId); + return canSendRequest(nodeId); } /** * Are we connected and ready and able to send more requests to the given connection? - * + * * @param node The node */ - private boolean isSendable(String node) { - return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); + private boolean canSendRequest(String node) { + return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); } /** * Return the state of the connection to the given node - * + * * @param node The node to check * @return The connection state */ @@ -185,13 +185,13 @@ public class NetworkClient implements KafkaClient { /** * Queue up the given request for sending. Requests can only be sent out to ready nodes. 
- * + * * @param request The request */ @Override public void send(ClientRequest request) { String nodeId = request.request().destination(); - if (!isSendable(nodeId)) + if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); this.inFlightRequests.add(request); @@ -200,7 +200,7 @@ public class NetworkClient implements KafkaClient { /** * Do actual reads and writes to sockets. - * + * * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately * @param now The current time in milliseconds * @return The list of responses received @@ -246,7 +246,7 @@ public class NetworkClient implements KafkaClient { /** * Await all the outstanding responses for requests on the given connection - * + * * @param node The node to block on * @param now The current time in ms * @return All the collected responses @@ -294,7 +294,7 @@ public class NetworkClient implements KafkaClient { /** * Generate a request header for the given API key - * + * * @param key The api key * @return A request header with the appropriate client id and correlation id */ @@ -324,7 +324,7 @@ public class NetworkClient implements KafkaClient { * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a * connection if all existing connections are in use. This method will never choose a node for which there is no * existing connection and from which we have disconnected within the reconnect backoff period. - * + * * @return The node with the fewest in-flight requests. */ public Node leastLoadedNode(long now) { @@ -349,7 +349,7 @@ public class NetworkClient implements KafkaClient { /** * Handle any completed request send. In particular if no response is expected consider the request complete. 
- * + * * @param responses The list of responses to update * @param now The current time */ @@ -366,7 +366,7 @@ public class NetworkClient implements KafkaClient { /** * Handle any completed receives and update the response list with the responses received. - * + * * @param responses The list of responses to update * @param now The current time */ @@ -407,7 +407,7 @@ public class NetworkClient implements KafkaClient { /** * Handle any disconnected connections - * + * * @param responses The list of responses that completed with the disconnection * @param now The current time */ @@ -472,8 +472,7 @@ public class NetworkClient implements KafkaClient { } String nodeConnectionId = node.idString(); - - if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) { + if (canSendRequest(nodeConnectionId)) { Set<String> topics = metadata.topics(); this.metadataFetchInProgress = true; ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index d35b421..9c9510a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.config.SSLConfigs; import java.util.HashMap; import java.util.Map; @@ -153,7 +154,7 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String CHECK_CRCS_CONFIG = "check.crcs"; private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. 
This check adds some overhead, so it may be disabled in cases seeking extreme performance."; - + /** <code>key.deserializer</code> */ public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface."; @@ -267,7 +268,7 @@ public class ConsumerConfig extends AbstractConfig { Type.BOOLEAN, true, Importance.LOW, - CHECK_CRCS_DOC) + CHECK_CRCS_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, @@ -293,12 +294,29 @@ public class ConsumerConfig extends AbstractConfig { Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC) + .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false) + .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + 
.define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC) + .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC) + .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC); + } public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs, http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index be46b6c..3749880 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -31,8 +31,9 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -65,7 +66,6 @@ import static org.apache.kafka.common.utils.Utils.min; * <p> * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to. * Failure to close the consumer after use will leak these connections. - * <p> * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details. * * <h3>Offsets and Consumer Position</h3> @@ -85,9 +85,9 @@ import static org.apache.kafka.common.utils.Utils.min; * <p> * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further * detail below. 
- * + * * <h3>Consumer Groups</h3> - * + * * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and * processing records. These processes can either be running on the same machine or, as is more likely, they can be * distributed over many machines to provide additional scalability and fault tolerance for processing. @@ -116,14 +116,14 @@ import static org.apache.kafka.common.utils.Utils.min; * <p> * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic * partition balancing. - * + * * <h3>Usage Examples</h3> * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to * demonstrate how to use them. - * + * * <h4>Simple Processing</h4> * This example demonstrates the simplest usage of Kafka's consumer api. - * + * * <pre> * Properties props = new Properties(); * props.put("bootstrap.servers", "localhost:9092"); @@ -141,7 +141,7 @@ import static org.apache.kafka.common.utils.Utils.min; * System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); * } * </pre> - * + * * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by * the config <code>auto.commit.interval.ms</code>. * <p> @@ -161,9 +161,9 @@ import static org.apache.kafka.common.utils.Utils.min; * <p> * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we * are saying that our record's key and value will just be simple strings. - * + * * <h4>Controlling When Messages Are Considered Consumed</h4> - * + * * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records * batched we will insert them into a database. 
If we allowed offsets to auto commit as in the previous example messages * would be considered consumed after they were given out by the consumer, and it would be possible that our process @@ -175,7 +175,7 @@ import static org.apache.kafka.common.utils.Utils.min; * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one * time but in failure cases could be duplicated. - * + * * <pre> * Properties props = new Properties(); * props.put("bootstrap.servers", "localhost:9092"); @@ -201,9 +201,9 @@ import static org.apache.kafka.common.utils.Utils.min; * } * } * </pre> - * + * * <h4>Subscribing To Specific Partitions</h4> - * + * * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple * instances of our program can divided up the work of processing records. @@ -223,7 +223,7 @@ import static org.apache.kafka.common.utils.Utils.min; * <p> * This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular * partitions: - * + * * <pre> * String topic = "foo"; * TopicPartition partition0 = new TopicPartition(topic, 0); @@ -231,15 +231,15 @@ import static org.apache.kafka.common.utils.Utils.min; * consumer.subscribe(partition0); * consumer.subscribe(partition1); * </pre> - * + * * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made. * <p> * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load * balancing) using the same consumer instance. 
- * + * * <h4>Managing Your Own Offsets</h4> - * + * * The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of it's own * choosing. The primary use case for this is allowing the application to store both the offset and the results of the * consumption in the same system in a way that both the results and offsets are stored atomically. This is not always @@ -259,14 +259,14 @@ import static org.apache.kafka.common.utils.Utils.min; * This means that in this case the indexing process that comes back having lost recent updates just resumes indexing * from what it has ensuring that no updates are lost. * </ul> - * + * * Each record comes with it's own offset, so to manage your own offset you just need to do the following: * <ol> * <li>Configure <code>enable.auto.commit=false</code> * <li>Use the offset provided with each {@link ConsumerRecord} to save your position. * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}. * </ol> - * + * * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the * search index use case described above). If the partition assignment is done automatically special care will also be * needed to handle the case where partition assignments change. This can be handled using a special callback specified @@ -279,9 +279,9 @@ import static org.apache.kafka.common.utils.Utils.min; * <p> * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for * partitions that are moved elsewhere. - * + * * <h4>Controlling The Consumer's Position</h4> - * + * * In most use cases the consumer will simply consume records from beginning to end, periodically committing it's * position (either automatically or manually). However Kafka allows the consumer to manually control it's position, * moving forward or backwards in a partition at will. 
This means a consumer can re-consume older records, or skip to @@ -296,14 +296,14 @@ import static org.apache.kafka.common.utils.Utils.min; * the consumer will want to initialize it's position on start-up to whatever is contained in the local store. Likewise * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history). - * + * * Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special * methods for seeking to the earliest and latest offset the server maintains are also available ( * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively). - * + * * * <h3><a name="multithreaded">Multi-threaded Processing</a></h3> - * + * * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}. @@ -353,9 +353,9 @@ import static org.apache.kafka.common.utils.Utils.min; * We have intentionally avoided implementing a particular threading model for processing. This leaves several * options for implementing multi-threaded processing of records. * - * + * * <h4>1. One Consumer Per Thread</h4> - * + * * A simple option is to give each thread it's own consumer instance. Here are the pros and cons of this approach: * <ul> * <li><b>PRO</b>: It is the easiest to implement @@ -368,13 +368,13 @@ import static org.apache.kafka.common.utils.Utils.min; * which can cause some drop in I/O throughput. * <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions. * </ul> - * + * * <h4>2. 
Decouple Consumption and Processing</h4> - * + * * Another alternative is to have one or more consumer threads that do all data consumption and hands off * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle * the record processing. - * + * * This option likewise has pros and cons: * <ul> * <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it @@ -385,11 +385,11 @@ import static org.apache.kafka.common.utils.Utils.min; * <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure * that processing is complete for that partition. * </ul> - * + * * There are many possible variations on this approach. For example each processor thread can have it's own queue, and * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify * commit. - * + * */ @InterfaceStability.Unstable public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -430,7 +430,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * string "42" or the integer 42). * <p> * Valid configuration strings are documented at {@link ConsumerConfig} - * + * * @param configs The consumer configs */ public KafkaConsumer(Map<String, Object> configs) { @@ -442,7 +442,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}. * <p> * Valid configuration strings are documented at {@link ConsumerConfig} - * + * * @param configs The consumer configs * @param callback A callback interface that the user can implement to manage customized offsets on the start and * end of every rebalance operation. @@ -476,7 +476,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}. 
* <p> * Valid configuration strings are documented at {@link ConsumerConfig} - * + * * @param properties The consumer configuration properties * @param callback A callback interface that the user can implement to manage customized offsets on the start and * end of every rebalance operation. @@ -524,12 +524,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); - String metricGrpPrefix = "consumer"; Map<String, String> metricsTags = new LinkedHashMap<String, String>(); metricsTags.put("client-id", clientId); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient netClient = new NetworkClient( - new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags), + new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder), this.metadata, clientId, 100, // a fixed large enough value will suffice @@ -623,7 +623,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <li>An existing member of the consumer group dies * <li>A new member is added to an existing consumer group via the join API * </ul> - * + * * @param topics A variable list of topics that the consumer wants to subscribe to */ @Override @@ -664,7 +664,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not * be returned from the next {@link #poll(long) poll()} onwards - * + * * @param topics Topics to unsubscribe from */ public void unsubscribe(String... 
topics) { @@ -682,7 +682,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next * {@link #poll(long) poll()} onwards - * + * * @param partitions Partitions to unsubscribe from */ public void unsubscribe(TopicPartition... partitions) { @@ -705,11 +705,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions. - * + * * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns * immediately with any records available now. Must not be negative. * @return map of topic to records since the last fetch for the subscribed list of topics and partitions - * + * * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic * offset reset policy has been configured. */ @@ -932,7 +932,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists). - * + * * @param partition The partition to get the position for * @return The offset * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is @@ -961,7 +961,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <p> * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the * consumer hasn't yet initialized it's cache of committed offsets. 
- * + * * @param partition The partition to check * @return The last committed offset * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given @@ -1003,7 +1003,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it * does not already have any metadata about the given topic. - * + * * @param topic The topic to get partition metadata for * @return The list of partitions */ http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 03b8dd2..c4621e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; @@ -226,9 +227,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> { metricTags); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient client = new NetworkClient( - new 
Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags), + new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), @@ -305,7 +306,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * <p> * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get() - * get()} on this future will block until the associated request completes and then return the metadata for the record + * get()} on this future will block until the associated request completes and then return the metadata for the record * or throw any exception that occurred while sending the record. * <p> * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately: http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index aa26420..06f00a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -22,6 +22,7 @@ import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.SSLConfigs; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import 
org.apache.kafka.common.config.ConfigDef.Type; @@ -225,9 +226,26 @@ public class ProducerConfig extends AbstractConfig { MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC) + .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false) + .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION, 
Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC) + .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC) + .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC); + } public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 6c31748..156ec14 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -107,6 +107,10 @@ public class AbstractConfig { return copy; } + public Map<String, ?> values() { + return new HashMap<String, Object>(values); + } + private void logAll() { StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()); @@ -133,7 +137,7 @@ public class AbstractConfig { /** * Get a configured instance of the give class specified by the given configuration key. If the object implements * Configurable configure it using the configuration. - * + * * @param key The configuration key for the class * @param t The interface the class should implement * @return A configured instance of the class http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java new file mode 100644 index 0000000..dd7b71a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.config; + +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.KeyManagerFactory; + +public class SSLConfigs { + /* + * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. + */ + + public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class"; + public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client."; + public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder"; + + public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; + public static final String SSL_PROTOCOL_DOC = "The ssl protocol used to generate SSLContext." + + "Default setting is TLS. Allowed values are SSL, SSLv2, SSLv3, TLS, TLSv1.1, TLSv1.2"; + public static final String DEFAULT_SSL_PROTOCOL = "TLS"; + + public static final String SSL_PROVIDER_CONFIG = "ssl.provider"; + public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. 
Default value is the default security provider of the JVM."; + + public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites"; + public static final String SSL_CIPHER_SUITES_DOC = "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol." + + "By default all the available cipher suites are supported."; + + public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; + public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. " + + "All versions of TLS is enabled by default."; + public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1"; + + public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"; + public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. " + + "This is optional for client. Default value is JKS"; + public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS"; + + public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; + public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. " + + "This is optional for Client and can be used for two-way authentication for client."; + + public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password"; + public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file." + + "This is optional for client and only needed if the ssl.keystore.location configured. "; + + public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password"; + public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. 
" + + "This is optional for client."; + + public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type"; + public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. " + + "Default value is JKS."; + public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS"; + + public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location"; + public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. "; + public static final String DEFAULT_TRUSTSTORE_LOCATION = "/tmp/ssl.truststore.jks"; + + public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password"; + public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. "; + public static final String DEFAULT_TRUSTSTORE_PASSWORD = "truststore_password"; + + public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm"; + public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. " + + "Default value is the key manager factory algorithm configured for the Java Virtual Machine."; + public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm(); + + public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm"; + public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. 
" + + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine."; + public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm(); + + public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm"; + public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. "; + + public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth"; + public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication." + + " The following settings are common: " + + " <ul>" + + " <li><code>ssl.want.client.auth=required</code> If set to required" + + " client authentication is required." + + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional." + + " unlike requested , if this option is set client can choose not to provide authentication information about itself" + + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed."; + + public static final String SSL_NEED_CLIENT_AUTH_DOC = "It can be REQUESTED . " + + "Default value is false"; + public static final Boolean DEFAULT_SSL_NEED_CLIENT_AUTH = false; + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java new file mode 100644 index 0000000..261f571 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +/** + * Authentication for Channel + */ + +import java.io.IOException; +import java.security.Principal; + +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.KafkaException; + +public interface Authenticator { + + /** + * Configures this Authenticator with the given transportLayer and principalBuilder. + * @param transportLayer the TransportLayer to use + * @param principalBuilder the PrincipalBuilder to use + */ + void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder); + + /** + * Implements any authentication mechanism. Use transportLayer to read or write tokens. + * Returns without doing anything if no further authentication needs to be done.
+ */ + void authenticate() throws IOException; + + /** + * Returns Principal using PrincipalBuilder + */ + Principal principal() throws KafkaException; + + /** + * returns true if authentication is complete otherwise returns false; + */ + boolean complete(); + + /** + * Closes this Authenticator + * + * @throws IOException if any I/O error occurs + */ + void close() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index df0e6d5..d7357b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -26,6 +26,7 @@ public class ByteBufferSend implements Send { protected final ByteBuffer[] buffers; private int remaining; private int size; + private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer... 
buffers) { super(); @@ -43,7 +44,7 @@ public class ByteBufferSend implements Send { @Override public boolean completed() { - return remaining <= 0; + return remaining <= 0 && !pending; } @Override @@ -57,6 +58,12 @@ public class ByteBufferSend implements Send { if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; + // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel. + // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than + // GatheringByteChannel or ScatteringByteChannel. + if (channel instanceof TransportLayer) + pending = ((TransportLayer) channel).hasPendingWrites(); + return written; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java new file mode 100644 index 0000000..52a7aab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.util.Map; +import java.nio.channels.SelectionKey; + +import org.apache.kafka.common.KafkaException; + +/** + * A ChannelBuilder interface to build Channel based on configs + */ +public interface ChannelBuilder { + + /** + * Configure this class with the given key-value pairs + */ + void configure(Map<String, ?> configs) throws KafkaException; + + + /** + * returns a Channel with TransportLayer and Authenticator configured. + * @param id channel id + * @param key SelectionKey + */ + KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException; + + + /** + * Closes ChannelBuilder + */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java new file mode 100644 index 0000000..813a4aa --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +import java.security.Principal; +import java.io.IOException; + +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.KafkaException; + +public class DefaultAuthenticator implements Authenticator { + + private TransportLayer transportLayer; + private PrincipalBuilder principalBuilder; + private Principal principal; + + public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder) { + this.transportLayer = transportLayer; + this.principalBuilder = principalBuilder; + } + + /** + * No-Op for default authenticator + */ + public void authenticate() throws IOException {} + + /** + * Constructs Principal using configured principalBuilder. + * @return Principal + * @throws KafkaException + */ + public Principal principal() throws KafkaException { + if (principal == null) + principal = principalBuilder.buildPrincipal(transportLayer, this); + return principal; + } + + public void close() throws IOException {} + + /** + * DefaultAuthenticator doesn't implement any additional authentication mechanism. 
+ * @return true + */ + public boolean complete() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java new file mode 100644 index 0000000..28a4f41 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.common.network; + + +import java.io.IOException; + +import java.net.Socket; +import java.nio.channels.SelectionKey; + +import java.security.Principal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class KafkaChannel { + private static final Logger log = LoggerFactory.getLogger(KafkaChannel.class); + private final String id; + private TransportLayer transportLayer; + private Authenticator authenticator; + private NetworkReceive receive; + private Send send; + private int maxReceiveSize; + + public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException { + this.id = id; + this.transportLayer = transportLayer; + this.authenticator = authenticator; + this.maxReceiveSize = maxReceiveSize; + } + + public void close() throws IOException { + transportLayer.close(); + authenticator.close(); + } + + /** + * returns user principal for the session + * In case of PLAINTEXT and No Authentication returns ANONYMOUS as the userPrincipal + * If SSL used without any SASL Authentication returns SSLSession.peerPrincipal + */ + public Principal principal() throws IOException { + return authenticator.principal(); + } + + /** + * Does handshake of transportLayer and Authentication using configured authenticator + */ + public void prepare() throws IOException { + if (transportLayer.ready() && authenticator.complete()) + return; + if (!transportLayer.ready()) + transportLayer.handshake(); + if (transportLayer.ready() && !authenticator.complete()) + authenticator.authenticate(); + } + + public void disconnect() { + transportLayer.disconnect(); + } + + + public void finishConnect() throws IOException { + transportLayer.finishConnect(); + } + + public boolean isConnected() { + return transportLayer.isConnected(); + } + + public String id() { + return id; + } + + public void mute() { + transportLayer.removeInterestOps(SelectionKey.OP_READ); + } + + public void unmute() 
{ + transportLayer.addInterestOps(SelectionKey.OP_READ); + } + + public boolean isMute() { + return transportLayer.isMute(); + } + + public boolean ready() { + return transportLayer.ready() && authenticator.complete(); + } + + public boolean hasSend() { + return send != null; + } + + public String socketDescription() { + Socket socket = transportLayer.socketChannel().socket(); + if (socket == null) + return "[unconnected socket]"; + else if (socket.getInetAddress() != null) + return socket.getInetAddress().toString(); + else + return socket.getLocalAddress().toString(); + } + + public void setSend(Send send) { + if (this.send != null) + throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); + this.send = send; + this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); + } + + public NetworkReceive read() throws IOException { + NetworkReceive result = null; + + if (receive == null) { + receive = new NetworkReceive(maxReceiveSize, id); + } + + long x = receive(receive); + if (receive.complete()) { + receive.payload().rewind(); + result = receive; + receive = null; + } + return result; + } + + public Send write() throws IOException { + Send result = null; + if (send != null && send(send)) { + result = send; + send = null; + } + return result; + } + + private long receive(NetworkReceive receive) throws IOException { + long result = receive.readFrom(transportLayer); + return result; + } + + private boolean send(Send send) throws IOException { + send.writeTo(transportLayer); + if (send.completed()) + transportLayer.removeInterestOps(SelectionKey.OP_WRITE); + + return send.completed(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index 3ca0098..2a1568e 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -89,6 +89,7 @@ public class NetworkReceive implements Receive { throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); + this.buffer = ByteBuffer.allocate(receiveSize); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java new file mode 100644 index 0000000..76dbf93 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.nio.channels.SelectionKey; +import java.util.Map; + +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.KafkaException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PlaintextChannelBuilder implements ChannelBuilder { + private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class); + private PrincipalBuilder principalBuilder; + + public void configure(Map<String, ?> configs) throws KafkaException { + try { + this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); + this.principalBuilder.configure(configs); + } catch (Exception e) { + throw new KafkaException(e); + } + } + + public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException { + KafkaChannel channel = null; + try { + PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); + Authenticator authenticator = new DefaultAuthenticator(); + authenticator.configure(transportLayer, this.principalBuilder); + channel = new KafkaChannel(id, transportLayer, 
authenticator, maxReceiveSize); + } catch (Exception e) { + log.warn("Failed to create channel due to ", e); + throw new KafkaException(e); + } + return channel; + } + + public void close() { + this.principalBuilder.close(); + } + +}
