Modified: websites/staging/flume/trunk/content/FlumeUserGuide.html
==============================================================================
--- websites/staging/flume/trunk/content/FlumeUserGuide.html (original)
+++ websites/staging/flume/trunk/content/FlumeUserGuide.html Mon Oct 17 
12:35:17 2016
@@ -7,7 +7,7 @@
   <head>
     <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
     
-    <title>Flume 1.6.0 User Guide &mdash; Apache Flume</title>
+    <title>Flume 1.7.0 User Guide &mdash; Apache Flume</title>
     
     <link rel="stylesheet" href="_static/flume.css" type="text/css" />
     <link rel="stylesheet" href="_static/pygments.css" type="text/css" />
@@ -26,7 +26,7 @@
     <script type="text/javascript" src="_static/doctools.js"></script>
     <link rel="top" title="Apache Flume" href="index.html" />
     <link rel="up" title="Documentation" href="documentation.html" />
-    <link rel="next" title="Flume 1.6.0 Developer Guide" 
href="FlumeDeveloperGuide.html" />
+    <link rel="next" title="Flume 1.7.0 Developer Guide" 
href="FlumeDeveloperGuide.html" />
     <link rel="prev" title="Documentation" href="documentation.html" /> 
   </head>
   <body>
@@ -59,8 +59,8 @@
         <div class="bodywrapper">
           <div class="body">
             
-  <div class="section" id="flume-1-6-0-user-guide">
-<h1>Flume 1.6.0 User Guide<a class="headerlink" href="#flume-1-6-0-user-guide" 
title="Permalink to this headline">¶</a></h1>
+  <div class="section" id="flume-1-7-0-user-guide">
+<h1>Flume 1.7.0 User Guide<a class="headerlink" href="#flume-1-7-0-user-guide" 
title="Permalink to this headline">¶</a></h1>
 <div class="section" id="introduction">
 <h2>Introduction<a class="headerlink" href="#introduction" title="Permalink to 
this headline">¶</a></h2>
 <div class="section" id="overview">
@@ -84,7 +84,7 @@ in the latest architecture.</p>
 <div class="section" id="system-requirements">
 <h3>System Requirements<a class="headerlink" href="#system-requirements" 
title="Permalink to this headline">¶</a></h3>
 <ol class="arabic simple">
-<li>Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended)</li>
+<li>Java Runtime Environment - Java 1.7 or later</li>
 <li>Memory - Sufficient memory for configurations used by sources, channels or 
sinks</li>
 <li>Disk Space - Sufficient disk space for configurations used by channels or 
sinks</li>
 <li>Directory Permissions - Read/Write permissions for directories used by 
agent</li>
@@ -248,6 +248,30 @@ OK</pre>
 </div>
 <p>Congratulations - you&#8217;ve successfully configured and deployed a Flume 
agent! Subsequent sections cover agent configuration in much more detail.</p>
 </div>
+<div class="section" id="logging-raw-data">
+<h4>Logging raw data<a class="headerlink" href="#logging-raw-data" 
title="Permalink to this headline">¶</a></h4>
+<p>Logging the raw stream of data flowing through the ingest pipeline is not 
desired behaviour in
+many production environments because this may result in leaking sensitive data 
or security related
+configurations, such as secret keys, to Flume log files.
+By default, Flume will not log such information. On the other hand, if the 
data pipeline is broken,
+Flume will attempt to provide clues for debugging the problem.</p>
+<p>One way to debug problems with event pipelines is to set up an additional 
<a class="reference internal" href="#memory-channel">Memory Channel</a>
+connected to a <a class="reference internal" href="#logger-sink">Logger 
Sink</a>, which will output all event data to the Flume logs.
+In some situations, however, this approach is insufficient.</p>
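+<p>As an illustration, such a debug flow could be wired in alongside an existing 
agent as an extra channel and sink (a minimal sketch; the component names 
<tt class="docutils literal"><span class="pre">c2</span></tt> and <tt 
class="docutils literal"><span class="pre">k2</span></tt> are arbitrary):</p>
+<div class="highlight-none"><div class="highlight"><pre># hypothetical debug additions to an agent named a1
+a1.channels = c1 c2
+a1.sinks = k1 k2
+a1.channels.c2.type = memory
+a1.sinks.k2.type = logger
+a1.sinks.k2.channel = c2
+# the source replicates events to both channels by default
+a1.sources.r1.channels = c1 c2
+</pre></div>
+</div>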
+<p>In order to enable logging of event- and configuration-related data, some 
Java system properties
+must be set in addition to log4j properties.</p>
+<p>To enable configuration-related logging, set the Java system property
+<tt class="docutils literal"><span 
class="pre">-Dorg.apache.flume.log.printconfig=true</span></tt>. This can 
either be passed on the command line or by
+setting this in the <tt class="docutils literal"><span 
class="pre">JAVA_OPTS</span></tt> variable in <em>flume-env.sh</em>.</p>
+<p>To enable data logging, set the Java system property <tt class="docutils 
literal"><span class="pre">-Dorg.apache.flume.log.rawdata=true</span></tt>
+in the same way described above. For most components, the log4j logging level 
must also be set to
+DEBUG or TRACE to make event-specific logging appear in the Flume logs.</p>
+<p>Here is an example of enabling both configuration logging and raw data 
logging while also
+setting the Log4j loglevel to DEBUG for console output:</p>
+<div class="highlight-none"><div class="highlight"><pre>$ bin/flume-ng agent 
--conf conf --conf-file example.conf --name a1 
-Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true 
-Dorg.apache.flume.log.rawdata=true
+</pre></div>
+</div>
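+<p>Equivalently, these system properties can be set once in <em>flume-env.sh</em> 
through the <tt class="docutils literal"><span class="pre">JAVA_OPTS</span></tt> 
variable, for example (a sketch; adjust the flags to your needs):</p>
+<div class="highlight-none"><div class="highlight"><pre>JAVA_OPTS=&quot;$JAVA_OPTS -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true&quot;
+</pre></div>
+</div>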
+</div>
 <div class="section" id="zookeeper-based-configuration">
 <h4>Zookeeper based Configuration<a class="headerlink" 
href="#zookeeper-based-configuration" title="Permalink to this 
headline">¶</a></h4>
 <p>Flume supports Agent configurations via Zookeeper. <em>This is an 
experimental feature.</em> The configuration file needs to be uploaded
@@ -1048,7 +1072,7 @@ via FLUME_CLASSPATH variable in flume-en
 </tr>
 <tr class="row-odd"><td><strong>connectionFactory</strong></td>
 <td>&#8211;</td>
-<td>The JNDI name the connection factory shoulld appear as</td>
+<td>The JNDI name the connection factory should appear as</td>
 </tr>
 <tr class="row-even"><td><strong>providerURL</strong></td>
 <td>&#8211;</td>
@@ -1150,9 +1174,9 @@ cases in which events may be duplicated
 This is consistent with the guarantees offered by other Flume components.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="6%" />
-<col width="4%" />
-<col width="89%" />
+<col width="18%" />
+<col width="10%" />
+<col width="72%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -1197,39 +1221,60 @@ This is consistent with the guarantees o
 <td>basename</td>
 <td>Header Key to use when appending  basename of file to event header.</td>
 </tr>
-<tr class="row-odd"><td>ignorePattern</td>
+<tr class="row-odd"><td>includePattern</td>
+<td>^.*$</td>
+<td>Regular expression specifying which files to include.
+It can be used together with <tt class="docutils literal"><span 
class="pre">ignorePattern</span></tt>.
+If a file matches both <tt class="docutils literal"><span 
class="pre">ignorePattern</span></tt> and <tt class="docutils literal"><span 
class="pre">includePattern</span></tt> regex,
+the file is ignored.</td>
+</tr>
+<tr class="row-even"><td>ignorePattern</td>
 <td>^$</td>
-<td>Regular expression specifying which files to ignore (skip)</td>
+<td>Regular expression specifying which files to ignore (skip).
+It can be used together with <tt class="docutils literal"><span 
class="pre">includePattern</span></tt>.
+If a file matches both <tt class="docutils literal"><span 
class="pre">ignorePattern</span></tt> and <tt class="docutils literal"><span 
class="pre">includePattern</span></tt> regex,
+the file is ignored.</td>
 </tr>
-<tr class="row-even"><td>trackerDir</td>
+<tr class="row-odd"><td>trackerDir</td>
 <td>.flumespool</td>
 <td>Directory to store metadata related to processing of files.
 If this path is not an absolute path, then it is interpreted as relative to 
the spoolDir.</td>
 </tr>
-<tr class="row-odd"><td>consumeOrder</td>
+<tr class="row-even"><td>consumeOrder</td>
 <td>oldest</td>
 <td>In which order files in the spooling directory will be consumed <tt 
class="docutils literal"><span class="pre">oldest</span></tt>,
 <tt class="docutils literal"><span class="pre">youngest</span></tt> and <tt 
class="docutils literal"><span class="pre">random</span></tt>. In case of <tt 
class="docutils literal"><span class="pre">oldest</span></tt> and <tt 
class="docutils literal"><span class="pre">youngest</span></tt>, the last 
modified
 time of the files will be used to compare the files. In case of a tie, the file
-with smallest laxicographical order will be consumed first. In case of <tt 
class="docutils literal"><span class="pre">random</span></tt> any
+with smallest lexicographical order will be consumed first. In case of <tt 
class="docutils literal"><span class="pre">random</span></tt> any
 file will be picked randomly. When using <tt class="docutils literal"><span 
class="pre">oldest</span></tt> and <tt class="docutils literal"><span 
class="pre">youngest</span></tt> the whole
 directory will be scanned to pick the oldest/youngest file, which might be 
slow if there
 are a large number of files, while using <tt class="docutils literal"><span 
class="pre">random</span></tt> may cause old files to be consumed
 very late if new files keep coming in the spooling directory.</td>
 </tr>
-<tr class="row-even"><td>maxBackoff</td>
+<tr class="row-odd"><td>pollDelay</td>
+<td>500</td>
+<td>Delay (in milliseconds) used when polling for new files.</td>
+</tr>
+<tr class="row-even"><td>recursiveDirectorySearch</td>
+<td>false</td>
+<td>Whether to monitor sub directories for new files to read.</td>
+</tr>
+<tr class="row-odd"><td>maxBackoff</td>
 <td>4000</td>
-<td>The maximum time (in millis) to wait between consecutive attempts to write 
to the channel(s) if the channel is full. The source will start at a low 
backoff and increase it exponentially each time the channel throws a 
ChannelException, upto the value specified by this parameter.</td>
+<td>The maximum time (in millis) to wait between consecutive attempts to
+write to the channel(s) if the channel is full. The source will start at
+a low backoff and increase it exponentially each time the channel throws a
+ChannelException, up to the value specified by this parameter.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-even"><td>batchSize</td>
 <td>100</td>
 <td>Granularity at which to batch transfer to the channel</td>
 </tr>
-<tr class="row-even"><td>inputCharset</td>
+<tr class="row-odd"><td>inputCharset</td>
 <td>UTF-8</td>
 <td>Character set used by deserializers that treat the input file as text.</td>
 </tr>
-<tr class="row-odd"><td>decodeErrorPolicy</td>
+<tr class="row-even"><td>decodeErrorPolicy</td>
 <td><tt class="docutils literal"><span class="pre">FAIL</span></tt></td>
 <td>What to do when we see a non-decodable character in the input file.
 <tt class="docutils literal"><span class="pre">FAIL</span></tt>: Throw an 
exception and fail to parse the file.
@@ -1237,37 +1282,37 @@ very late if new files keep coming in th
 typically Unicode U+FFFD.
 <tt class="docutils literal"><span class="pre">IGNORE</span></tt>: Drop the 
unparseable character sequence.</td>
 </tr>
-<tr class="row-even"><td>deserializer</td>
+<tr class="row-odd"><td>deserializer</td>
 <td><tt class="docutils literal"><span class="pre">LINE</span></tt></td>
 <td>Specify the deserializer used to parse the file into events.
 Defaults to parsing each line as an event. The class specified must implement
 <tt class="docutils literal"><span 
class="pre">EventDeserializer.Builder</span></tt>.</td>
 </tr>
-<tr class="row-odd"><td>deserializer.*</td>
+<tr class="row-even"><td>deserializer.*</td>
 <td>&nbsp;</td>
 <td>Varies per event deserializer.</td>
 </tr>
-<tr class="row-even"><td>bufferMaxLines</td>
+<tr class="row-odd"><td>bufferMaxLines</td>
 <td>&#8211;</td>
 <td>(Obselete) This option is now ignored.</td>
 </tr>
-<tr class="row-odd"><td>bufferMaxLineLength</td>
+<tr class="row-even"><td>bufferMaxLineLength</td>
 <td>5000</td>
 <td>(Deprecated) Maximum length of a line in the commit buffer. Use 
deserializer.maxLineLength instead.</td>
 </tr>
-<tr class="row-even"><td>selector.type</td>
+<tr class="row-odd"><td>selector.type</td>
 <td>replicating</td>
 <td>replicating or multiplexing</td>
 </tr>
-<tr class="row-odd"><td>selector.*</td>
+<tr class="row-even"><td>selector.*</td>
 <td>&nbsp;</td>
 <td>Depends on the selector.type value</td>
 </tr>
-<tr class="row-even"><td>interceptors</td>
+<tr class="row-odd"><td>interceptors</td>
 <td>&#8211;</td>
 <td>Space-separated list of interceptors</td>
 </tr>
-<tr class="row-odd"><td>interceptors.*</td>
+<tr class="row-even"><td>interceptors.*</td>
 <td>&nbsp;</td>
 <td>&nbsp;</td>
 </tr>
@@ -1383,11 +1428,125 @@ inefficient compared to <tt class="docut
 </div>
 </div>
 </div>
+<div class="section" id="taildir-source">
+<h4>Taildir Source<a class="headerlink" href="#taildir-source" 
title="Permalink to this headline">¶</a></h4>
+<div class="admonition note">
+<p class="first admonition-title">Note</p>
+<p class="last"><strong>This source is provided as a preview feature. It does 
not work on Windows.</strong></p>
+</div>
+<p>Watch the specified files, and tail them in near real-time once new lines 
appended to each file are detected.
+If new lines are still being written, this source will retry reading them 
while waiting for the write to complete.</p>
+<p>This source is reliable and will not miss data even when the tailed files 
rotate.
+It periodically writes the last read position of each file to the given 
position file in JSON format.
+If Flume is stopped or down for some reason, it can restart tailing from the 
position written in the existing position file.</p>
+<p>In other use cases, this source can also start tailing from an arbitrary 
position for each file using the given position file.
+When there is no position file at the specified path, it will start tailing 
from the first line of each file by default.</p>
+<p>Files will be consumed in order of their modification time. File with the 
oldest modification time will be consumed first.</p>
+<p>This source does not rename, delete, or otherwise modify the file 
being tailed.
+Currently this source does not support tailing binary files. It reads text 
files line by line.</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="19%" />
+<col width="16%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td><strong>channels</strong></td>
+<td>&#8211;</td>
+<td>&nbsp;</td>
+</tr>
+<tr class="row-odd"><td><strong>type</strong></td>
+<td>&#8211;</td>
+<td>The component type name, needs to be <tt class="docutils literal"><span 
class="pre">TAILDIR</span></tt>.</td>
+</tr>
+<tr class="row-even"><td><strong>filegroups</strong></td>
+<td>&#8211;</td>
+<td>Space-separated list of file groups. Each file group indicates a set of 
files to be tailed.</td>
+</tr>
+<tr class="row-odd"><td><strong>filegroups.&lt;filegroupName&gt;</strong></td>
+<td>&#8211;</td>
+<td>Absolute path of the file group. Regular expressions (and not file system 
patterns) can be used for the filename only.</td>
+</tr>
+<tr class="row-even"><td>positionFile</td>
+<td>~/.flume/taildir_position.json</td>
+<td>File in JSON format to record the inode, the absolute path and the last 
position of each tailing file.</td>
+</tr>
+<tr class="row-odd"><td>headers.&lt;filegroupName&gt;.&lt;headerKey&gt;</td>
+<td>&#8211;</td>
+<td>Header value which is set with the header key. Multiple headers can be 
specified for one file group.</td>
+</tr>
+<tr class="row-even"><td>byteOffsetHeader</td>
+<td>false</td>
+<td>Whether to add the byte offset of a tailed line to a header called 
&#8216;byteoffset&#8217;.</td>
+</tr>
+<tr class="row-odd"><td>skipToEnd</td>
+<td>false</td>
+<td>Whether to skip the position to EOF for files that are not recorded in 
the position file.</td>
+</tr>
+<tr class="row-even"><td>idleTimeout</td>
+<td>120000</td>
+<td>Time (ms) to close inactive files. If new lines are appended to a closed 
file, this source will automatically re-open it.</td>
+</tr>
+<tr class="row-odd"><td>writePosInterval</td>
+<td>3000</td>
+<td>Interval time (ms) to write the last position of each file to the 
position file.</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
+<td>100</td>
+<td>Max number of lines to read and send to the channel at a time. Using the 
default is usually fine.</td>
+</tr>
+<tr class="row-odd"><td>backoffSleepIncrement</td>
+<td>1000</td>
+<td>The increment for time delay before reattempting to poll for new data, 
when the last attempt did not find any new data.</td>
+</tr>
+<tr class="row-even"><td>maxBackoffSleep</td>
+<td>5000</td>
+<td>The max time delay between each reattempt to poll for new data, when the 
last attempt did not find any new data.</td>
+</tr>
+<tr class="row-odd"><td>cachePatternMatching</td>
+<td>true</td>
+<td>Listing directories and applying the filename regex pattern may be time 
consuming for directories
+containing thousands of files. Caching the list of matching files can improve 
performance.
+The order in which files are consumed will also be cached.
+Requires that the file system keeps track of modification times with at least 
a 1-second granularity.</td>
+</tr>
+<tr class="row-even"><td>fileHeader</td>
+<td>false</td>
+<td>Whether to add a header storing the absolute path filename.</td>
+</tr>
+<tr class="row-odd"><td>fileHeaderKey</td>
+<td>file</td>
+<td>Header key to use when appending absolute path filename to event 
header.</td>
+</tr>
+</tbody>
+</table>
+<p>Example for agent named a1:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span>
+<span class="na">a1.channels</span> <span class="o">=</span> <span 
class="s">c1</span>
+<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span 
class="s">TAILDIR</span>
+<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span 
class="s">c1</span>
+<span class="na">a1.sources.r1.positionFile</span> <span class="o">=</span> 
<span class="s">/var/log/flume/taildir_position.json</span>
+<span class="na">a1.sources.r1.filegroups</span> <span class="o">=</span> 
<span class="s">f1 f2</span>
+<span class="na">a1.sources.r1.filegroups.f1</span> <span class="o">=</span> 
<span class="s">/var/log/test1/example.log</span>
+<span class="na">a1.sources.r1.headers.f1.headerKey1</span> <span 
class="o">=</span> <span class="s">value1</span>
+<span class="na">a1.sources.r1.filegroups.f2</span> <span class="o">=</span> 
<span class="s">/var/log/test2/.*log.*</span>
+<span class="na">a1.sources.r1.headers.f2.headerKey1</span> <span 
class="o">=</span> <span class="s">value2</span>
+<span class="na">a1.sources.r1.headers.f2.headerKey2</span> <span 
class="o">=</span> <span class="s">value2-2</span>
+<span class="na">a1.sources.r1.fileHeader</span> <span class="o">=</span> 
<span class="s">true</span>
+</pre></div>
+</div>
+</div>
 <div class="section" id="twitter-1-firehose-source-experimental">
 <h4>Twitter 1% firehose Source (experimental)<a class="headerlink" 
href="#twitter-1-firehose-source-experimental" title="Permalink to this 
headline">¶</a></h4>
 <div class="admonition warning">
 <p class="first admonition-title">Warning</p>
-<p class="last">This source is hightly experimental and may change between 
minor versions of Flume.
+<p class="last">This source is highly experimental and may change between 
minor versions of Flume.
 Use at your own risk.</p>
 </div>
 <p>Experimental source that connects via Streaming API to the 1% sample twitter
@@ -1430,7 +1589,7 @@ Required properties are in <strong>bold<
 </tr>
 <tr class="row-odd"><td><strong>accessTokenSecret</strong></td>
 <td>&#8211;</td>
-<td>OAuth toekn secret</td>
+<td>OAuth token secret</td>
 </tr>
 <tr class="row-even"><td>maxBatchSize</td>
 <td>1000</td>
@@ -1458,14 +1617,14 @@ Required properties are in <strong>bold<
 </div>
 <div class="section" id="kafka-source">
 <h4>Kafka Source<a class="headerlink" href="#kafka-source" title="Permalink to 
this headline">¶</a></h4>
-<p>Kafka Source is an Apache Kafka consumer that reads messages from a Kafka 
topic.
+<p>Kafka Source is an Apache Kafka consumer that reads messages from Kafka 
topics.
 If you have multiple Kafka sources running, you can configure them with the 
same Consumer Group
-so each will read a unique set of partitions for the topic.</p>
+so each will read a unique set of partitions for the topics.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="21%" />
-<col width="8%" />
-<col width="71%" />
+<col width="19%" />
+<col width="6%" />
+<col width="75%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -1480,68 +1639,245 @@ so each will read a unique set of partit
 </tr>
 <tr class="row-odd"><td><strong>type</strong></td>
 <td>&#8211;</td>
-<td>The component type name, needs to be <tt class="docutils literal"><span 
class="pre">org.apache.flume.source.kafka,KafkaSource</span></tt></td>
+<td>The component type name, needs to be <tt class="docutils literal"><span 
class="pre">org.apache.flume.source.kafka.KafkaSource</span></tt></td>
 </tr>
-<tr class="row-even"><td><strong>zookeeperConnect</strong></td>
+<tr class="row-even"><td><strong>kafka.bootstrap.servers</strong></td>
 <td>&#8211;</td>
-<td>URI of ZooKeeper used by Kafka cluster</td>
+<td>List of brokers in the Kafka cluster used by the source</td>
 </tr>
-<tr class="row-odd"><td><strong>groupId</strong></td>
+<tr class="row-odd"><td>kafka.consumer.group.id</td>
 <td>flume</td>
 <td>Unique identified of consumer group. Setting the same id in multiple 
sources or agents
 indicates that they are part of the same consumer group</td>
 </tr>
-<tr class="row-even"><td><strong>topic</strong></td>
+<tr class="row-even"><td><strong>kafka.topics</strong></td>
 <td>&#8211;</td>
-<td>Kafka topic we&#8217;ll read messages from. At the time, this is a single 
topic only.</td>
+<td>Comma-separated list of topics the kafka consumer will read messages 
from.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-odd"><td><strong>kafka.topics.regex</strong></td>
+<td>&#8211;</td>
+<td>Regex that defines the set of topics the source is subscribed to. This 
property has higher priority
+than <tt class="docutils literal"><span class="pre">kafka.topics</span></tt> 
and overrides <tt class="docutils literal"><span 
class="pre">kafka.topics</span></tt> if it exists.</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
 <td>1000</td>
 <td>Maximum number of messages written to Channel in one batch</td>
 </tr>
-<tr class="row-even"><td>batchDurationMillis</td>
+<tr class="row-odd"><td>batchDurationMillis</td>
 <td>1000</td>
 <td>Maximum time (in ms) before a batch will be written to Channel
 The batch will be written whenever the first of size and time will be 
reached.</td>
 </tr>
-<tr class="row-odd"><td>backoffSleepIncrement</td>
+<tr class="row-even"><td>backoffSleepIncrement</td>
 <td>1000</td>
 <td>Initial and incremental wait time that is triggered when a Kafka Topic 
appears to be empty.
 Wait period will reduce aggressive pinging of an empty Kafka Topic.  One 
second is ideal for
 ingestion use cases but a lower value may be required for low latency 
operations with
 interceptors.</td>
 </tr>
-<tr class="row-even"><td>maxBackoffSleep</td>
+<tr class="row-odd"><td>maxBackoffSleep</td>
 <td>5000</td>
 <td>Maximum wait time that is triggered when a Kafka Topic appears to be 
empty.  Five seconds is
 ideal for ingestion use cases but a lower value may be required for low 
latency operations
 with interceptors.</td>
 </tr>
-<tr class="row-odd"><td>Other Kafka Consumer Properties</td>
-<td>&#8211;</td>
-<td>These properties are used to configure the Kafka Consumer. Any producer 
property supported
-by Kafka can be used. The only requirement is to prepend the property name 
with the prefix <tt class="docutils literal"><span 
class="pre">kafka.</span></tt>.
-For example: kafka.consumer.timeout.ms
-Check <cite>Kafka documentation 
&lt;https://kafka.apache.org/08/configuration.html#consumerconfigs&gt;</cite> 
for details</td>
+<tr class="row-even"><td>useFlumeEventFormat</td>
+<td>false</td>
+<td>By default events are taken as bytes from the Kafka topic directly into 
the event body. Set to
+true to read events as the Flume Avro binary format. Used in conjunction with 
the same property
+on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel, 
this will preserve
+any Flume headers sent on the producing side.</td>
+</tr>
+<tr class="row-odd"><td>migrateZookeeperOffsets</td>
+<td>true</td>
+<td>When no Kafka stored offset is found, look up the offsets in Zookeeper and 
commit them to Kafka.
+This should be true to support seamless Kafka client migration from older 
versions of Flume.
+Once migrated this can be set to false, though that should generally not be 
required.
+If no Zookeeper offset is found, the Kafka configuration 
kafka.consumer.auto.offset.reset
+defines how offsets are handled.
+Check <a class="reference external" 
href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>Kafka 
documentation</a> for details</td>
+</tr>
+<tr class="row-even"><td>kafka.consumer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some 
level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td><em>more consumer security props</em></td>
+<td>&nbsp;</td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference 
external" href="http://kafka.apache.org/documentation.html#security";>Kafka 
security</a> for additional
+properties that need to be set on consumer.</td>
+</tr>
+<tr class="row-even"><td>Other Kafka Consumer Properties</td>
+<td>&#8211;</td>
+<td>These properties are used to configure the Kafka Consumer. Any consumer 
property supported
+by Kafka can be used. The only requirement is to prepend the property name 
with the prefix
+<tt class="docutils literal"><span class="pre">kafka.consumer</span></tt>.
+For example: <tt class="docutils literal"><span 
class="pre">kafka.consumer.auto.offset.reset</span></tt></td>
 </tr>
 </tbody>
 </table>
 <div class="admonition note">
 <p class="first admonition-title">Note</p>
 <p class="last">The Kafka Source overrides two Kafka consumer parameters:
-auto.commit.enable is set to &#8220;false&#8221; by the source and we commit 
every batch. For improved performance
-this can be set to &#8220;true&#8221;, however, this can lead to loss of data
-consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we 
wait at most 10ms for the data to arrive
-setting this to a higher value can reduce CPU utilization (we&#8217;ll poll 
Kafka in less of a tight loop), but also means
-higher latency in writing batches to channel (since we&#8217;ll wait longer 
for data to arrive).</p>
+auto.commit.enable is set to &#8220;false&#8221; by the source and every batch 
is committed. The Kafka Source guarantees an at-least-once
+message retrieval strategy. Duplicates can be present when the source 
starts.
+The Kafka Source also provides defaults for the 
key.deserializer(org.apache.kafka.common.serialization.StringDeserializer)
+and 
value.deserializer(org.apache.kafka.common.serialization.ByteArrayDeserializer). 
Modification of these parameters is not recommended.</p>
+</div>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="22%" />
+<col width="13%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>topic</td>
+<td>&#8211;</td>
+<td>Use kafka.topics</td>
+</tr>
+<tr class="row-odd"><td>groupId</td>
+<td>flume</td>
+<td>Use kafka.consumer.group.id</td>
+</tr>
+<tr class="row-even"><td>zookeeperConnect</td>
+<td>&#8211;</td>
+<td>No longer supported by the Kafka consumer client since 0.9.x. Use 
kafka.bootstrap.servers
+to establish a connection with the Kafka cluster</td>
+</tr>
+</tbody>
+</table>
+<p>Example for topic subscription by comma-separated topic list.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">tier1.sources.source1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.source.kafka.KafkaSource</span>
+<span class="na">tier1.sources.source1.channels</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">tier1.sources.source1.batchSize</span> <span 
class="o">=</span> <span class="s">5000</span>
+<span class="na">tier1.sources.source1.batchDurationMillis</span> <span 
class="o">=</span> <span class="s">2000</span>
+<span class="na">tier1.sources.source1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">tier1.sources.source1.kafka.topics</span> <span 
class="o">=</span> <span class="s">test1, test2</span>
+<span class="na">tier1.sources.source1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">custom.g.id</span>
+</pre></div>
 </div>
-<p>Example for agent named tier1:</p>
+<p>Example for topic subscription by regex.</p>
 <div class="highlight-properties"><div class="highlight"><pre><span 
class="na">tier1.sources.source1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.source.kafka.KafkaSource</span>
 <span class="na">tier1.sources.source1.channels</span> <span 
class="o">=</span> <span class="s">channel1</span>
-<span class="na">tier1.sources.source1.zookeeperConnect</span> <span 
class="o">=</span> <span class="s">localhost:2181</span>
-<span class="na">tier1.sources.source1.topic</span> <span class="o">=</span> 
<span class="s">test1</span>
-<span class="na">tier1.sources.source1.groupId</span> <span class="o">=</span> 
<span class="s">flume</span>
-<span class="na">tier1.sources.source1.kafka.consumer.timeout.ms</span> <span 
class="o">=</span> <span class="s">100</span>
+<span class="na">tier1.sources.source1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">tier1.sources.source1.kafka.topics.regex</span> <span 
class="o">=</span> <span class="s">^topic[0-9]$</span>
+<span class="c"># the default kafka.consumer.group.id=flume is used</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Source:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the 
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the 
parameter is named SSL, the actual protocol is a TLS implementation) can be 
used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span 
class="pre">kafka.consumer.security.protocol</span></tt> to any of the 
following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication 
with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data 
encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional 
authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external" 
href="http://kafka.apache.org/documentation#security_overview";>Kafka security 
overview</a>
+and the jira for tracking this issue:
+<a class="reference external" 
href="https://issues.apache.org/jira/browse/KAFKA-2561";>KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Source:</strong></p>
+<p>Please read the steps described in <a class="reference external" 
href="http://kafka.apache.org/documentation#security_configclients";>Configuring 
Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example 
any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore 
types.</p>
+<p>Example configuration with server side authentication and data 
encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> 
<span class="o">=</span> <span class="s">SSL</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span><span
 class="o">=</span><span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.password</span><span
 class="o">=</span><span class="s">&lt;password to access the 
truststore&gt;</span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span 
class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm</span><span
 class="o">=</span><span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server&#8217;s fully qualified domain 
name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external" 
href="https://tools.ietf.org/html/rfc6125#section-2.3";>https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external" 
href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6";>https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client side authentication is also required then the following should 
additionally be added to the Flume agent configuration.
+Each Flume agent has to have its client certificate, which has to be trusted by 
the Kafka brokers either
+individually or by their signature chain. A common example is to sign each 
client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.location</span><span
 class="o">=</span><span class="s">/path/to/client.keystore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.password</span><span
 class="o">=</span><span class="s">&lt;password to access the 
keystore&gt;</span>
+</pre></div>
+</div>
+<p>If the keystore and key use different password protection then the <tt 
class="docutils literal"><span class="pre">ssl.key.password</span></tt> 
property will
+provide the required additional secret for the consumer keystore:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.consumer.ssl.key.password</span><span 
class="o">=</span><span class="s">&lt;password to access the key&gt;</span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Source:</strong></p>
+<p>To use Kafka source with a Kafka cluster secured with Kerberos, set the <tt 
class="docutils literal"><span 
class="pre">consumer.security.protocol</span></tt> properties noted above for 
the consumer.
+The Kerberos keytab and principal to be used with Kafka brokers are specified 
in a JAAS file&#8217;s &#8220;KafkaClient&#8221; section. The &#8220;Client&#8221; 
section describes the Zookeeper connection if needed.
+See <a class="reference external" 
href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig";>Kafka
 doc</a>
+for information on the JAAS file contents. The location of this JAAS file and 
optionally the system wide kerberos configuration can be specified via 
JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">JAVA_OPTS</span><span class="o">=</span><span 
class="s">&quot;$JAVA_OPTS 
-Djava.security.krb5.conf=/path/to/krb5.conf&quot;</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span 
class="s">&quot;$JAVA_OPTS 
-Djava.security.auth.login.config=/path/to/flume_jaas.conf&quot;</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span><span
 class="o">=</span><span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.password</span><span
 class="o">=</span><span class="s">&lt;password to access the 
truststore&gt;</span>
+</pre></div>
+</div>
+<p>Sample JAAS file. For reference on its contents, please see the client config 
sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in Kafka documentation of <a class="reference external" 
href="http://kafka.apache.org/documentation#security_sasl_clientconfig";>SASL 
configuration</a>.
+Since the Kafka Source may also connect to Zookeeper for offset migration, the 
&#8220;Client&#8221; section was also added to this example.
+This won&#8217;t be needed unless you require offset migration, or you require 
this section for other secure components.
+Also please make sure that the operating system user of the Flume processes 
has read privileges on the jaas and keytab files.</p>
+<div class="highlight-javascript"><div class="highlight"><pre><span 
class="nx">Client</span> <span class="p">{</span>
+  <span class="nx">com</span><span class="p">.</span><span 
class="nb">sun</span><span class="p">.</span><span 
class="nx">security</span><span class="p">.</span><span 
class="nx">auth</span><span class="p">.</span><span 
class="nx">module</span><span class="p">.</span><span 
class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+  <span class="nx">useKeyTab</span><span class="o">=</span><span 
class="kc">true</span>
+  <span class="nx">storeKey</span><span class="o">=</span><span 
class="kc">true</span>
+  <span class="nx">keyTab</span><span class="o">=</span><span 
class="s2">&quot;/path/to/keytabs/flume.keytab&quot;</span>
+  <span class="nx">principal</span><span class="o">=</span><span 
class="s2">&quot;flume/flumehost1.example.com@YOURKERBEROSREALM&quot;</span><span
 class="p">;</span>
+<span class="p">};</span>
+
+<span class="nx">KafkaClient</span> <span class="p">{</span>
+  <span class="nx">com</span><span class="p">.</span><span 
class="nb">sun</span><span class="p">.</span><span 
class="nx">security</span><span class="p">.</span><span 
class="nx">auth</span><span class="p">.</span><span 
class="nx">module</span><span class="p">.</span><span 
class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+  <span class="nx">useKeyTab</span><span class="o">=</span><span 
class="kc">true</span>
+  <span class="nx">storeKey</span><span class="o">=</span><span 
class="kc">true</span>
+  <span class="nx">keyTab</span><span class="o">=</span><span 
class="s2">&quot;/path/to/keytabs/flume.keytab&quot;</span>
+  <span class="nx">principal</span><span class="o">=</span><span 
class="s2">&quot;flume/flumehost1.example.com@YOURKERBEROSREALM&quot;</span><span
 class="p">;</span>
+<span class="p">};</span>
 </pre></div>
 </div>
 </div>
@@ -1613,21 +1949,21 @@ Flume event and sent via the connected c
 <span class="na">a1.channels</span> <span class="o">=</span> <span 
class="s">c1</span>
 <span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span 
class="s">netcat</span>
 <span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span 
class="s">0.0.0.0</span>
-<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span 
class="s">6666</span>
+<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span 
class="s">6666</span>
 <span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span 
class="s">c1</span>
 </pre></div>
 </div>
 </div>
 <div class="section" id="sequence-generator-source">
 <h4>Sequence Generator Source<a class="headerlink" 
href="#sequence-generator-source" title="Permalink to this headline">¶</a></h4>
-<p>A simple sequence generator that continuously generates events with a 
counter
-that starts from 0 and increments by 1. Useful mainly for testing.
-Required properties are in <strong>bold</strong>.</p>
+<p>A simple sequence generator that continuously generates events with a 
counter that starts from 0,
+increments by 1 and stops at totalEvents. Retries when it can&#8217;t send 
events to the channel. Useful
+mainly for testing. Required properties are in <strong>bold</strong>.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="20%" />
-<col width="16%" />
-<col width="64%" />
+<col width="19%" />
+<col width="21%" />
+<col width="60%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -1664,6 +2000,10 @@ Required properties are in <strong>bold<
 <td>1</td>
 <td>&nbsp;</td>
 </tr>
+<tr class="row-odd"><td>totalEvents</td>
+<td>Long.MAX_VALUE</td>
+<td>Number of unique events sent by the source.</td>
+</tr>
 </tbody>
 </table>
 <p>Example for agent named a1:</p>
@@ -2395,8 +2735,8 @@ required.</p>
 <p>The following are the escape sequences supported:</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="10%" />
-<col width="90%" />
+<col width="15%" />
+<col width="85%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Alias</th>
@@ -2473,8 +2813,19 @@ required.</p>
 <tr class="row-even"><td>%z</td>
 <td>+hhmm numeric timezone (for example, -0400)</td>
 </tr>
+<tr class="row-odd"><td>%[localhost]</td>
+<td>Substitute the hostname of the host where the agent is running</td>
+</tr>
+<tr class="row-even"><td>%[IP]</td>
+<td>Substitute the IP address of the host where the agent is running</td>
+</tr>
+<tr class="row-odd"><td>%[FQDN]</td>
+<td>Substitute the canonical hostname of the host where the agent is 
running</td>
+</tr>
 </tbody>
 </table>
+<p>Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on 
Java&#8217;s ability to obtain the
+hostname, which may fail in some networking environments.</p>
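+<p>For example, a host escape can be combined with the time escapes in an HDFS 
sink path (a hypothetical sketch; the bucketing layout is up to you):</p>
+<div class="highlight-none"><div class="highlight"><pre># illustrative only: agent/sink names and the namenode URI are placeholders
+a1.sinks.k1.hdfs.path = hdfs://namenode/flume/events/%[FQDN]/%y-%m-%d
+</pre></div>
+</div>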
 <p>The file in use will have the name mangled to include &#8221;.tmp&#8221; at 
the end. Once
 the file is closed, this extension is removed. This allows excluding partially
 complete files in the directory.
@@ -2662,8 +3013,7 @@ timestamp 11:54:34 AM, June 12, 2012 wil
 Events are written using Hive transactions. As soon as a set of events are 
committed to Hive, they become
 immediately visible to Hive queries. Partitions to which flume will stream to 
can either be pre-created
 or, optionally, Flume can create them if they are missing. Fields from 
incoming event data are mapped to
-corresponding columns in the Hive table. <strong>This sink is provided as a 
preview feature and not recommended
-for use in production.</strong></p>
+corresponding columns in the Hive table.</p>
 <table border="1" class="docutils">
 <colgroup>
 <col width="15%" />
@@ -2918,8 +3268,9 @@ accept tab separated input containing th
 </div>
 <div class="section" id="logger-sink">
 <h4>Logger Sink<a class="headerlink" href="#logger-sink" title="Permalink to 
this headline">¶</a></h4>
-<p>Logs event at INFO level. Typically useful for testing/debugging purpose.
-Required properties are in <strong>bold</strong>.</p>
+<p>Logs events at INFO level. Typically useful for testing/debugging purposes. 
Required properties are
+in <strong>bold</strong>. This sink is the only exception which doesn&#8217;t 
require the extra configuration
+explained in the <a class="reference internal" 
href="#logging-raw-data">Logging raw data</a> section.</p>
 <table border="1" class="docutils">
 <colgroup>
 <col width="20%" />
@@ -3243,9 +3594,9 @@ backslash, like this: &#8220;\n&#8221;)<
 Required properties are in <strong>bold</strong>.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="13%" />
+<col width="17%" />
 <col width="5%" />
-<col width="82%" />
+<col width="78%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -3266,15 +3617,27 @@ Required properties are in <strong>bold<
 <td>&#8211;</td>
 <td>The directory where files will be stored</td>
 </tr>
-<tr class="row-odd"><td>sink.rollInterval</td>
+<tr class="row-odd"><td>sink.pathManager</td>
+<td>DEFAULT</td>
+<td>The PathManager implementation to use.</td>
+</tr>
+<tr class="row-even"><td>sink.pathManager.extension</td>
+<td>&#8211;</td>
+<td>The file extension if the default PathManager is used.</td>
+</tr>
+<tr class="row-odd"><td>sink.pathManager.prefix</td>
+<td>&#8211;</td>
+<td>A character string to add to the beginning of the file name if the default 
PathManager is used</td>
+</tr>
+<tr class="row-even"><td>sink.rollInterval</td>
 <td>30</td>
 <td>Roll the file every 30 seconds. Specifying 0 will disable rolling and 
cause all events to be written to a single file.</td>
 </tr>
-<tr class="row-even"><td>sink.serializer</td>
+<tr class="row-odd"><td>sink.serializer</td>
 <td>TEXT</td>
 <td>Other possible options include <tt class="docutils literal"><span 
class="pre">avro_event</span></tt> or the FQCN of an implementation of 
EventSerializer.Builder interface.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-even"><td>batchSize</td>
 <td>100</td>
 <td>&nbsp;</td>
 </tr>
@@ -3827,13 +4190,14 @@ the kerberos principal</td>
 <p>This is a Flume Sink implementation that can publish data to a
 <a class="reference external" href="http://kafka.apache.org/";>Kafka</a> topic. 
One of the objective is to integrate Flume
 with Kafka so that pull based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.8.x series of 
releases.</p>
+through various Flume sources. This currently supports Kafka 0.9.x series of 
releases.</p>
+<p>This version of Flume no longer supports older versions (0.8.x) of 
Kafka.</p>
 <p>Required properties are marked in bold font.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="20%" />
-<col width="12%" />
-<col width="68%" />
+<col width="18%" />
+<col width="10%" />
+<col width="72%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -3846,34 +4210,65 @@ through various Flume sources. This curr
 <td>&#8211;</td>
 <td>Must be set to <tt class="docutils literal"><span 
class="pre">org.apache.flume.sink.kafka.KafkaSink</span></tt></td>
 </tr>
-<tr class="row-odd"><td><strong>brokerList</strong></td>
+<tr class="row-odd"><td><strong>kafka.bootstrap.servers</strong></td>
 <td>&#8211;</td>
 <td>List of brokers Kafka-Sink will connect to, to get the list of topic 
partitions
 This can be a partial list of brokers, but we recommend at least two for HA.
 The format is comma separated list of hostname:port</td>
 </tr>
-<tr class="row-even"><td>topic</td>
+<tr class="row-even"><td>kafka.topic</td>
 <td>default-flume-topic</td>
 <td>The topic in Kafka to which the messages will be published. If this 
parameter is configured,
 messages will be published to this topic.
 If the event header contains a &#8220;topic&#8221; field, the event will be 
published to that topic
 overriding the topic configured here.</td>
 </tr>
-<tr class="row-odd"><td>batchSize</td>
+<tr class="row-odd"><td>flumeBatchSize</td>
 <td>100</td>
 <td>How many messages to process in one batch. Larger batches improve 
throughput while adding latency.</td>
 </tr>
-<tr class="row-even"><td>requiredAcks</td>
+<tr class="row-even"><td>kafka.producer.acks</td>
 <td>1</td>
 <td>How many replicas must acknowledge a message before its considered 
successfully written.
 Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader 
only), -1 (wait for all replicas)
 Set this to -1 to avoid data loss in some cases of leader failure.</td>
 </tr>
-<tr class="row-odd"><td>Other Kafka Producer Properties</td>
+<tr class="row-odd"><td>useFlumeEventFormat</td>
+<td>false</td>
+<td>By default events are put as bytes onto the Kafka topic directly from the 
event body. Set to
+true to store events as the Flume Avro binary format. Used in conjunction with 
the same property
+on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel, 
this will preserve
+any Flume headers for the producing side.</td>
+</tr>
+<tr class="row-even"><td>defaultPartitionId</td>
+<td>&#8211;</td>
+<td>Specifies a Kafka partition ID (integer) for all events in this channel to 
be sent to, unless
+overridden by <tt class="docutils literal"><span 
class="pre">partitionIdHeader</span></tt>. By default, if this property is not 
set, events will be
+distributed by the Kafka Producer&#8217;s partitioner - including by <tt 
class="docutils literal"><span class="pre">key</span></tt> if specified (or by a
+partitioner specified by <tt class="docutils literal"><span 
class="pre">kafka.partitioner.class</span></tt>).</td>
+</tr>
+<tr class="row-odd"><td>partitionIdHeader</td>
+<td>&#8211;</td>
+<td>When set, the sink will take the value of the field named using the value 
of this property
+from the event header and send the message to the specified partition of the 
topic. If the
+value represents an invalid partition, an EventDeliveryException will be 
thrown. If the header value
+is present then this setting overrides <tt class="docutils literal"><span 
class="pre">defaultPartitionId</span></tt>.</td>
+</tr>
+<tr class="row-even"><td>kafka.producer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some 
level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td><em>more producer security props</em></td>
+<td>&nbsp;</td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference 
external" href="http://kafka.apache.org/documentation.html#security";>Kafka 
security</a> for additional
+properties that need to be set on producer.</td>
+</tr>
+<tr class="row-even"><td>Other Kafka Producer Properties</td>
 <td>&#8211;</td>
 <td>These properties are used to configure the Kafka Producer. Any producer 
property supported
-by Kafka can be used. The only requirement is to prepend the property name 
with the prefix <tt class="docutils literal"><span 
class="pre">kafka.</span></tt>.
-For example: kafka.producer.type</td>
+by Kafka can be used. The only requirement is to prepend the property name 
with the prefix
+<tt class="docutils literal"><span class="pre">kafka.producer</span></tt>.
+For example: kafka.producer.linger.ms</td>
 </tr>
 </tbody>
 </table>
@@ -3884,19 +4279,152 @@ If <tt class="docutils literal"><span cl
 If <tt class="docutils literal"><span class="pre">key</span></tt> exists in 
the headers, the key will used by Kafka to partition the data between the topic 
partitions. Events with same key
 will be sent to the same partition. If the key is null, events will be sent to 
random partitions.</p>
 </div>
+<p>The Kafka sink also provides defaults for the 
key.serializer(org.apache.kafka.common.serialization.StringSerializer)
+and 
value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). 
Modification of these parameters is not recommended.</p>
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="22%" />
+<col width="13%" />
+<col width="65%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>brokerList</td>
+<td>&#8211;</td>
+<td>Use kafka.bootstrap.servers</td>
+</tr>
+<tr class="row-odd"><td>topic</td>
+<td>default-flume-topic</td>
+<td>Use kafka.topic</td>
+</tr>
+<tr class="row-even"><td>batchSize</td>
+<td>100</td>
+<td>Use kafka.flumeBatchSize</td>
+</tr>
+<tr class="row-odd"><td>requiredAcks</td>
+<td>1</td>
+<td>Use kafka.producer.acks</td>
+</tr>
+</tbody>
+</table>
 <p>An example configuration of a Kafka sink is given below. Properties starting
-with the prefix <tt class="docutils literal"><span 
class="pre">kafka</span></tt> (the last 3 properties) are used when 
instantiating
-the Kafka producer. The properties that are passed when creating the Kafka
+with the prefix <tt class="docutils literal"><span 
class="pre">kafka.producer</span></tt> are used when instantiating the Kafka 
producer. The properties that are passed when creating the Kafka
 producer are not limited to the properties given in this example.
-Also it&#8217;s possible include your custom properties here and access them 
inside
+Also it is possible to include your custom properties here and access them 
inside
 the preprocessor through the Flume Context object passed in as a method
 argument.</p>
-<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
-<span class="na">a1.sinks.k1.topic</span> <span class="o">=</span> <span 
class="s">mytopic</span>
-<span class="na">a1.sinks.k1.brokerList</span> <span class="o">=</span> <span 
class="s">localhost:9092</span>
-<span class="na">a1.sinks.k1.requiredAcks</span> <span class="o">=</span> 
<span class="s">1</span>
-<span class="na">a1.sinks.k1.batchSize</span> <span class="o">=</span> <span 
class="s">20</span>
-<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span 
class="s">c1</span>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span 
class="s">c1</span>
+<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.sink.kafka.KafkaSink</span>
+<span class="na">a1.sinks.k1.kafka.topic</span> <span class="o">=</span> <span 
class="s">mytopic</span>
+<span class="na">a1.sinks.k1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">localhost:9092</span>
+<span class="na">a1.sinks.k1.kafka.flumeBatchSize</span> <span 
class="o">=</span> <span class="s">20</span>
+<span class="na">a1.sinks.k1.kafka.producer.acks</span> <span 
class="o">=</span> <span class="s">1</span>
+<span class="na">a1.sinks.k1.kafka.producer.linger.ms</span> <span 
class="o">=</span> <span class="s">1</span>
+<span class="na">a1.sinks.ki.kafka.producer.compression.type</span> <span 
class="o">=</span> <span class="s">snappy</span>
+</pre></div>
+</div>
+<p><strong>Security and Kafka Sink:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the 
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the 
parameter is named SSL, the actual protocol is a TLS implementation) can be 
used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span 
class="pre">kafka.producer.security.protocol</span></tt> to any of the 
following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication 
with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data 
encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional 
authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external" 
href="http://kafka.apache.org/documentation#security_overview";>Kafka security 
overview</a>
+and the jira for tracking this issue:
+<a class="reference external" 
href="https://issues.apache.org/jira/browse/KAFKA-2561";>KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Sink:</strong></p>
+<p>Please read the steps described in <a class="reference external" 
href="http://kafka.apache.org/documentation#security_configclients";>Configuring 
Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore 
types.</p>
+<p>Example configuration with server side authentication and data 
encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> 
<span class="o">=</span> <span class="s">SSL</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> 
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
truststore&gt;</span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm</span> <span class="o">=</span> <span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server&#8217;s fully qualified domain 
name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external" 
href="https://tools.ietf.org/html/rfc6125#section-2.3";>https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external" 
href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6";>https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.producer.ssl.keystore.location</span> 
<span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.keystore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
keystore&gt;</span>
+</pre></div>
+</div>
+<p>If the keystore and the key use different password protection then the <tt class="docutils literal"><span class="pre">ssl.key.password</span></tt> property will
+provide the required additional secret for the producer keystore:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.producer.ssl.key.password</span> <span 
class="o">=</span> <span class="s">&lt;password to access the key&gt;</span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Sink:</strong></p>
+<p>To use the Kafka sink with a Kafka cluster secured with Kerberos, set the <tt class="docutils literal"><span class="pre">producer.security.protocol</span></tt> property noted above for the producer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file&#8217;s &#8220;KafkaClient&#8221; section. The &#8220;Client&#8221; section describes the Zookeeper connection if needed.
+See <a class="reference external" href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig";>Kafka doc</a>
+for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">JAVA_OPTS</span><span class="o">=</span><span 
class="s">&quot;$JAVA_OPTS 
-Djava.security.krb5.conf=/path/to/krb5.conf&quot;</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span 
class="s">&quot;$JAVA_OPTS 
-Djava.security.auth.login.config=/path/to/flume_jaas.conf&quot;</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> 
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
truststore&gt;</span>
+</pre></div>
+</div>
+<p>Sample JAAS file. For reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of <a class="reference external" href="http://kafka.apache.org/documentation#security_sasl_clientconfig";>SASL configuration</a>.
+Unlike the Kafka Source or Kafka Channel, a &#8220;Client&#8221; section is not required, unless it is needed by other connecting components. Also please make sure
+that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.</p>
+<div class="highlight-javascript"><div class="highlight"><pre><span 
class="nx">KafkaClient</span> <span class="p">{</span>
+  <span class="nx">com</span><span class="p">.</span><span 
class="nb">sun</span><span class="p">.</span><span 
class="nx">security</span><span class="p">.</span><span 
class="nx">auth</span><span class="p">.</span><span 
class="nx">module</span><span class="p">.</span><span 
class="nx">Krb5LoginModule</span> <span class="nx">required</span>
+  <span class="nx">useKeyTab</span><span class="o">=</span><span 
class="kc">true</span>
+  <span class="nx">storeKey</span><span class="o">=</span><span 
class="kc">true</span>
+  <span class="nx">keyTab</span><span class="o">=</span><span 
class="s2">&quot;/path/to/keytabs/flume.keytab&quot;</span>
+  <span class="nx">principal</span><span class="o">=</span><span 
class="s2">&quot;flume/flumehost1.example.com@YOURKERBEROSREALM&quot;</span><span
 class="p">;</span>
+<span class="p">};</span>
 </pre></div>
 </div>
 </div>
@@ -4101,16 +4629,29 @@ READ_COMMITTED, SERIALIZABLE, REPEATABLE
 <h4>Kafka Channel<a class="headerlink" href="#kafka-channel" title="Permalink 
to this headline">¶</a></h4>
 <p>The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and
 replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.</p>
-<p>The Kafka channel can be used for multiple scenarios:
-* With Flume source and sink - it provides a reliable and highly available 
channel for events
-* With Flume source and interceptor but no sink - it allows writing Flume 
events into a Kafka topic, for use by other apps
-* With Flume sink, but no source - it is a low-latency, fault tolerant way to 
send events from Kafka to Flume sources such as HDFS, HBase or Solr</p>
+<p>The Kafka channel can be used for multiple scenarios:</p>
+<ol class="arabic simple">
+<li>With Flume source and sink - it provides a reliable and highly available 
channel for events</li>
+<li>With Flume source and interceptor but no sink - it allows writing Flume 
events into a Kafka topic, for use by other apps</li>
+<li>With Flume sink, but no source - it is a low-latency, fault tolerant way 
to send events from Kafka to Flume sinks such as HDFS, HBase or Solr</li>
+</ol>
+<p>This version of Flume requires Kafka version 0.9 or greater due to the 
reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.</p>
+<p>The configuration parameters are organized as such (a short sketch illustrating the prefixes follows the list):</p>
+<ol class="arabic simple">
+<li>Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type</li>
+<li>Configuration values related to Kafka or how the channel operates are prefixed with &#8220;kafka.&#8221; (these are analogous to the CommonClient Configs), e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the HDFS sink operates</li>
+<li>Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer</li>
+<li>Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks</li>
+</ol>
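+<p>A minimal sketch of these prefix levels (the broker addresses and the override values below are placeholders, not recommendations):</p>
+<div class="highlight-properties"><div class="highlight"><pre># 1. generic channel configuration
+a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
+# 2. Kafka related values, prefixed with "kafka."
+a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
+a1.channels.k1.kafka.topic = channel1
+# 3. producer/consumer specific Kafka properties
+a1.channels.k1.kafka.producer.acks = all
+a1.channels.k1.kafka.consumer.max.partition.fetch.bytes = 1048576
+</pre></div>
+</div>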
+<p>This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.</p>
 <p>Required properties are in <strong>bold</strong>.</p>
 <table border="1" class="docutils">
 <colgroup>
-<col width="14%" />
-<col width="16%" />
-<col width="70%" />
+<col width="19%" />
+<col width="13%" />
+<col width="68%" />
 </colgroup>
 <thead valign="bottom">
 <tr class="row-odd"><th class="head">Property Name</th>
@@ -4123,49 +4664,110 @@ replication, so in case an agent or a ka
 <td>&#8211;</td>
 <td>The component type name, needs to be <tt class="docutils literal"><span 
class="pre">org.apache.flume.channel.kafka.KafkaChannel</span></tt></td>
 </tr>
-<tr class="row-odd"><td><strong>brokerList</strong></td>
+<tr class="row-odd"><td><strong>kafka.bootstrap.servers</strong></td>
 <td>&#8211;</td>
 <td>List of brokers in the Kafka cluster used by the channel.
 This can be a partial list of brokers, but we recommend at least two for HA.
 The format is a comma separated list of hostname:port</td>
 </tr>
-<tr class="row-even"><td><strong>zookeeperConnect</strong></td>
-<td>&#8211;</td>
-<td>URI of ZooKeeper used by Kafka cluster
-The format is comma separated list of hostname:port. If chroot is used, it is 
added once at the end.
-For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka</td>
-</tr>
-<tr class="row-odd"><td>topic</td>
+<tr class="row-even"><td>kafka.topic</td>
 <td>flume-channel</td>
 <td>Kafka topic which the channel will use</td>
 </tr>
-<tr class="row-even"><td>groupId</td>
+<tr class="row-odd"><td>kafka.consumer.group.id</td>
 <td>flume</td>
 <td>Consumer group ID the channel uses to register with Kafka.
 Multiple channels must use the same topic and group to ensure that when one 
agent fails another can get the data
 Note that having non-channel consumers with the same ID can lead to data 
loss.</td>
 </tr>
-<tr class="row-odd"><td>parseAsFlumeEvent</td>
+<tr class="row-even"><td>parseAsFlumeEvent</td>
 <td>true</td>
 <td>Expecting Avro datums with FlumeEvent schema in the channel.
-This should be true if Flume source is writing to the channel
-And false if other producers are writing into the topic that the channel is 
using
-Flume source messages to Kafka can be parsed outside of Flume by using
+This should be true if a Flume source is writing to the channel and false if other producers are
+writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
 org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk 
artifact</td>
 </tr>
-<tr class="row-even"><td>readSmallestOffset</td>
+<tr class="row-odd"><td>migrateZookeeperOffsets</td>
+<td>true</td>
+<td>When no Kafka stored offset is found, look up the offsets in Zookeeper and 
commit them to Kafka.
+This should be true to support seamless Kafka client migration from older 
versions of Flume. Once migrated this can be set
+to false, though that should generally not be required. If no Zookeeper offset 
is found the kafka.consumer.auto.offset.reset
+configuration defines how offsets are handled.</td>
+</tr>
+<tr class="row-even"><td>pollTimeout</td>
+<td>500</td>
+<td>The amount of time (in milliseconds) to wait in the &#8220;poll()&#8221; call of the consumer.
+<a class="reference external" 
href="https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long">https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long</a>)</td>
+</tr>
+<tr class="row-odd"><td>defaultPartitionId</td>
+<td>&#8211;</td>
+<td>Specifies a Kafka partition ID (integer) for all events in this channel to 
be sent to, unless
+overridden by <tt class="docutils literal"><span class="pre">partitionIdHeader</span></tt>. By default, if this property is not set, events will be
+distributed by the Kafka Producer&#8217;s partitioner - including by <tt 
class="docutils literal"><span class="pre">key</span></tt> if specified (or by a
+partitioner specified by <tt class="docutils literal"><span 
class="pre">kafka.partitioner.class</span></tt>).</td>
+</tr>
+<tr class="row-even"><td>partitionIdHeader</td>
+<td>&#8211;</td>
+<td>When set, the producer will take the value of the field named using the 
value of this property
+from the event header and send the message to the specified partition of the 
topic. If the
+value represents an invalid partition the event will not be accepted into the 
channel. If the header value
+is present then this setting overrides <tt class="docutils literal"><span 
class="pre">defaultPartitionId</span></tt>.</td>
+</tr>
+<tr class="row-odd"><td>kafka.consumer.auto.offset.reset</td>
+<td>latest</td>
+<td>What to do when there is no initial offset in Kafka or if the current 
offset does not exist any more on the server
+(e.g. because that data has been deleted):
+earliest: automatically reset the offset to the earliest offset
+latest: automatically reset the offset to the latest offset
+none: throw exception to the consumer if no previous offset is found for the 
consumer&#8217;s group
+anything else: throw exception to the consumer.</td>
+</tr>
+<tr class="row-even"><td>kafka.producer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some 
level of security. See below for additional info on secure setup.</td>
+</tr>
+<tr class="row-odd"><td>kafka.consumer.security.protocol</td>
+<td>PLAINTEXT</td>
+<td>Same as kafka.producer.security.protocol but for reading/consuming from 
Kafka.</td>
+</tr>
+<tr class="row-even"><td><em>more producer/consumer security props</em></td>
+<td>&nbsp;</td>
+<td>If using SASL_PLAINTEXT, SASL_SSL or SSL refer to <a class="reference 
external" href="http://kafka.apache.org/documentation.html#security";>Kafka 
security</a> for additional
+properties that need to be set on producer/consumer.</td>
+</tr>
+</tbody>
+</table>
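+<p>As a hedged sketch of the partitioning related properties above (the partition number and header name are placeholders, assuming channel1 is already configured as a Kafka channel as in the example further below):</p>
+<div class="highlight-properties"><div class="highlight"><pre># send every event to partition 0 ...
+a1.channels.channel1.defaultPartitionId = 0
+# ... unless an event carries a "partition-id" header, whose integer value takes precedence
+a1.channels.channel1.partitionIdHeader = partition-id
+</pre></div>
+</div>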
+<p>Deprecated Properties</p>
+<table border="1" class="docutils">
+<colgroup>
+<col width="19%" />
+<col width="15%" />
+<col width="66%" />
+</colgroup>
+<thead valign="bottom">
+<tr class="row-odd"><th class="head">Property Name</th>
+<th class="head">Default</th>
+<th class="head">Description</th>
+</tr>
+</thead>
+<tbody valign="top">
+<tr class="row-even"><td>brokerList</td>
+<td>&#8211;</td>
+<td>Use kafka.bootstrap.servers</td>
+</tr>
+<tr class="row-odd"><td>topic</td>
+<td>flume-channel</td>
+<td>Use kafka.topic</td>
+</tr>
+<tr class="row-even"><td>groupId</td>
+<td>flume</td>
+<td>Use kafka.consumer.group.id</td>
+</tr>
+<tr class="row-odd"><td>readSmallestOffset</td>
 <td>false</td>
-<td>When set to true, the channel will read all data in the topic, starting 
from the oldest event
-when false, it will read only events written after the channel started
-When &#8220;parseAsFlumeEvent&#8221; is true, this will be false. Flume source 
will start prior to the sinks and this
-guarantees that events sent by source before sinks start will not be lost.</td>
-</tr>
-<tr class="row-odd"><td>Other Kafka Properties</td>
-<td>&#8211;</td>
-<td>These properties are used to configure the Kafka Producer and Consumer 
used by the channel.
-Any property supported by Kafka can be used.
-The only requirement is to prepend the property name with the prefix <tt 
class="docutils literal"><span class="pre">kafka.</span></tt>.
-For example: kafka.producer.type</td>
+<td>Use kafka.consumer.auto.offset.reset</td>
 </tr>
 </tbody>
 </table>
@@ -4174,12 +4776,135 @@ For example: kafka.producer.type</td>
 <p class="last">Due to the way the channel is load balanced, there may be 
duplicate events when the agent first starts up</p>
 </div>
 <p>Example for agent named a1:</p>
-<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span>   <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
-<span class="na">a1.channels.channel1.capacity</span> <span class="o">=</span> 
<span class="s">10000</span>
-<span class="na">a1.channels.channel1.transactionCapacity</span> <span 
class="o">=</span> <span class="s">1000</span>
-<span class="na">a1.channels.channel1.brokerList</span><span 
class="o">=</span><span class="s">kafka-2:9092,kafka-3:9092</span>
-<span class="na">a1.channels.channel1.topic</span><span 
class="o">=</span><span class="s">channel1</span>
-<span class="na">a1.channels.channel1.zookeeperConnect</span><span 
class="o">=</span><span class="s">kafka-1:2181</span>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9092,kafka-2:9092,kafka-3:9092</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+</pre></div>
+</div>
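+<p>A hedged sketch of scenario 3 above (Flume sink but no Flume source), where an external application produces plain messages to the topic, so <tt class="docutils literal"><span class="pre">parseAsFlumeEvent</span></tt> is set to false; the topic and broker names are placeholders:</p>
+<div class="highlight-properties"><div class="highlight"><pre>a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+a1.channels.channel1.kafka.topic = external-events
+a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+# messages are not Avro-encoded FlumeEvents, so do not attempt to parse them as such
+a1.channels.channel1.parseAsFlumeEvent = false
+</pre></div>
+</div>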
+<p><strong>Security and Kafka Channel:</strong></p>
+<p>Secure authentication as well as data encryption is supported on the 
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the 
parameter is named SSL, the actual protocol is a TLS implementation) can be 
used from Kafka version 0.9.0.</p>
+<p>As of now data encryption is solely provided by SSL/TLS.</p>
+<p>Setting <tt class="docutils literal"><span 
class="pre">kafka.producer|consumer.security.protocol</span></tt> to any of the 
following values means:</p>
+<ul class="simple">
+<li><strong>SASL_PLAINTEXT</strong> - Kerberos or plaintext authentication 
with no data encryption</li>
+<li><strong>SASL_SSL</strong> - Kerberos or plaintext authentication with data 
encryption</li>
+<li><strong>SSL</strong> - TLS based encryption with optional 
authentication.</li>
+</ul>
+<div class="admonition warning">
+<p class="first admonition-title">Warning</p>
+<p class="last">There is a performance degradation when SSL is enabled,
+the magnitude of which depends on the CPU type and the JVM implementation.
+Reference: <a class="reference external" 
href="http://kafka.apache.org/documentation#security_overview";>Kafka security 
overview</a>
+and the jira for tracking this issue:
+<a class="reference external" 
href="https://issues.apache.org/jira/browse/KAFKA-2561";>KAFKA-2561</a></p>
+</div>
+<p><strong>TLS and Kafka Channel:</strong></p>
+<p>Please read the steps described in <a class="reference external" 
href="http://kafka.apache.org/documentation#security_configclients";>Configuring 
Kafka Clients SSL</a>
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore 
types.</p>
+<p>Example configuration with server side authentication and data 
encryption.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> 
<span class="o">=</span> <span class="s">SSL</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> 
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
truststore&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> 
<span class="o">=</span> <span class="s">SSL</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span> 
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
truststore&gt;</span>
+</pre></div>
+</div>
+<p>Note: By default the property <tt class="docutils literal"><span 
class="pre">ssl.endpoint.identification.algorithm</span></tt>
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm</span>
 <span class="o">=</span> <span class="s">HTTPS</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm</span>
 <span class="o">=</span> <span class="s">HTTPS</span>
+</pre></div>
+</div>
+<p>Once enabled, clients will verify the server&#8217;s fully qualified domain 
name (FQDN)
+against one of the following two fields:</p>
+<ol class="arabic simple">
+<li>Common Name (CN) <a class="reference external" 
href="https://tools.ietf.org/html/rfc6125#section-2.3";>https://tools.ietf.org/html/rfc6125#section-2.3</a></li>
+<li>Subject Alternative Name (SAN) <a class="reference external" 
href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6";>https://tools.ietf.org/html/rfc5280#section-4.2.1.6</a></li>
+</ol>
+<p>If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.producer.ssl.keystore.location</span> 
<span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.keystore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
keystore&gt;</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.location</span> 
<span class="o">=</span> <span class="s">/path/to/client.keystore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.keystore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
keystore&gt;</span>
+</pre></div>
+</div>
+<p>If the keystore and the key use different password protection then the <tt class="docutils literal"><span class="pre">ssl.key.password</span></tt> property will
+provide the required additional secret for both the consumer and the producer keystores:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.kafka.producer.ssl.key.password</span> <span 
class="o">=</span> <span class="s">&lt;password to access the key&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.ssl.key.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
key&gt;</span>
+</pre></div>
+</div>
+<p><strong>Kerberos and Kafka Channel:</strong></p>
+<p>To use the Kafka channel with a Kafka cluster secured with Kerberos, set the <tt class="docutils literal"><span class="pre">producer/consumer.security.protocol</span></tt> properties noted above for the producer and/or consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file&#8217;s &#8220;KafkaClient&#8221; section. The &#8220;Client&#8221; section describes the Zookeeper connection if needed.
+See <a class="reference external" href="http://kafka.apache.org/documentation.html#security_sasl_clientconfig";>Kafka doc</a>
+for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">JAVA_OPTS</span><span class="o">=</span><span 
class="s">&quot;$JAVA_OPTS 
-Djava.security.krb5.conf=/path/to/krb5.conf&quot;</span>
+<span class="na">JAVA_OPTS</span><span class="o">=</span><span 
class="s">&quot;$JAVA_OPTS 
-Djava.security.auth.login.config=/path/to/flume_jaas.conf&quot;</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_PLAINTEXT:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_PLAINTEXT</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+</pre></div>
+</div>
+<p>Example secure configuration using SASL_SSL:</p>
+<div class="highlight-properties"><div class="highlight"><pre><span 
class="na">a1.channels.channel1.type</span> <span class="o">=</span> <span 
class="s">org.apache.flume.channel.kafka.KafkaChannel</span>
+<span class="na">a1.channels.channel1.kafka.bootstrap.servers</span> <span 
class="o">=</span> <span class="s">kafka-1:9093,kafka-2:9093,kafka-3:9093</span>
+<span class="na">a1.channels.channel1.kafka.topic</span> <span 
class="o">=</span> <span class="s">channel1</span>
+<span class="na">a1.channels.channel1.kafka.consumer.group.id</span> <span 
class="o">=</span> <span class="s">flume-consumer</span>
+<span class="na">a1.channels.channel1.kafka.producer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.producer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.location</span> 
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>
+<span 
class="na">a1.channels.channel1.kafka.producer.ssl.truststore.password</span> 
<span class="o">=</span> <span class="s">&lt;password to access the 
truststore&gt;</span>
+<span class="na">a1.channels.channel1.kafka.consumer.security.protocol</span> 
<span class="o">=</span> <span class="s">SASL_SSL</span>
+<span class="na">a1.channels.channel1.kafka.consumer.sasl.mechanism</span> 
<span class="o">=</span> <span class="s">GSSAPI</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name</span>
 <span class="o">=</span> <span class="s">kafka</span>
+<span 
class="na">a1.channels.channel1.kafka.consumer.ssl.truststore.location</span> 
<span class="o">=</span> <span class="s">/path/to/truststore.jks</span>

[... 193 lines stripped ...]

