Added: websites/production/flume/content/releases/content/1.8.0/FlumeDeveloperGuide.html ============================================================================== --- websites/production/flume/content/releases/content/1.8.0/FlumeDeveloperGuide.html (added) +++ websites/production/flume/content/releases/content/1.8.0/FlumeDeveloperGuide.html Wed Oct 4 16:41:51 2017 @@ -0,0 +1,985 @@ + +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" + "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> + + +<html xmlns="http://www.w3.org/1999/xhtml"> + <head> + <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> + + <title>Flume 1.8.0 Developer Guide — Apache Flume</title> + + <link rel="stylesheet" href="_static/flume.css" type="text/css" /> + <link rel="stylesheet" href="_static/pygments.css" type="text/css" /> + + <script type="text/javascript"> + var DOCUMENTATION_OPTIONS = { + URL_ROOT: '', + VERSION: '', + COLLAPSE_INDEX: false, + FILE_SUFFIX: '.html', + HAS_SOURCE: true + }; + </script> + <script type="text/javascript" src="_static/jquery.js"></script> + <script type="text/javascript" src="_static/underscore.js"></script> + <script type="text/javascript" src="_static/doctools.js"></script> + <link rel="top" title="Apache Flume" href="index.html" /> + <link rel="up" title="Documentation" href="documentation.html" /> + <link rel="next" title="Releases" href="releases/index.html" /> + <link rel="prev" title="Flume 1.8.0 User Guide" href="FlumeUserGuide.html" /> + </head> + <body> +<div class="header"> + <table width="100%" border="0"> + <tr> + <td width="10%"> + <div class="logo"> + <a href="index.html"> + <img class="logo" src="_static/flume-logo.png" alt="Logo"/> + </div> + </td> + <td width="2%"> + <span class="trademark">™</span> + </td> + <td width="68%" align="center" class="pageTitle">Apache Flume<sup><span class="trademark">™</span></sup> + </td> + <td width="20%"> + <a href="http://www.apache.org"> + <img 
src="_static/feather-small.png" alt="Apache Software Foundation" height="70"/> + </a> + </td> + </tr> + </table> +</div> + + + <div class="document"> + <div class="documentwrapper"> + <div class="bodywrapper"> + <div class="body"> + + <div class="section" id="flume-1-8-0-developer-guide"> +<h1>Flume 1.8.0 Developer Guide<a class="headerlink" href="#flume-1-8-0-developer-guide" title="Permalink to this headline">¶</a></h1> +<div class="section" id="introduction"> +<h2>Introduction<a class="headerlink" href="#introduction" title="Permalink to this headline">¶</a></h2> +<div class="section" id="overview"> +<h3>Overview<a class="headerlink" href="#overview" title="Permalink to this headline">¶</a></h3> +<p>Apache Flume is a distributed, reliable, and available system for +efficiently collecting, aggregating and moving large amounts of log +data from many different sources to a centralized data store.</p> +<p>Apache Flume is a top-level project at the Apache Software Foundation. +There are currently two release code lines available, versions 0.9.x and 1.x. +This documentation applies to the 1.x codeline. +For the 0.9.x codeline, please see the <a class="reference external" href="http://archive.cloudera.com/cdh/3/flume/DeveloperGuide/">Flume 0.9.x Developer Guide</a>.</p> +</div> +<div class="section" id="architecture"> +<h3>Architecture<a class="headerlink" href="#architecture" title="Permalink to this headline">¶</a></h3> +<div class="section" id="data-flow-model"> +<h4>Data flow model<a class="headerlink" href="#data-flow-model" title="Permalink to this headline">¶</a></h4> +<p>An <tt class="docutils literal"><span class="pre">Event</span></tt> is a unit of data that flows through a Flume agent. 
The <tt class="docutils literal"><span class="pre">Event</span></tt> +flows from <tt class="docutils literal"><span class="pre">Source</span></tt> to <tt class="docutils literal"><span class="pre">Channel</span></tt> to <tt class="docutils literal"><span class="pre">Sink</span></tt>, and is represented by an +implementation of the <tt class="docutils literal"><span class="pre">Event</span></tt> interface. An <tt class="docutils literal"><span class="pre">Event</span></tt> carries a payload (byte +array) that is accompanied by an optional set of headers (string attributes). +A Flume agent is a process (JVM) that hosts the components that allow +<tt class="docutils literal"><span class="pre">Event</span></tt>s to flow from an external source to an external destination.</p> +<div class="figure align-center"> +<img alt="Agent component diagram" src="_images/DevGuide_image00.png" /> +</div> +<p>A <tt class="docutils literal"><span class="pre">Source</span></tt> consumes <tt class="docutils literal"><span class="pre">Event</span></tt>s having a specific format, and those +<tt class="docutils literal"><span class="pre">Event</span></tt>s are delivered to the <tt class="docutils literal"><span class="pre">Source</span></tt> by an external source like a web +server. For example, an <tt class="docutils literal"><span class="pre">AvroSource</span></tt> can be used to receive Avro <tt class="docutils literal"><span class="pre">Event</span></tt>s +from clients or from other Flume agents in the flow. When a <tt class="docutils literal"><span class="pre">Source</span></tt> receives +an <tt class="docutils literal"><span class="pre">Event</span></tt>, it stores it into one or more <tt class="docutils literal"><span class="pre">Channel</span></tt>s. 
The <tt class="docutils literal"><span class="pre">Channel</span></tt> is +a passive store that holds the <tt class="docutils literal"><span class="pre">Event</span></tt> until that <tt class="docutils literal"><span class="pre">Event</span></tt> is consumed by a +<tt class="docutils literal"><span class="pre">Sink</span></tt>. One type of <tt class="docutils literal"><span class="pre">Channel</span></tt> available in Flume is the <tt class="docutils literal"><span class="pre">FileChannel</span></tt> +which uses the local filesystem as its backing store. A <tt class="docutils literal"><span class="pre">Sink</span></tt> is responsible +for removing an <tt class="docutils literal"><span class="pre">Event</span></tt> from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and putting it into an external +repository like HDFS (in the case of an <tt class="docutils literal"><span class="pre">HDFSEventSink</span></tt>) or forwarding it to +the <tt class="docutils literal"><span class="pre">Source</span></tt> at the next hop of the flow. The <tt class="docutils literal"><span class="pre">Source</span></tt> and <tt class="docutils literal"><span class="pre">Sink</span></tt> within +the given agent run asynchronously with the <tt class="docutils literal"><span class="pre">Event</span></tt>s staged in the +<tt class="docutils literal"><span class="pre">Channel</span></tt>.</p> +</div> +<div class="section" id="reliability"> +<h4>Reliability<a class="headerlink" href="#reliability" title="Permalink to this headline">¶</a></h4> +<p>An <tt class="docutils literal"><span class="pre">Event</span></tt> is staged in a Flume agent’s <tt class="docutils literal"><span class="pre">Channel</span></tt>. Then it’s the +<tt class="docutils literal"><span class="pre">Sink</span></tt>‘s responsibility to deliver the <tt class="docutils literal"><span class="pre">Event</span></tt> to the next agent or +terminal repository (like HDFS) in the flow. 
The <tt class="docutils literal"><span class="pre">Sink</span></tt> removes an <tt class="docutils literal"><span class="pre">Event</span></tt> +from the <tt class="docutils literal"><span class="pre">Channel</span></tt> only after the <tt class="docutils literal"><span class="pre">Event</span></tt> is stored into the <tt class="docutils literal"><span class="pre">Channel</span></tt> of +the next agent or stored in the terminal repository. This is how the single-hop +message delivery semantics in Flume provide end-to-end reliability of the flow. +Flume uses a transactional approach to guarantee the reliable delivery of the +<tt class="docutils literal"><span class="pre">Event</span></tt>s. The <tt class="docutils literal"><span class="pre">Source</span></tt>s and <tt class="docutils literal"><span class="pre">Sink</span></tt>s encapsulate the +storage/retrieval of the <tt class="docutils literal"><span class="pre">Event</span></tt>s in a <tt class="docutils literal"><span class="pre">Transaction</span></tt> provided by the +<tt class="docutils literal"><span class="pre">Channel</span></tt>. This ensures that the set of <tt class="docutils literal"><span class="pre">Event</span></tt>s is reliably passed from +point to point in the flow. 
In the case of a multi-hop flow, the <tt class="docutils literal"><span class="pre">Sink</span></tt> from +the previous hop and the <tt class="docutils literal"><span class="pre">Source</span></tt> of the next hop both have their +<tt class="docutils literal"><span class="pre">Transaction</span></tt>s open to ensure that the <tt class="docutils literal"><span class="pre">Event</span></tt> data is safely stored in +the <tt class="docutils literal"><span class="pre">Channel</span></tt> of the next hop.</p> +</div> +</div> +<div class="section" id="building-flume"> +<h3>Building Flume<a class="headerlink" href="#building-flume" title="Permalink to this headline">¶</a></h3> +<div class="section" id="getting-the-source"> +<h4>Getting the source<a class="headerlink" href="#getting-the-source" title="Permalink to this headline">¶</a></h4> +<p>Check-out the code using Git. Click here for +<a class="reference external" href="https://git-wip-us.apache.org/repos/asf/flume.git">the git repository root</a>.</p> +<p>The Flume 1.x development happens under the branch “trunk” so this command line +can be used:</p> +<blockquote> +<div>git clone <a class="reference external" href="https://git-wip-us.apache.org/repos/asf/flume.git">https://git-wip-us.apache.org/repos/asf/flume.git</a></div></blockquote> +</div> +<div class="section" id="compile-test-flume"> +<h4>Compile/test Flume<a class="headerlink" href="#compile-test-flume" title="Permalink to this headline">¶</a></h4> +<p>The Flume build is mavenized. 
You can compile Flume using the standard Maven +commands:</p> +<ol class="arabic simple"> +<li>Compile only: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">compile</span></tt></li> +<li>Compile and run unit tests: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">test</span></tt></li> +<li>Run individual test(s): <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">test</span> <span class="pre">-Dtest=&lt;Test1&gt;,&lt;Test2&gt;,...</span> <span class="pre">-DfailIfNoTests=false</span></tt></li> +<li>Create tarball package: <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span></tt></li> +<li>Create tarball package (skip unit tests): <tt class="docutils literal"><span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span> <span class="pre">-DskipTests</span></tt></li> +</ol> +<p>Please note that the Flume build requires the Google Protocol Buffers compiler +to be in the path. You can download and install it by following the instructions +<a class="reference external" href="https://developers.google.com/protocol-buffers/">here</a>.</p> +</div> +<div class="section" id="updating-protocol-buffer-version"> +<h4>Updating Protocol Buffer Version<a class="headerlink" href="#updating-protocol-buffer-version" title="Permalink to this headline">¶</a></h4> +<p>File channel has a dependency on Protocol Buffer. 
When updating the version of Protocol Buffer +used by Flume, it is necessary to regenerate the data access classes using the protoc compiler +that is part of Protocol Buffer as follows.</p> +<ol class="arabic simple"> +<li>Install the desired version of Protocol Buffer on your local machine</li> +<li>Update version of Protocol Buffer in pom.xml</li> +<li>Generate new Protocol Buffer data access classes in Flume: <tt class="docutils literal"><span class="pre">cd</span> <span class="pre">flume-ng-channels/flume-file-channel;</span> <span class="pre">mvn</span> <span class="pre">-P</span> <span class="pre">compile-proto</span> <span class="pre">clean</span> <span class="pre">package</span> <span class="pre">-DskipTests</span></tt></li> +<li>Add Apache license header to any of the generated files that are missing it</li> +<li>Rebuild and test Flume: <tt class="docutils literal"><span class="pre">cd</span> <span class="pre">../..;</span> <span class="pre">mvn</span> <span class="pre">clean</span> <span class="pre">install</span></tt></li> +</ol> +</div> +</div> +<div class="section" id="developing-custom-components"> +<h3>Developing custom components<a class="headerlink" href="#developing-custom-components" title="Permalink to this headline">¶</a></h3> +<div class="section" id="client"> +<h4>Client<a class="headerlink" href="#client" title="Permalink to this headline">¶</a></h4> +<p>The client operates at the point of origin of events and delivers them to a +Flume agent. Clients typically operate in the process space of the application +they are consuming data from. Flume currently supports Avro, log4j, syslog, +and HTTP POST (with a JSON body) as ways to transfer data from an external +source. Additionally, there’s an <tt class="docutils literal"><span class="pre">ExecSource</span></tt> that can consume the output of a +local process as input to Flume.</p> +<p>It’s quite possible to have a use case where these existing options are not +sufficient. 
In this case you can build a custom mechanism to send data to +Flume. There are two ways of achieving this. The first option is to create a +custom client that communicates with one of Flume’s existing <tt class="docutils literal"><span class="pre">Source</span></tt>s like +<tt class="docutils literal"><span class="pre">AvroSource</span></tt> or <tt class="docutils literal"><span class="pre">SyslogTcpSource</span></tt>. Here the client should convert its data +into messages understood by these Flume <tt class="docutils literal"><span class="pre">Source</span></tt>s. The other option is to +write a custom Flume <tt class="docutils literal"><span class="pre">Source</span></tt> that directly talks with your existing client +application using some IPC or RPC protocol, and then converts the client data +into Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s to be sent downstream. Note that all events stored +within the <tt class="docutils literal"><span class="pre">Channel</span></tt> of a Flume agent must exist as Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s.</p> +<div class="section" id="client-sdk"> +<h5>Client SDK<a class="headerlink" href="#client-sdk" title="Permalink to this headline">¶</a></h5> +<p>Though Flume contains a number of built-in mechanisms (i.e. <tt class="docutils literal"><span class="pre">Source</span></tt>s) to +ingest data, often one wants the ability to communicate with Flume directly from +a custom application. The Flume Client SDK is a library that enables +applications to connect to Flume and send data into Flume’s data flow over RPC.</p> +</div> +<div class="section" id="rpc-client-interface"> +<h5>RPC client interface<a class="headerlink" href="#rpc-client-interface" title="Permalink to this headline">¶</a></h5> +<p>An implementation of Flume’s RpcClient interface encapsulates the RPC mechanism +supported by Flume. 
The user’s application can simply call the Flume Client +SDK’s <tt class="docutils literal"><span class="pre">append(Event)</span></tt> or <tt class="docutils literal"><span class="pre">appendBatch(List<Event>)</span></tt> to send data and not +worry about the underlying message exchange details. The user can provide the +required <tt class="docutils literal"><span class="pre">Event</span></tt> arg by either directly implementing the <tt class="docutils literal"><span class="pre">Event</span></tt> interface, +by using a convenience implementation such as the SimpleEvent class, or by using +<tt class="docutils literal"><span class="pre">EventBuilder</span></tt>‘s overloaded <tt class="docutils literal"><span class="pre">withBody()</span></tt> static helper methods.</p> +</div> +<div class="section" id="rpc-clients-avro-and-thrift"> +<h5>RPC clients - Avro and Thrift<a class="headerlink" href="#rpc-clients-avro-and-thrift" title="Permalink to this headline">¶</a></h5> +<p>As of Flume 1.4.0, Avro is the default RPC protocol. The +<tt class="docutils literal"><span class="pre">NettyAvroRpcClient</span></tt> and <tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt> implement the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> +interface. The client needs to create this object with the host and port of +the target Flume agent, and can then use the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> to send data into +the agent. 
The following example shows how to use the Flume Client SDK API +within a user’s data-generating application:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">org.apache.flume.Event</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.EventDeliveryException</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClient</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClientFactory</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.event.EventBuilder</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">java.nio.charset.Charset</span><span class="o">;</span> + +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyApp</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span> + <span class="n">MyRpcClientFacade</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyRpcClientFacade</span><span class="o">();</span> + <span class="c1">// Initialize client with the remote Flume agent's host and port</span> + <span class="n">client</span><span class="o">.</span><span class="na">init</span><span class="o">(</span><span class="s">"host.example.org"</span><span class="o">,</span> <span class="mi">41414</span><span class="o">);</span> + + <span class="c1">// Send 10 events to the remote Flume agent. 
That agent should be</span> + <span class="c1">// configured to listen with an AvroSource.</span> + <span class="n">String</span> <span class="n">sampleData</span> <span class="o">=</span> <span class="s">"Hello Flume!"</span><span class="o">;</span> + <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="mi">10</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> + <span class="n">client</span><span class="o">.</span><span class="na">sendDataToFlume</span><span class="o">(</span><span class="n">sampleData</span><span class="o">);</span> + <span class="o">}</span> + + <span class="n">client</span><span class="o">.</span><span class="na">cleanUp</span><span class="o">();</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="kd">class</span> <span class="nc">MyRpcClientFacade</span> <span class="o">{</span> + <span class="kd">private</span> <span class="n">RpcClient</span> <span class="n">client</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">String</span> <span class="n">hostname</span><span class="o">;</span> + <span class="kd">private</span> <span class="kt">int</span> <span class="n">port</span><span class="o">;</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">String</span> <span class="n">hostname</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// Setup the RPC connection</span> + <span class="k">this</span><span class="o">.</span><span class="na">hostname</span> <span class="o">=</span> <span class="n">hostname</span><span class="o">;</span> + <span class="k">this</span><span 
class="o">.</span><span class="na">port</span> <span class="o">=</span> <span class="n">port</span><span class="o">;</span> + <span class="k">this</span><span class="o">.</span><span class="na">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getDefaultInstance</span><span class="o">(</span><span class="n">hostname</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span> + <span class="c1">// Use the following method to create a thrift client (instead of the above line):</span> + <span class="c1">// this.client = RpcClientFactory.getThriftInstance(hostname, port);</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendDataToFlume</span><span class="o">(</span><span class="n">String</span> <span class="n">data</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// Create a Flume Event object that encapsulates the sample data</span> + <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="n">data</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span> + + <span class="c1">// Send the event</span> + <span class="k">try</span> <span class="o">{</span> + <span class="n">client</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">event</span><span class="o">);</span> + <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">EventDeliveryException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// clean up and recreate the client</span> + <span 
class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> + <span class="n">client</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getDefaultInstance</span><span class="o">(</span><span class="n">hostname</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span> + <span class="c1">// Use the following method to create a thrift client (instead of the above line):</span> + <span class="c1">// this.client = RpcClientFactory.getThriftInstance(hostname, port);</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">cleanUp</span><span class="o">()</span> <span class="o">{</span> + <span class="c1">// Close the RPC connection</span> + <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> + <span class="o">}</span> + +<span class="o">}</span> +</pre></div> +</div> +<p>The remote Flume agent needs to have an <tt class="docutils literal"><span class="pre">AvroSource</span></tt> (or a +<tt class="docutils literal"><span class="pre">ThriftSource</span></tt> if you are using a Thrift client) listening on some port. 
+Below is an example Flume agent configuration that’s waiting for a connection +from MyApp:</p> +<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span> +<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span> +<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1</span> + +<span class="na">a1.channels.c1.type</span> <span class="o">=</span> <span class="s">memory</span> + +<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span> +<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">avro</span> +<span class="c"># For using a thrift source set the following instead of the above line.</span> +<span class="c"># a1.sources.r1.type = thrift</span> +<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span> +<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">41414</span> + +<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span> +<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">logger</span> +</pre></div> +</div> +<p>For more flexibility, the default Flume client implementations +(<tt class="docutils literal"><span class="pre">NettyAvroRpcClient</span></tt> and <tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt>) can be configured with these +properties:</p> +<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default (for avro) or thrift (for thrift)</span> + +<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 # default client accepts only 1 host</span> + <span class="c"># (additional hosts will be ignored)</span> + +<span class="na">hosts.h1</span> <span class="o">=</span> <span 
class="s">host1.example.org:41414 # host and port must both be specified</span> + <span class="c"># (neither has a default)</span> + +<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span> + +<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span> + +<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span> +</pre></div> +</div> +</div> +<div class="section" id="secure-rpc-client-thrift"> +<h5>Secure RPC client - Thrift<a class="headerlink" href="#secure-rpc-client-thrift" title="Permalink to this headline">¶</a></h5> +<p>As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication. +The client needs to use the getThriftInstance method of <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt> +to obtain a <tt class="docutils literal"><span class="pre">SecureThriftRpcClient</span></tt>. <tt class="docutils literal"><span class="pre">SecureThriftRpcClient</span></tt> extends +<tt class="docutils literal"><span class="pre">ThriftRpcClient</span></tt>, which implements the <tt class="docutils literal"><span class="pre">RpcClient</span></tt> interface. The Kerberos +authentication code resides in the flume-ng-auth module, which must be +on the classpath when using the <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt>. Both the client +principal and the client keytab should be passed in as parameters through the +properties; they are the credentials the client uses to authenticate +against the Kerberos KDC. In addition, the server principal of the destination +Thrift source to which this client connects should also be provided. 
+The following example shows how to use the <tt class="docutils literal"><span class="pre">SecureRpcClientFactory</span></tt> +within a user’s data-generating application:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">org.apache.flume.Event</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.EventDeliveryException</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.event.EventBuilder</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.api.SecureRpcClientFactory</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClientConfigurationConstants</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">org.apache.flume.api.RpcClient</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">java.nio.charset.Charset</span><span class="o">;</span> +<span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span> + +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyApp</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span> + <span class="n">MySecureRpcClientFacade</span> <span class="n">client</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MySecureRpcClientFacade</span><span class="o">();</span> + <span class="c1">// Initialize client with the remote Flume agent's host, port</span> + <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span 
class="o">();</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="n">RpcClientConfigurationConstants</span><span class="o">.</span><span class="na">CONFIG_CLIENT_TYPE</span><span class="o">,</span> <span class="s">"thrift"</span><span class="o">);</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1"</span><span class="o">);</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="s">"client.example.org"</span><span class="o">+</span><span class="s">":"</span><span class="o">+</span> <span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="mi">41414</span><span class="o">));</span> + + <span class="c1">// Initialize client with the kerberos authentication related properties</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"kerberos"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">);</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"client-principal"</span><span class="o">,</span> <span class="s">"flumeclient/[email protected]"</span><span class="o">);</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span class="s">"client-keytab"</span><span class="o">,</span> <span class="s">"/tmp/flumeclient.keytab"</span><span class="o">);</span> + <span class="n">props</span><span class="o">.</span><span class="na">setProperty</span><span class="o">(</span><span 
class="s">"server-principal"</span><span class="o">,</span> <span class="s">"flume/[email protected]"</span><span class="o">);</span> + <span class="n">client</span><span class="o">.</span><span class="na">init</span><span class="o">(</span><span class="n">props</span><span class="o">);</span> + + <span class="c1">// Send 10 events to the remote Flume agent. That agent should be</span> + <span class="c1">// configured to listen with a ThriftSource.</span> + <span class="n">String</span> <span class="n">sampleData</span> <span class="o">=</span> <span class="s">"Hello Flume!"</span><span class="o">;</span> + <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="mi">10</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> + <span class="n">client</span><span class="o">.</span><span class="na">sendDataToFlume</span><span class="o">(</span><span class="n">sampleData</span><span class="o">);</span> + <span class="o">}</span> + + <span class="n">client</span><span class="o">.</span><span class="na">cleanUp</span><span class="o">();</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="kd">class</span> <span class="nc">MySecureRpcClientFacade</span> <span class="o">{</span> + <span class="kd">private</span> <span class="n">RpcClient</span> <span class="n">client</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Properties</span> <span class="n">properties</span><span class="o">;</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">Properties</span> <span class="n">properties</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// Set up the RPC connection</span> + <span 
class="k">this</span><span class="o">.</span><span class="na">properties</span> <span class="o">=</span> <span class="n">properties</span><span class="o">;</span> + <span class="c1">// Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory</span> + <span class="k">this</span><span class="o">.</span><span class="na">client</span> <span class="o">=</span> <span class="n">SecureRpcClientFactory</span><span class="o">.</span><span class="na">getThriftInstance</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">sendDataToFlume</span><span class="o">(</span><span class="n">String</span> <span class="n">data</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// Create a Flume Event object that encapsulates the sample data</span> + <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="n">data</span><span class="o">,</span> <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span> + + <span class="c1">// Send the event</span> + <span class="k">try</span> <span class="o">{</span> + <span class="n">client</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">event</span><span class="o">);</span> + <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">EventDeliveryException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// clean up and recreate the client</span> + <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> + <span 
class="n">client</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + <span class="n">client</span> <span class="o">=</span> <span class="n">SecureRpcClientFactory</span><span class="o">.</span><span class="na">getThriftInstance</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">cleanUp</span><span class="o">()</span> <span class="o">{</span> + <span class="c1">// Close the RPC connection</span> + <span class="n">client</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> + <span class="o">}</span> +<span class="o">}</span> +</pre></div> +</div> +<p>The remote <tt class="docutils literal"><span class="pre">ThriftSource</span></tt> should be started in kerberos mode. +Below is an example Flume agent configuration that’s waiting for a connection +from MyApp:</p> +<div class="highlight-properties"><div class="highlight"><pre><span class="na">a1.channels</span> <span class="o">=</span> <span class="s">c1</span> +<span class="na">a1.sources</span> <span class="o">=</span> <span class="s">r1</span> +<span class="na">a1.sinks</span> <span class="o">=</span> <span class="s">k1</span> + +<span class="na">a1.channels.c1.type</span> <span class="o">=</span> <span class="s">memory</span> + +<span class="na">a1.sources.r1.channels</span> <span class="o">=</span> <span class="s">c1</span> +<span class="na">a1.sources.r1.type</span> <span class="o">=</span> <span class="s">thrift</span> +<span class="na">a1.sources.r1.bind</span> <span class="o">=</span> <span class="s">0.0.0.0</span> +<span class="na">a1.sources.r1.port</span> <span class="o">=</span> <span class="s">41414</span> +<span class="na">a1.sources.r1.kerberos</span> <span class="o">=</span> <span class="s">true</span> +<span class="na">a1.sources.r1.agent-principal</span> <span 
class="o">=</span> <span class="s">flume/[email protected]</span> +<span class="na">a1.sources.r1.agent-keytab</span> <span class="o">=</span> <span class="s">/tmp/flume.keytab</span> + + +<span class="na">a1.sinks.k1.channel</span> <span class="o">=</span> <span class="s">c1</span> +<span class="na">a1.sinks.k1.type</span> <span class="o">=</span> <span class="s">logger</span> +</pre></div> +</div> +</div> +<div class="section" id="failover-client"> +<h5>Failover Client<a class="headerlink" href="#failover-client" title="Permalink to this headline">¶</a></h5> +<p>This class wraps the default Avro RPC client to provide failover handling +capability to clients. This takes a whitespace-separated list of <host>:<port> +representing the Flume agents that make up a failover group. The Failover RPC +Client currently does not support thrift. If there’s a +communication error with the currently selected host (i.e. agent), +then the failover client automatically fails over to the next host in the list. 
+For example:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="c1">// Setup properties for the failover</span> +<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"client.type"</span><span class="o">,</span> <span class="s">"default_failover"</span><span class="o">);</span> + +<span class="c1">// List of hosts (space-separated list of user-chosen host aliases)</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1 h2 h3"</span><span class="o">);</span> + +<span class="c1">// host/port pair for each host alias</span> +<span class="n">String</span> <span class="n">host1</span> <span class="o">=</span> <span class="s">"host1.example.org:41414"</span><span class="o">;</span> +<span class="n">String</span> <span class="n">host2</span> <span class="o">=</span> <span class="s">"host2.example.org:41414"</span><span class="o">;</span> +<span class="n">String</span> <span class="n">host3</span> <span class="o">=</span> <span class="s">"host3.example.org:41414"</span><span class="o">;</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="n">host1</span><span class="o">);</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h2"</span><span class="o">,</span> <span class="n">host2</span><span class="o">);</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h3"</span><span class="o">,</span> <span 
class="n">host3</span><span class="o">);</span> + +<span class="c1">// create the client with failover properties</span> +<span class="n">RpcClient</span> <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">props</span><span class="o">);</span> +</pre></div> +</div> +<p>For more flexibility, the failover Flume client implementation +(<tt class="docutils literal"><span class="pre">FailoverRpcClient</span></tt>) can be configured with these properties:</p> +<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default_failover</span> + +<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 h2 h3 # at least one is required, but 2 or</span> + <span class="c"># more makes better sense</span> + +<span class="na">hosts.h1</span> <span class="o">=</span> <span class="s">host1.example.org:41414</span> + +<span class="na">hosts.h2</span> <span class="o">=</span> <span class="s">host2.example.org:41414</span> + +<span class="na">hosts.h3</span> <span class="o">=</span> <span class="s">host3.example.org:41414</span> + +<span class="na">max-attempts</span> <span class="o">=</span> <span class="s">3 # Must be >=0 (default: number of hosts</span> + <span class="c"># specified, 3 in this case). A '0'</span> + <span class="c"># value doesn't make much sense because</span> + <span class="c"># it will just cause an append call to</span> + <span class="c"># immediately fail. 
A '1' value means</span> + <span class="c"># that the failover client will try only</span> + <span class="c"># once to send the Event, and if it</span> + <span class="c"># fails then there will be no failover</span> + <span class="c"># to a second client, so this value</span> + <span class="c"># causes the failover client to</span> + <span class="c"># degenerate into just a default client.</span> + <span class="c"># It makes sense to set this value to at</span> + <span class="c"># least the number of hosts that you</span> + <span class="c"># specified.</span> + +<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span> + +<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span> + +<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span> +</pre></div> +</div> +</div> +<div class="section" id="loadbalancing-rpc-client"> +<h5>LoadBalancing RPC client<a class="headerlink" href="#loadbalancing-rpc-client" title="Permalink to this headline">¶</a></h5> +<p>The Flume Client SDK also supports an RpcClient which load-balances among +multiple hosts. This type of client takes a whitespace-separated list of +<host>:<port> representing the Flume agents that make up a load-balancing group. +This client can be configured with a load balancing strategy that either +randomly selects one of the configured hosts, or selects a host in a round-robin +fashion. You can also specify your own custom class that implements the +<tt class="docutils literal"><span class="pre">LoadBalancingRpcClient$HostSelector</span></tt> interface so that a custom selection +order is used. In that case, the FQCN of the custom class needs to be specified +as the value of the <tt class="docutils literal"><span class="pre">host-selector</span></tt> property. 
The LoadBalancing RPC Client +currently does not support thrift.</p> +<p>If <tt class="docutils literal"><span class="pre">backoff</span></tt> is enabled then the client will temporarily blacklist +hosts that fail, causing them to be excluded from being selected as a failover +host until a given timeout. When the timeout elapses, if the host is still +unresponsive then this is considered a sequential failure, and the timeout is +increased exponentially to avoid potentially getting stuck in long waits on +unresponsive hosts.</p> +<p>The maximum backoff time can be configured by setting <tt class="docutils literal"><span class="pre">maxBackoff</span></tt> (in +milliseconds). The maxBackoff default is 30 seconds (specified in the +<tt class="docutils literal"><span class="pre">OrderSelector</span></tt> class that’s the superclass of both load balancing +strategies). The backoff timeout will increase exponentially with each +sequential failure up to the maximum possible backoff timeout. +The maximum possible backoff is limited to 65536 seconds (about 18.2 hours). 
+For example:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="c1">// Setup properties for the load balancing</span> +<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"client.type"</span><span class="o">,</span> <span class="s">"default_loadbalance"</span><span class="o">);</span> + +<span class="c1">// List of hosts (space-separated list of user-chosen host aliases)</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts"</span><span class="o">,</span> <span class="s">"h1 h2 h3"</span><span class="o">);</span> + +<span class="c1">// host/port pair for each host alias</span> +<span class="n">String</span> <span class="n">host1</span> <span class="o">=</span> <span class="s">"host1.example.org:41414"</span><span class="o">;</span> +<span class="n">String</span> <span class="n">host2</span> <span class="o">=</span> <span class="s">"host2.example.org:41414"</span><span class="o">;</span> +<span class="n">String</span> <span class="n">host3</span> <span class="o">=</span> <span class="s">"host3.example.org:41414"</span><span class="o">;</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h1"</span><span class="o">,</span> <span class="n">host1</span><span class="o">);</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h2"</span><span class="o">,</span> <span class="n">host2</span><span class="o">);</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"hosts.h3"</span><span class="o">,</span> 
<span class="n">host3</span><span class="o">);</span> + +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"host-selector"</span><span class="o">,</span> <span class="s">"random"</span><span class="o">);</span> <span class="c1">// For random host selection</span> +<span class="c1">// props.put("host-selector", "round_robin"); // For round-robin host</span> +<span class="c1">// // selection</span> +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"backoff"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">);</span> <span class="c1">// Disabled by default.</span> + +<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"maxBackoff"</span><span class="o">,</span> <span class="s">"10000"</span><span class="o">);</span> <span class="c1">// Defaults 0, which effectively</span> + <span class="c1">// becomes 30000 ms</span> + +<span class="c1">// Create the client with load balancing properties</span> +<span class="n">RpcClient</span> <span class="n">client</span> <span class="o">=</span> <span class="n">RpcClientFactory</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">props</span><span class="o">);</span> +</pre></div> +</div> +<p>For more flexibility, the load-balancing Flume client implementation +(<tt class="docutils literal"><span class="pre">LoadBalancingRpcClient</span></tt>) can be configured with these properties:</p> +<div class="highlight-properties"><div class="highlight"><pre><span class="na">client.type</span> <span class="o">=</span> <span class="s">default_loadbalance</span> + +<span class="na">hosts</span> <span class="o">=</span> <span class="s">h1 h2 h3 # At least 2 hosts are required</span> + +<span class="na">hosts.h1</span> <span class="o">=</span> <span 
class="s">host1.example.org:41414</span> + +<span class="na">hosts.h2</span> <span class="o">=</span> <span class="s">host2.example.org:41414</span> + +<span class="na">hosts.h3</span> <span class="o">=</span> <span class="s">host3.example.org:41414</span> + +<span class="na">backoff</span> <span class="o">=</span> <span class="s">false # Specifies whether the client should</span> + <span class="c"># back-off from (i.e. temporarily</span> + <span class="c"># blacklist) a failed host</span> + <span class="c"># (default: false).</span> + +<span class="na">maxBackoff</span> <span class="o">=</span> <span class="s">0 # Max timeout in millis that a host will</span> + <span class="c"># remain inactive due to a previous</span> + <span class="c"># failure with that host (default: 0,</span> + <span class="c"># which effectively becomes 30000)</span> + +<span class="na">host-selector</span> <span class="o">=</span> <span class="s">round_robin # The host selection strategy used</span> + <span class="c"># when load-balancing among hosts</span> + <span class="c"># (default: round_robin).</span> + <span class="c"># Other values include "random"</span> + <span class="c"># or the FQCN of a custom class</span> + <span class="c"># that implements</span> + <span class="c"># LoadBalancingRpcClient$HostSelector</span> + +<span class="na">batch-size</span> <span class="o">=</span> <span class="s">100 # Must be >=1 (default: 100)</span> + +<span class="na">connect-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span> + +<span class="na">request-timeout</span> <span class="o">=</span> <span class="s">20000 # Must be >=1000 (default: 20000)</span> +</pre></div> +</div> +</div> +</div> +<div class="section" id="embedded-agent"> +<h4>Embedded agent<a class="headerlink" href="#embedded-agent" title="Permalink to this headline">¶</a></h4> +<p>Flume has an embedded agent API which allows users to embed an agent in their +application. 
This agent is meant to be lightweight and as such not all +sources, sinks, and channels are allowed. Specifically, the source used +is a special embedded source, and events should be sent to the source +via the put and putAll methods on the EmbeddedAgent object. Only File Channel +and Memory Channel are allowed as channels, while Avro Sink is the only +supported sink. Interceptors are also supported by the embedded agent.</p> +<p>Note: The embedded agent has a dependency on hadoop-core.jar.</p> +<p>Configuration of an Embedded Agent is similar to configuration of a +full Agent. The following is an exhaustive list of configuration options:</p> +<p>Required properties are in <strong>bold</strong>.</p> +<table border="1" class="docutils"> +<colgroup> +<col width="20%" /> +<col width="15%" /> +<col width="65%" /> +</colgroup> +<thead valign="bottom"> +<tr class="row-odd"><th class="head">Property Name</th> +<th class="head">Default</th> +<th class="head">Description</th> +</tr> +</thead> +<tbody valign="top"> +<tr class="row-even"><td>source.type</td> +<td>embedded</td> +<td>The only available source is the embedded source.</td> +</tr> +<tr class="row-odd"><td><strong>channel.type</strong></td> +<td>–</td> +<td>Either <tt class="docutils literal"><span class="pre">memory</span></tt> or <tt class="docutils literal"><span class="pre">file</span></tt> which correspond +to MemoryChannel and FileChannel respectively.</td> +</tr> +<tr class="row-even"><td>channel.*</td> +<td>–</td> +<td>Configuration options for the channel type requested. +See the MemoryChannel or FileChannel user guide for an exhaustive list.</td> +</tr> +<tr class="row-odd"><td><strong>sinks</strong></td> +<td>–</td> +<td>List of sink names</td> +</tr> +<tr class="row-even"><td><strong>sink.type</strong></td> +<td>–</td> +<td>Property name must match a name in the list of sinks. 
+Value must be <tt class="docutils literal"><span class="pre">avro</span></tt></td> +</tr> +<tr class="row-odd"><td>sink.*</td> +<td>–</td> +<td>Configuration options for the sink. +See the AvroSink user guide for an exhaustive list; +note, however, that AvroSink requires at least hostname and port.</td> +</tr> +<tr class="row-even"><td><strong>processor.type</strong></td> +<td>–</td> +<td>Either <tt class="docutils literal"><span class="pre">failover</span></tt> or <tt class="docutils literal"><span class="pre">load_balance</span></tt> which correspond +to FailoverSinkProcessor and LoadBalancingSinkProcessor respectively.</td> +</tr> +<tr class="row-odd"><td>processor.*</td> +<td>–</td> +<td>Configuration options for the sink processor selected. +See the FailoverSinkProcessor and LoadBalancingSinkProcessor +user guide for an exhaustive list.</td> +</tr> +<tr class="row-even"><td>source.interceptors</td> +<td>–</td> +<td>Space-separated list of interceptors</td> +</tr> +<tr class="row-odd"><td>source.interceptors.*</td> +<td>–</td> +<td>Configuration options for individual interceptors +specified in the source.interceptors property</td> +</tr> +</tbody> +</table> +<p>Below is an example of how to use the agent:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">properties</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>();</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"channel.type"</span><span class="o">,</span> <span class="s">"memory"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span class="s">"channel.capacity"</span><span class="o">,</span> <span class="s">"200"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sinks"</span><span class="o">,</span> <span class="s">"sink1 sink2"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.type"</span><span class="o">,</span> <span class="s">"avro"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.type"</span><span class="o">,</span> <span class="s">"avro"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.hostname"</span><span class="o">,</span> <span class="s">"collector1.apache.org"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink1.port"</span><span class="o">,</span> <span class="s">"5564"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.hostname"</span><span class="o">,</span> <span class="s">"collector2.apache.org"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"sink2.port"</span><span class="o">,</span> <span class="s">"5565"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"processor.type"</span><span class="o">,</span> <span class="s">"load_balance"</span><span 
class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors"</span><span class="o">,</span> <span class="s">"i1"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.type"</span><span class="o">,</span> <span class="s">"static"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.key"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">);</span> +<span class="n">properties</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"source.interceptors.i1.value"</span><span class="o">,</span> <span class="s">"value1"</span><span class="o">);</span> + +<span class="n">EmbeddedAgent</span> <span class="n">agent</span> <span class="o">=</span> <span class="k">new</span> <span class="n">EmbeddedAgent</span><span class="o">(</span><span class="s">"myagent"</span><span class="o">);</span> + +<span class="n">agent</span><span class="o">.</span><span class="na">configure</span><span class="o">(</span><span class="n">properties</span><span class="o">);</span> +<span class="n">agent</span><span class="o">.</span><span class="na">start</span><span class="o">();</span> + +<span class="n">List</span><span class="o"><</span><span class="n">Event</span><span class="o">></span> <span class="n">events</span> <span class="o">=</span> <span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">();</span> + +<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span> +<span class="n">events</span><span class="o">.</span><span 
class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span> +<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span> +<span class="n">events</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span> + +<span class="n">agent</span><span class="o">.</span><span class="na">putAll</span><span class="o">(</span><span class="n">events</span><span class="o">);</span> + +<span class="o">...</span> + +<span class="n">agent</span><span class="o">.</span><span class="na">stop</span><span class="o">();</span> +</pre></div> +</div> +</div> +<div class="section" id="transaction-interface"> +<h4>Transaction interface<a class="headerlink" href="#transaction-interface" title="Permalink to this headline">¶</a></h4> +<p>The <tt class="docutils literal"><span class="pre">Transaction</span></tt> interface is the basis of reliability for Flume. All the +major components (i.e. <tt class="docutils literal"><span class="pre">Source</span></tt>s, <tt class="docutils literal"><span class="pre">Sink</span></tt>s and <tt class="docutils literal"><span class="pre">Channel</span></tt>s) must use a +Flume <tt class="docutils literal"><span class="pre">Transaction</span></tt>.</p> +<div class="figure align-center"> +<img alt="Transaction sequence diagram" src="_images/DevGuide_image01.png" /> +</div> +<p>A <tt class="docutils literal"><span class="pre">Transaction</span></tt> is implemented within a <tt class="docutils literal"><span class="pre">Channel</span></tt> implementation. 
Each +<tt class="docutils literal"><span class="pre">Source</span></tt> and <tt class="docutils literal"><span class="pre">Sink</span></tt> that is connected to a <tt class="docutils literal"><span class="pre">Channel</span></tt> must obtain a +<tt class="docutils literal"><span class="pre">Transaction</span></tt> object. The <tt class="docutils literal"><span class="pre">Source</span></tt>s use a <tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt> +to manage the <tt class="docutils literal"><span class="pre">Transaction</span></tt>s, while the <tt class="docutils literal"><span class="pre">Sink</span></tt>s manage them explicitly via +their configured <tt class="docutils literal"><span class="pre">Channel</span></tt>. The operation to stage an +<tt class="docutils literal"><span class="pre">Event</span></tt> (put it into a <tt class="docutils literal"><span class="pre">Channel</span></tt>) or extract an <tt class="docutils literal"><span class="pre">Event</span></tt> (take it out of a +<tt class="docutils literal"><span class="pre">Channel</span></tt>) is done inside an active <tt class="docutils literal"><span class="pre">Transaction</span></tt>. 
For example:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="n">Channel</span> <span class="n">ch</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MemoryChannel</span><span class="o">();</span> +<span class="n">Transaction</span> <span class="n">txn</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span> +<span class="n">txn</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span> +<span class="k">try</span> <span class="o">{</span> + <span class="c1">// This try clause includes whatever Channel operations you want to do</span> + + <span class="n">Event</span> <span class="n">eventToStage</span> <span class="o">=</span> <span class="n">EventBuilder</span><span class="o">.</span><span class="na">withBody</span><span class="o">(</span><span class="s">"Hello Flume!"</span><span class="o">,</span> + <span class="n">Charset</span><span class="o">.</span><span class="na">forName</span><span class="o">(</span><span class="s">"UTF-8"</span><span class="o">));</span> + <span class="n">ch</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">eventToStage</span><span class="o">);</span> + <span class="c1">// Event takenEvent = ch.take();</span> + <span class="c1">// ...</span> + <span class="n">txn</span><span class="o">.</span><span class="na">commit</span><span class="o">();</span> +<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span> + <span class="n">txn</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span> + + <span class="c1">// Log exception, handle individual exceptions as needed</span> + + <span class="c1">// re-throw all Errors</span> + <span class="k">if</span> <span class="o">(</span><span 
class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span> + <span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> <span class="k">finally</span> <span class="o">{</span> + <span class="n">txn</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> +<span class="o">}</span> +</pre></div> +</div> +<p>Here we get hold of a <tt class="docutils literal"><span class="pre">Transaction</span></tt> from a <tt class="docutils literal"><span class="pre">Channel</span></tt>. After <tt class="docutils literal"><span class="pre">begin()</span></tt> +returns, the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is active and the <tt class="docutils literal"><span class="pre">Event</span></tt> is put +into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. If the put is successful, the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is +committed; if it fails, the <tt class="docutils literal"><span class="pre">Transaction</span></tt> is rolled back. In either case the +<tt class="docutils literal"><span class="pre">finally</span></tt> block closes it.</p> +</div> +<div class="section" id="sink"> +<h4>Sink<a class="headerlink" href="#sink" title="Permalink to this headline">¶</a></h4> +<p>The purpose of a <tt class="docutils literal"><span class="pre">Sink</span></tt> is to extract <tt class="docutils literal"><span class="pre">Event</span></tt>s from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and +forward them to the next Flume Agent in the flow or store them in an external +repository. A <tt class="docutils literal"><span class="pre">Sink</span></tt> is associated with exactly one <tt class="docutils literal"><span class="pre">Channel</span></tt>, as +configured in the Flume properties file. 
There’s one <tt class="docutils literal"><span class="pre">SinkRunner</span></tt> instance +associated with every configured <tt class="docutils literal"><span class="pre">Sink</span></tt>, and when the Flume framework calls +<tt class="docutils literal"><span class="pre">SinkRunner.start()</span></tt>, a new thread is created to drive the <tt class="docutils literal"><span class="pre">Sink</span></tt> (using +<tt class="docutils literal"><span class="pre">SinkRunner.PollingRunner</span></tt> as the thread’s <tt class="docutils literal"><span class="pre">Runnable</span></tt>). This thread manages +the <tt class="docutils literal"><span class="pre">Sink</span></tt>’s lifecycle. The <tt class="docutils literal"><span class="pre">Sink</span></tt> needs to implement the <tt class="docutils literal"><span class="pre">start()</span></tt> and +<tt class="docutils literal"><span class="pre">stop()</span></tt> methods that are part of the <tt class="docutils literal"><span class="pre">LifecycleAware</span></tt> interface. The +<tt class="docutils literal"><span class="pre">Sink.start()</span></tt> method should initialize the <tt class="docutils literal"><span class="pre">Sink</span></tt> and bring it to a state +where it can forward the <tt class="docutils literal"><span class="pre">Event</span></tt>s to its next destination. The +<tt class="docutils literal"><span class="pre">Sink.process()</span></tt> method should do the core processing of extracting the +<tt class="docutils literal"><span class="pre">Event</span></tt> from the <tt class="docutils literal"><span class="pre">Channel</span></tt> and forwarding it. The <tt class="docutils literal"><span class="pre">Sink.stop()</span></tt> method +should do the necessary cleanup (e.g. releasing resources). 
The <tt class="docutils literal"><span class="pre">Sink</span></tt> +implementation also needs to implement the <tt class="docutils literal"><span class="pre">Configurable</span></tt> interface for +processing its own configuration settings. For example:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MySink</span> <span class="kd">extends</span> <span class="n">AbstractSink</span> <span class="kd">implements</span> <span class="n">Configurable</span> <span class="o">{</span> + <span class="kd">private</span> <span class="n">String</span> <span class="n">myProp</span><span class="o">;</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span> + <span class="n">String</span> <span class="n">myProp</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"myProp"</span><span class="o">,</span> <span class="s">"defaultValue"</span><span class="o">);</span> + + <span class="c1">// Process the myProp value (e.g. validation)</span> + + <span class="c1">// Store myProp for later retrieval by process() method</span> + <span class="k">this</span><span class="o">.</span><span class="na">myProp</span> <span class="o">=</span> <span class="n">myProp</span><span class="o">;</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">start</span><span class="o">()</span> <span class="o">{</span> + <span class="c1">// Initialize the connection to the external repository (e.g. 
HDFS) that</span> + <span class="c1">// this Sink will forward Events to ..</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">stop</span> <span class="o">()</span> <span class="o">{</span> + <span class="c1">// Disconnect from the external repository and do any</span> + <span class="c1">// additional cleanup (e.g. releasing resources or nulling-out</span> + <span class="c1">// field values) ..</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span> + <span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + + <span class="c1">// Start transaction</span> + <span class="n">Channel</span> <span class="n">ch</span> <span class="o">=</span> <span class="n">getChannel</span><span class="o">();</span> + <span class="n">Transaction</span> <span class="n">txn</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">getTransaction</span><span class="o">();</span> + <span class="n">txn</span><span class="o">.</span><span class="na">begin</span><span class="o">();</span> + <span class="k">try</span> <span class="o">{</span> + <span class="c1">// This try clause includes whatever Channel operations you want to do</span> + + <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">ch</span><span class="o">.</span><span class="na">take</span><span class="o">();</span> + + <span class="c1">// Send the Event to the external repository.</span> + <span class="c1">// storeSomeData(event);</span> + + <span class="n">txn</span><span class="o">.</span><span class="na">commit</span><span 
class="o">();</span> + <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span> + <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span> + <span class="n">txn</span><span class="o">.</span><span class="na">rollback</span><span class="o">();</span> + + <span class="c1">// Log exception, handle individual exceptions as needed</span> + + <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span> + + <span class="c1">// re-throw all Errors</span> + <span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span> + <span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">}</span> <span class="k">finally</span> <span class="o">{</span> + <span class="c1">// Always close the transaction, whether it was committed or rolled back</span> + <span class="n">txn</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> + <span class="o">}</span> + <span class="k">return</span> <span class="n">status</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> +</pre></div> +</div> +</div> +<div class="section" id="source"> +<h4>Source<a class="headerlink" href="#source" title="Permalink to this headline">¶</a></h4> +<p>The purpose of a <tt class="docutils literal"><span class="pre">Source</span></tt> is to receive data from an external client and store +it into the configured <tt class="docutils literal"><span class="pre">Channel</span></tt>s. 
A <tt class="docutils literal"><span class="pre">Source</span></tt> can get an instance of its own +<tt class="docutils literal"><span class="pre">ChannelProcessor</span></tt> to process an <tt class="docutils literal"><span class="pre">Event</span></tt>, committed within a <tt class="docutils literal"><span class="pre">Channel</span></tt> +local transaction, in serial. In the case of an exception, required +<tt class="docutils literal"><span class="pre">Channel</span></tt>s will propagate the exception, all <tt class="docutils literal"><span class="pre">Channel</span></tt>s will roll back their +transactions, but events processed previously on other <tt class="docutils literal"><span class="pre">Channel</span></tt>s will remain +committed.</p> +<p>Similar to the <tt class="docutils literal"><span class="pre">SinkRunner.PollingRunner</span></tt> <tt class="docutils literal"><span class="pre">Runnable</span></tt>, there’s +a <tt class="docutils literal"><span class="pre">PollingRunner</span></tt> <tt class="docutils literal"><span class="pre">Runnable</span></tt> that executes on a thread created when the +Flume framework calls <tt class="docutils literal"><span class="pre">PollableSourceRunner.start()</span></tt>. Each configured +<tt class="docutils literal"><span class="pre">PollableSource</span></tt> is associated with its own thread that runs a +<tt class="docutils literal"><span class="pre">PollingRunner</span></tt>. This thread manages the <tt class="docutils literal"><span class="pre">PollableSource</span></tt>’s lifecycle, +such as starting and stopping. A <tt class="docutils literal"><span class="pre">PollableSource</span></tt> implementation must +implement the <tt class="docutils literal"><span class="pre">start()</span></tt> and <tt class="docutils literal"><span class="pre">stop()</span></tt> methods that are declared in the +<tt class="docutils literal"><span class="pre">LifecycleAware</span></tt> interface. 
The runner of a <tt class="docutils literal"><span class="pre">PollableSource</span></tt> invokes that +<tt class="docutils literal"><span class="pre">Source</span></tt>’s <tt class="docutils literal"><span class="pre">process()</span></tt> method. The <tt class="docutils literal"><span class="pre">process()</span></tt> method should check for +new data and store it into the <tt class="docutils literal"><span class="pre">Channel</span></tt> as Flume <tt class="docutils literal"><span class="pre">Event</span></tt>s.</p> +<p>Note that there are actually two types of <tt class="docutils literal"><span class="pre">Source</span></tt>s. The <tt class="docutils literal"><span class="pre">PollableSource</span></tt> +was already mentioned. The other is the <tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>. The +<tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>, unlike the <tt class="docutils literal"><span class="pre">PollableSource</span></tt>, must have its own callback +mechanism that captures the new data and stores it into the <tt class="docutils literal"><span class="pre">Channel</span></tt>. The +<tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt>s are not each driven by their own thread like the +<tt class="docutils literal"><span class="pre">PollableSource</span></tt>s are. 
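</p> +<p>The callback style of an <tt class="docutils literal"><span class="pre">EventDrivenSource</span></tt> can be sketched as follows. This is a minimal, self-contained illustration and not Flume code: every type in it (<tt class="docutils literal"><span class="pre">SketchListener</span></tt>, <tt class="docutils literal"><span class="pre">SketchExternalClient</span></tt>, <tt class="docutils literal"><span class="pre">SketchChannelProcessor</span></tt>, <tt class="docutils literal"><span class="pre">SketchEventDrivenSource</span></tt>) is a hypothetical stand-in, and events are plain strings. It only shows the control flow: <tt class="docutils literal"><span class="pre">start()</span></tt> registers a callback, the external client invokes it on its own schedule, and <tt class="docutils literal"><span class="pre">stop()</span></tt> unregisters it. In an actual Flume source the callback would presumably hand events to <tt class="docutils literal"><span class="pre">getChannelProcessor().processEvent()</span></tt> instead.</p>

```java
// All types here are hypothetical stand-ins for illustration; none are Flume classes.

// Callback contract between the external client and the source.
interface SketchListener {
  void onData(String line);
}

// Hypothetical external client that pushes data to whoever is registered.
class SketchExternalClient {
  private SketchListener listener;

  void register(SketchListener l) { this.listener = l; }

  void unregister() { this.listener = null; }

  // Simulates the client delivering one record; dropped if nobody listens.
  void deliver(String line) {
    if (listener != null) {
      listener.onData(line);
    }
  }
}

// Plays the role a channel processor plays for a source: it accepts events.
class SketchChannelProcessor {
  private final StringBuilder stored = new StringBuilder();
  private int count = 0;

  void processEvent(String body) {
    stored.append(body).append('\n');
    count++;
  }

  int getCount() { return count; }

  String getStored() { return stored.toString(); }
}

// Event-driven source: there is no polling loop and no process() method.
class SketchEventDrivenSource {
  private final SketchExternalClient client;
  private final SketchChannelProcessor processor;

  SketchEventDrivenSource(SketchExternalClient client, SketchChannelProcessor processor) {
    this.client = client;
    this.processor = processor;
  }

  // Register the callback that stores incoming data.
  void start() {
    client.register(line -> processor.processEvent(line));
  }

  // Remove the callback so no further data is stored.
  void stop() {
    client.unregister();
  }
}

class EventDrivenSourceSketch {
  public static void main(String[] args) {
    SketchExternalClient client = new SketchExternalClient();
    SketchChannelProcessor processor = new SketchChannelProcessor();
    SketchEventDrivenSource source = new SketchEventDrivenSource(client, processor);

    client.deliver("too early");   // dropped: source not started yet
    source.start();
    client.deliver("first");
    client.deliver("second");
    source.stop();
    client.deliver("too late");    // dropped: callback was unregistered

    System.out.println(processor.getCount());
  }
}
```

<p>Running <tt class="docutils literal"><span class="pre">main</span></tt> prints <tt class="docutils literal"><span class="pre">2</span></tt>: only the records delivered between <tt class="docutils literal"><span class="pre">start()</span></tt> and <tt class="docutils literal"><span class="pre">stop()</span></tt> reach the processor, because no callback is registered outside that window.</p> +<p>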
Below is an example of a custom <tt class="docutils literal"><span class="pre">PollableSource</span></tt>:</p> +<div class="highlight-java"><div class="highlight"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MySource</span> <span class="kd">extends</span> <span class="n">AbstractSource</span> <span class="kd">implements</span> <span class="n">Configurable</span><span class="o">,</span> <span class="n">PollableSource</span> <span class="o">{</span> + <span class="kd">private</span> <span class="n">String</span> <span class="n">myProp</span><span class="o">;</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">configure</span><span class="o">(</span><span class="n">Context</span> <span class="n">context</span><span class="o">)</span> <span class="o">{</span> + <span class="n">String</span> <span class="n">myProp</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="s">"myProp"</span><span class="o">,</span> <span class="s">"defaultValue"</span><span class="o">);</span> + + <span class="c1">// Process the myProp value (e.g. 
validation, convert to another type, ...)</span> + + <span class="c1">// Store myProp for later retrieval by process() method</span> + <span class="k">this</span><span class="o">.</span><span class="na">myProp</span> <span class="o">=</span> <span class="n">myProp</span><span class="o">;</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">start</span><span class="o">()</span> <span class="o">{</span> + <span class="c1">// Initialize the connection to the external client</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">stop</span> <span class="o">()</span> <span class="o">{</span> + <span class="c1">// Disconnect from external client and do any additional cleanup</span> + <span class="c1">// (e.g. releasing resources or nulling-out field values) ..</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Status</span> <span class="nf">process</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">EventDeliveryException</span> <span class="o">{</span> + <span class="n">Status</span> <span class="n">status</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> + + <span class="k">try</span> <span class="o">{</span> + <span class="c1">// This try clause includes whatever Channel/Event operations you want to do</span> + + <span class="c1">// Receive new data</span> + <span class="n">Event</span> <span class="n">e</span> <span class="o">=</span> <span class="n">getSomeData</span><span class="o">();</span> + + <span class="c1">// Store the Event into this Source's associated Channel(s)</span> + <span class="n">getChannelProcessor</span><span class="o">().</span><span class="na">processEvent</span><span class="o">(</span><span class="n">e</span><span 
class="o">);</span> + + <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">READY</span><span class="o">;</span> + <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// Log exception, handle individual exceptions as needed</span> + + <span class="n">status</span> <span class="o">=</span> <span class="n">Status</span><span class="o">.</span><span class="na">BACKOFF</span><span class="o">;</span> + + <span class="c1">// re-throw all Errors</span> + <span class="k">if</span> <span class="o">(</span><span class="n">t</span> <span class="k">instanceof</span> <span class="n">Error</span><span class="o">)</span> <span class="o">{</span> + <span class="k">throw</span> <span class="o">(</span><span class="n">Error</span><span class="o">)</span><span class="n">t</span><span class="o">;</span> + <span class="o">}</span> + <span class="o">}</span> + <span class="k">return</span> <span class="n">status</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> +</pre></div> +</div> +</div> +<div class="section" id="channel"> +<h4>Channel<a class="headerlink" href="#channel" title="Permalink to this headline">¶</a></h4> +<p>TBD</p> +</div> +</div> +</div> +</div> + + + </div> + </div> + </div> + <div class="sphinxsidebar"> + <div class="sphinxsidebarwrapper"><h3><a href="index.html">Apache Flume</a></h3> +<ul> +<li class="toctree-l1"><a class="reference internal" href="getinvolved.html">How to Get Involved</a></li> +<li class="toctree-l1"><a class="reference internal" href="download.html">Download</a></li> +<li class="toctree-l1"><a 
class="reference internal" href="documentation.html">Documentation</a></li> +<li class="toctree-l1"><a class="reference internal" href="releases/index.html">Releases</a></li> +<li class="toctree-l1"><a class="reference internal" href="mailinglists.html">Mailing lists</a></li> +<li class="toctree-l1"><a class="reference internal" href="team.html">Team</a></li> +<li class="toctree-l1"><a class="reference internal" href="source.html">Source Repository</a></li> +<li class="toctree-l1"><a class="reference internal" href="license.html">Apache License</a></li> +</ul> + +<h3>Resources</h3> + +<ul class="this-page-menu"> + <li><a href="https://issues.apache.org/jira/browse/FLUME">Flume Issue Tracking (Jira)</a></li> + <li><a href="http://cwiki.apache.org/confluence/display/FLUME">Flume Wiki</a></li> + <li><a href="http://cwiki.apache.org/confluence/display/FLUME/Getting+Started">Getting Started Guide</a></li> + <li><a href="https://builds.apache.org/job/flume-trunk/">Jenkins Continuous Integration Server</a></li> + <li><a href="https://analysis.apache.org/">Sonar Code Quality Reports</a></li> +</ul> + +<h3>Apache</h3> + +<ul class="this-page-menu"> + <li><a href="http://www.apache.org">Home</a></li> + <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li> + <li><a href="http://www.apache.org/licenses">Licenses</a> </li> + <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li> + <li><a href="http://www.apachecon.com">Conferences</a></li> + <li><a href="http://www.apache.org/security/">Security</a></li> +</ul> + + +<h3><a href="index.html">This Page</a></h3> +<ul> +<li><a class="reference internal" href="#">Flume 1.8.0 Developer Guide</a><ul> +<li><a class="reference internal" href="#introduction">Introduction</a><ul> +<li><a class="reference internal" href="#overview">Overview</a></li> +<li><a class="reference internal" href="#architecture">Architecture</a><ul> +<li><a class="reference internal" href="#data-flow-model">Data 
flow model</a></li> +<li><a class="reference internal" href="#reliability">Reliability</a></li> +</ul> +</li> +<li><a class="reference internal" href="#building-flume">Building Flume</a><ul> +<li><a class="reference internal" href="#getting-the-source">Getting the source</a></li> +<li><a class="reference internal" href="#compile-test-flume">Compile/test Flume</a></li> +<li><a class="reference internal" href="#updating-protocol-buffer-version">Updating Protocol Buffer Version</a></li> +</ul> +</li> +<li><a class="reference internal" href="#developing-custom-components">Developing custom components</a><ul> +<li><a class="reference internal" href="#client">Client</a><ul> +<li><a class="reference internal" href="#client-sdk">Client SDK</a></li> +<li><a class="reference internal" href="#rpc-client-interface">RPC client interface</a></li> +<li><a class="reference internal" href="#rpc-clients-avro-and-thrift">RPC clients - Avro and Thrift</a></li> +<li><a class="reference internal" href="#secure-rpc-client-thrift">Secure RPC client - Thrift</a></li> +<li><a class="reference internal" href="#failover-client">Failover Client</a></li> +<li><a class="reference internal" href="#loadbalancing-rpc-client">LoadBalancing RPC client</a></li> +</ul> +</li> +<li><a class="reference internal" href="#embedded-agent">Embedded agent</a></li> +<li><a class="reference internal" href="#transaction-interface">Transaction interface</a></li> +<li><a class="reference internal" href="#sink">Sink</a></li> +<li><a class="reference internal" href="#source">Source</a></li> +<li><a class="reference internal" href="#channel">Channel</a></li> +</ul> +</li> +</ul> +</li> +</ul> +</li> +</ul> + + </div> + </div> + <div class="clearer"></div> + </div> +<div class="footer"> + © Copyright 2009-2017 The Apache Software Foundation. Apache Flume, Flume, Apache, the Apache feather logo, and the Apache Flume project logo are trademarks of The Apache Software Foundation. 
+</div> + </body> +</html> \ No newline at end of file
