Repository: flume
Updated Branches:
  refs/heads/trunk 585c4c92e -> 2fd0d2572


FLUME-2971. Add secure Kafka Sink/Source/Channel setup to the User Guide

The User Guide already has details about configuring Kafka channel to work
with a kerberized Kafka cluster. This patch adds similar description for
Kafka Sink and Kafka Source.

Reviewers: Tristan Stevens, Mike Percy, Bessenyei Balázs Donát

(Attila Simon via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2fd0d257
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2fd0d257
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2fd0d257

Branch: refs/heads/trunk
Commit: 2fd0d2572ceb2bc0138c880a3b763647349b64f4
Parents: 585c4c9
Author: Attila Simon <[email protected]>
Authored: Mon Oct 10 20:48:06 2016 +0200
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Mon Oct 10 20:48:06 2016 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst | 563 +++++++++++++++++++++++-----
 1 file changed, 471 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2fd0d257/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 25db777..71fd32e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1240,44 +1240,45 @@ Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same
 Consumer Group so each will read a unique set of partitions for the topics.
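The consumer-group behaviour described in the hunk context above (each source in the same group reads a unique subset of the topic's partitions) can be sketched with a toy round-robin assignment. This is illustrative only: it is plain Python, not Flume or Kafka code, and Kafka's group coordinator actually uses pluggable assignors (range, round-robin, etc.); the names `source1`..`source3` are hypothetical.

```python
# Illustrative sketch only: shows why Kafka Sources sharing the same
# kafka.consumer.group.id end up reading disjoint partition sets.

def assign_partitions(partitions, consumers):
    """Distribute partition ids round-robin across consumer names."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Three Kafka Sources configured with the same consumer group,
# reading a six-partition topic:
print(assign_partitions(list(range(6)), ["source1", "source2", "source3"]))
# prints {'source1': [0, 3], 'source2': [1, 4], 'source3': [2, 5]}
```

Each source receives a disjoint slice of the partitions, and together the slices cover the whole topic, which is why running multiple sources in one group scales consumption without duplicating events.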
- - -=============================== =========== =================================================== -Property Name Default Description -=============================== =========== =================================================== -**channels** -- -**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource`` -**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the source -kafka.consumer.group.id flume Unique identified of consumer group. Setting the same id in multiple sources or agents - indicates that they are part of the same consumer group -**kafka.topics** -- Comma-separated list of topics the kafka consumer will read messages from. -**kafka.topics.regex** -- Regex that defines set of topics the source is subscribed on. This property has higher priority - than ``kafka.topics`` and overrides ``kafka.topics`` if exists. -batchSize 1000 Maximum number of messages written to Channel in one batch -batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel - The batch will be written whenever the first of size and time will be reached. -backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. - Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for - ingestion use cases but a lower value may be required for low latency operations with - interceptors. -maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is - ideal for ingestion use cases but a lower value may be required for low latency operations - with interceptors. -useFlumeEventFormat false By default events are taken as bytes from the Kafka topic directly into the event body. Set to - true to read events as the Flume Avro binary format. 
Used in conjunction with the same property - on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve - any Flume headers sent on the producing side. -migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. - This should be true to support seamless Kafka client migration from older versions of Flume. - Once migrated this can be set to false, though that should generally not be required. - If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset - defines how offsets are handled. -Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any producer property supported - by Kafka can be used. The only requirement is to prepend the property name with the prefix - ``kafka.consumer``. - For example: ``kafka.consumer.auto.offset.reset`` - Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details -=============================== =========== =================================================== +================================== =========== =================================================== +Property Name Default Description +================================== =========== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource`` +**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the source +kafka.consumer.group.id flume Unique identifier of the consumer group. Setting the same id in multiple sources or agents + indicates that they are part of the same consumer group +**kafka.topics** -- Comma-separated list of topics the Kafka consumer will read messages from. +**kafka.topics.regex** -- Regex that defines the set of topics the source is subscribed on.
This property has higher priority + than ``kafka.topics`` and overrides ``kafka.topics`` if it exists. +batchSize 1000 Maximum number of messages written to Channel in one batch +batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel + The batch will be written as soon as either the size limit or the time limit is reached. +backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. + Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for + ingestion use cases but a lower value may be required for low latency operations with + interceptors. +maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is + ideal for ingestion use cases but a lower value may be required for low latency operations + with interceptors. +useFlumeEventFormat false By default events are taken as bytes from the Kafka topic directly into the event body. Set to + true to read events as the Flume Avro binary format. Used in conjunction with the same property + on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve + any Flume headers sent on the producing side. +migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. + This should be true to support seamless Kafka client migration from older versions of Flume. + Once migrated this can be set to false, though that should generally not be required. + If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset + defines how offsets are handled. + Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details +kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if reading from Kafka using some level of security. See below for additional info on secure setup.
+*more consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional + properties that need to be set on the consumer. +Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any consumer property supported + by Kafka can be used. The only requirement is to prepend the property name with the prefix + ``kafka.consumer``. + For example: ``kafka.consumer.auto.offset.reset`` +================================== =========== =================================================== .. note:: The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to "false" by the source and every batch is committed. Kafka source guarantees at least once @@ -1319,6 +1320,142 @@ Example for topic subscription by regex # the default kafka.consumer.group.id=flume is used +**Security and Kafka Source:** + +Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. +For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0. + +As of now, data encryption is solely provided by SSL/TLS. + +Setting ``kafka.consumer.security.protocol`` to any of the following values means: + +- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption +- **SASL_SSL** - Kerberos or plaintext authentication with data encryption +- **SSL** - TLS based encryption with optional authentication. + +.. warning:: + There is a performance degradation when SSL is enabled, + the magnitude of which depends on the CPU type and the JVM implementation.
+ Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_ + and the jira for tracking this issue: + `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_ + + +**TLS and Kafka Source:** + +Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_ +to learn about additional configuration settings for fine tuning, for example any of the following: +security provider, cipher suites, enabled protocols, truststore or keystore types. + +Example configuration with server side authentication and data encryption. + +.. code-block:: properties + + a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource + a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.sources.source1.kafka.topics = mytopic + a1.sources.source1.kafka.consumer.group.id = flume-consumer + a1.sources.source1.kafka.consumer.security.protocol = SSL + a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks + a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore> + + +Note: By default the property ``ssl.endpoint.identification.algorithm`` +is not defined, so hostname verification is not performed. +In order to enable hostname verification, set the following properties: + +.. code-block:: properties + + a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS + +Once enabled, clients will verify the server's fully qualified domain name (FQDN) +against one of the following two fields: + +#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 +#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 + +If client side authentication is also required then additionally the following should be added to Flume agent configuration.
+Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either +individually or by their signature chain. A common example is to sign each client certificate by a single Root CA +which in turn is trusted by Kafka brokers. + +.. code-block:: properties + + a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks + a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore> + +If the keystore and key use different password protection, then the ``ssl.key.password`` property will +provide the required additional secret for the consumer keystore: + +.. code-block:: properties + + a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key> + + +**Kerberos and Kafka Source:** + +To use Kafka source with a Kafka cluster secured with Kerberos, set the ``consumer.security.protocol`` properties noted above for the consumer. +The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. +See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_ +for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh: + +.. code-block:: properties + + JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf" + JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf" + +Example secure configuration using SASL_PLAINTEXT: + +.. code-block:: properties + + a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource + a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.sources.source1.kafka.topics = mytopic + a1.sources.source1.kafka.consumer.group.id = flume-consumer + a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT + a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI + a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka + +Example secure configuration using SASL_SSL: + +.. code-block:: properties + + a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource + a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.sources.source1.kafka.topics = mytopic + a1.sources.source1.kafka.consumer.group.id = flume-consumer + a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL + a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI + a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka + a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks + a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore> + + +Sample JAAS file. For a reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) +in Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_. +Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. +This won't be needed unless you require offset migration, or you require this section for other secure components. +Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files. + +..
code-block:: javascript + + Client { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; + + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; + + NetCat Source ~~~~~~~~~~~~~ @@ -2564,38 +2701,41 @@ This version of Flume no longer supports Older Versions (0.8.x) of Kafka. Required properties are marked in bold font. -=============================== =================== ============================================================================================= -Property Name Default Description -=============================== =================== ============================================================================================= -**type** -- Must be set to ``org.apache.flume.sink.kafka.KafkaSink`` -**kafka.bootstrap.servers** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions - This can be a partial list of brokers, but we recommend at least two for HA. - The format is comma separated list of hostname:port -kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, - messages will be published to this topic. - If the event header contains a "topic" field, the event will be published to that topic - overriding the topic configured here. -flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency. -kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. - Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) - Set this to -1 to avoid data loss in some cases of leader failure. 
-useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to - true to store events as the Flume Avro binary format. Used in conjunction with the same property - on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve - any Flume headers for the producing side. -defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless - overriden by ``partitionIdHeader``. By default, if this property is not set, events will be - distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a - partitioner specified by ``kafka.partitioner.class``). -partitionIdHeader -- When set, the sink will take the value of the field named using the value of this property - from the event header and send the message to the specified partition of the topic. If the - value represents an invalid partition, an EventDeliveryException will be thrown. If the header value - is present then this setting overrides ``defaultPartitionId``. -Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported - by Kafka can be used. The only requirement is to prepend the property name with the prefix - ``kafka.producer``. 
- For example: kafka.producer.linger.ms -=============================== =================== ============================================================================================= +================================== =================== ============================================================================================= +Property Name Default Description +================================== =================== ============================================================================================= +**type** -- Must be set to ``org.apache.flume.sink.kafka.KafkaSink`` +**kafka.bootstrap.servers** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions + This can be a partial list of brokers, but we recommend at least two for HA. + The format is a comma-separated list of hostname:port +kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, + messages will be published to this topic. + If the event header contains a "topic" field, the event will be published to that topic + overriding the topic configured here. +flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency. +kafka.producer.acks 1 How many replicas must acknowledge a message before it's considered successfully written. + Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) + Set this to -1 to avoid data loss in some cases of leader failure.
+defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless + overridden by ``partitionIdHeader``. By default, if this property is not set, events will be + distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a + partitioner specified by ``kafka.partitioner.class``). +partitionIdHeader -- When set, the sink will take the value of the field named using the value of this property + from the event header and send the message to the specified partition of the topic. If the + value represents an invalid partition, an EventDeliveryException will be thrown. If the header value + is present then this setting overrides ``defaultPartitionId``. +kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. +*more producer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional + properties that need to be set on the producer. +Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported + by Kafka can be used. The only requirement is to prepend the property name with the prefix + ``kafka.producer``. + For example: kafka.producer.linger.ms +================================== =================== ============================================================================================= .. note:: Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka. If ``topic`` exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. @@ -2635,6 +2775,132 @@ argument.
 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.ki.kafka.producer.compression.type = snappy + +**Security and Kafka Sink:** + +Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. +For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0. + +As of now, data encryption is solely provided by SSL/TLS. + +Setting ``kafka.producer.security.protocol`` to any of the following values means: + +- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption +- **SASL_SSL** - Kerberos or plaintext authentication with data encryption +- **SSL** - TLS based encryption with optional authentication. + +.. warning:: + There is a performance degradation when SSL is enabled, + the magnitude of which depends on the CPU type and the JVM implementation. + Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_ + and the jira for tracking this issue: + `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`__ + + +**TLS and Kafka Sink:** + +Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_ +to learn about additional configuration settings for fine tuning, for example any of the following: +security provider, cipher suites, enabled protocols, truststore or keystore types. + +Example configuration with server side authentication and data encryption. + +..
code-block:: properties + + a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink + a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.sinks.sink1.kafka.topic = mytopic + a1.sinks.sink1.kafka.producer.security.protocol = SSL + a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks + a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore> + + +Note: By default the property ``ssl.endpoint.identification.algorithm`` +is not defined, so hostname verification is not performed. +In order to enable hostname verification, set the following properties: + +.. code-block:: properties + + a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS + +Once enabled, clients will verify the server's fully qualified domain name (FQDN) +against one of the following two fields: + +#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 +#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 + +If client side authentication is also required then additionally the following should be added to Flume agent configuration. +Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either +individually or by their signature chain. A common example is to sign each client certificate by a single Root CA +which in turn is trusted by Kafka brokers. + +.. code-block:: properties + + a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks + a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore> + +If the keystore and key use different password protection, then the ``ssl.key.password`` property will +provide the required additional secret for the producer keystore: + +..
code-block:: properties + + a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key> + + +**Kerberos and Kafka Sink:** + +To use Kafka sink with a Kafka cluster secured with Kerberos, set the ``producer.security.protocol`` property noted above for the producer. +The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. +See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_ +for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh: + +.. code-block:: properties + + JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf" + JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf" + +Example secure configuration using SASL_PLAINTEXT: + +.. code-block:: properties + + a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink + a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.sinks.sink1.kafka.topic = mytopic + a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT + a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI + a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka + + +Example secure configuration using SASL_SSL: + +..
code-block:: properties + + a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink + a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.sinks.sink1.kafka.topic = mytopic + a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL + a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI + a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka + a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks + a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore> + + +Sample JAAS file. For a reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) +in Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_. +Unlike the Kafka Source or Kafka Channel, a "Client" section is not required, unless it is needed by other connecting components. Also please make sure +that the operating system user of the Flume processes has read privileges on the jaas and keytab files. + +.. code-block:: javascript + + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; + + Custom Sink ~~~~~~~~~~~ @@ -2792,7 +3058,7 @@ migrateZookeeperOffsets true When no Kaf This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled. -pollTimeout 500 The amount of time(in milliseconds) to wait in the "poll()" call of the conumer. +pollTimeout 500 The amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) defaultPartitionId -- Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by ``partitionIdHeader``. By default, if this property is not set, events will be @@ -2808,9 +3074,9 @@ kafka.consumer.auto.offset.reset latest What to do latest: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer\'s group anything else: throw exception to the consumer. -kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using Kerberos. See below for additional info on Kerberos setup. +kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. kafka.consumer.security.protocol PLAINTEXT Same as kafka.producer.security.protocol but for reading/consuming from Kafka. -*more producer/consumer security props* If using SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional +*more producer/consumer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional properties that need to be set on producer/consumer. ======================================= ========================== =============================================================================================================== @@ -2839,43 +3105,156 @@ Example for agent named a1: a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer + +**Security and Kafka Channel:** + +Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0. + +As of now, data encryption is solely provided by SSL/TLS. + +Setting ``kafka.producer|consumer.security.protocol`` to any of the following values means: + +- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption +- **SASL_SSL** - Kerberos or plaintext authentication with data encryption +- **SSL** - TLS based encryption with optional authentication. + +.. warning:: + There is a performance degradation when SSL is enabled, + the magnitude of which depends on the CPU type and the JVM implementation. + Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_ + and the jira for tracking this issue: + `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_ + + +**TLS and Kafka Channel:** + +Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_ +to learn about additional configuration settings for fine tuning, for example any of the following: +security provider, cipher suites, enabled protocols, truststore or keystore types. + +Example configuration with server side authentication and data encryption. + +..
code-block:: properties + + a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.channels.channel1.kafka.topic = channel1 + a1.channels.channel1.kafka.consumer.group.id = flume-consumer + a1.channels.channel1.kafka.producer.security.protocol = SSL + a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks + a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore> + a1.channels.channel1.kafka.consumer.security.protocol = SSL + a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks + a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore> + + +Note: By default the property ``ssl.endpoint.identification.algorithm`` +is not defined, so hostname verification is not performed. +In order to enable hostname verification, set the following properties + +.. code-block:: properties + + a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS + a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS + +Once enabled, clients will verify the server's fully qualified domain name (FQDN) +against one of the following two fields: + +#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 +#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 + +If client side authentication is also required then additionally the following should be added to Flume agent configuration. +Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either +individually or by their signature chain. Common example is to sign each client certificate by a single Root CA +which in turn is trusted by Kafka brokers. + +.. 
code-block:: properties + + a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks + a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore> + a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks + a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore> + +If keystore and key use different password protection then ``ssl.key.password`` property will +provide the required additional secret for both consumer and producer keystores: + +.. code-block:: properties + + a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key> + a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key> + + **Kerberos and Kafka Channel:** -To use Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for producer and/or consumer. -The Kerberos keytab and principal to be used is specified in a JAAS file's "KafkaClient" section. See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_ -for info on the JAAS file contents. The location of this JAAS file is specified via JAVA_OPTS using -Djava.security.auth.login.config=/path/to/kafka_jaas.conf (in flume-env.sh) +To use Kafka channel with a Kafka cluster secured with Kerberos, set the ``producer/consumer.security.protocol`` properties noted above for producer and/or consumer. +The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file's "KafkaClient" section. "Client" section describes the Zookeeper connection if needed. +See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_ +for information on the JAAS file contents. The location of this JAAS file and optionally the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh: + +.. 
code-block:: properties + JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf" + JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf" -Sample secure configuration using SASL_PLAINTEXT. +Example secure configuration using SASL_PLAINTEXT: .. code-block:: properties a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel - a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092 + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT + a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI + a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT + a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI + a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka -Sample JAAS file +Example secure configuration using SASL_SSL: -.. code-block:: javascript +.. 
code-block:: properties - KafkaClient { - com.sun.security.auth.module.Krb5LoginModule required - useKeyTab=true - storeKey=true - serviceName="kafka" - keyTab="/path/to/keytabs/testuser1.keytab" - principal="testuser1/kafka1.example.com"; - }; + a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 + a1.channels.channel1.kafka.topic = channel1 + a1.channels.channel1.kafka.consumer.group.id = flume-consumer + a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL + a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI + a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka + a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks + a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore> + a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL + a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI + a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka + a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks + a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore> + + +Sample JAAS file. For reference of its content please see client config sections of the desired authentication mechanism (GSSAPI/PLAIN) +in Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_. +Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. +This won't be needed unless you require offset migration, or you require this section for other secure components. +Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files. -Sample flume-env.sh +.. code-block:: javascript -.. 
code-block:: properties + Client { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; - export JAVA_HOME=/path/java-home/ - export JAVA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_jaas.conf" + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/keytabs/flume.keytab" + principal="flume/flumehost1.example.com@YOURKERBEROSREALM"; + }; File Channel
