Updated Branches: refs/heads/flume-1.4 7157802eb -> 3daeab6fe
FLUME-1620: Update flume user guide for LoadBalancingSinkProcessor with the backoff changes.

(Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3daeab6f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3daeab6f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3daeab6f

Branch: refs/heads/flume-1.4
Commit: 3daeab6feeabd1eea85a1771eb5861e8b06d91a9
Parents: 7157802
Author: Brock Noland <[email protected]>
Authored: Thu Oct 18 11:58:52 2012 -0500
Committer: Brock Noland <[email protected]>
Committed: Thu Oct 18 11:59:12 2012 -0500

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeDeveloperGuide.rst | 54 +++++++++++++++++++---
 flume-ng-doc/sphinx/FlumeUserGuide.rst      | 35 ++++++++------
 2 files changed, 66 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/3daeab6f/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index 8b73a06..25ded18 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -197,7 +197,7 @@ Failover handler
 ''''''''''''''''
 
 This class wraps the Avro RPC client to provide failover handling capability to
-clients. This takes a list of host/ports of the Flume agent. If there’s an
+clients. This takes a whitespace separated list of host/ports of the Flume agents. If there’s an
 error in communicating the current agent, then it automatically falls back to
 the next agent in the list:
 
@@ -212,13 +212,53 @@ the next agent in the list:
 
   // address/port pair for each host
   props.put("hosts.host1", host1 + ":" + port1);
-  props.put("hosts.host1", host2 + ":" + port2);
-  props.put("hosts.host1", host3 + ":" + port3);
+  props.put("hosts.host2", host2 + ":" + port2);
+  props.put("hosts.host3", host3 + ":" + port3);
 
   // create the client with failover properties
-  client = (FailoverRpcClient);
-  RpcClientFactory.getInstance(props);
+  client = RpcClientFactory.getInstance(props);
+
+LoadBalancing Rpc Client
+''''''''''''''''''''''''
+
+Flume SDK also supports an RpcClient which load balances between multiple
+hosts. This takes a whitespace separated list of host:port of Flume agents. This
+client can be configured to either load balance or randomly select among the
+configured agents. You can also specify a class that implements the
+``LoadBalancingRpcClient$HostSelector`` interface in the properties object to
+generate the selection order.
+
+If ``backoff`` is enabled, the client will blacklist
+hosts that fail, removing them for selection for a given timeout. When the
+timeout ends, if the host is still unresponsive timeout is increased
+exponentially to avoid potentially getting stuck in long waits on unresponsive
+hosts.
+The maximum backoff time can be configured by setting ``maxBackoff`` - in milliseconds.
+There is currently no default maximum back off time, so the backoff will increase
+exponentially unless this property is set.
+.. code-block:: java
+
+  // Setup properties for the load balancing
+  Properties props = new Properties();
+  props.put("client.type", "DEFAULT_LOADBALANCE");
+
+  // list of hosts
+  props.put("hosts", "host1 host2 host3");
+
+  // address/port pair for each host
+  props.put("hosts.host1", host1 + ":" + port1);
+  props.put("hosts.host2", host2 + ":" + port2);
+  props.put("hosts.host3", host3 + ":" + port3);
+
+  props.put("host-selector","random"); //for random order
+  //props.put("host-selector","round_robin"); //for round robin order
+  props.put("backoff", "true"); //disabled by default.
+
+  props.put("maxBackoff", "10000"); //default = No Maximum.
+
+  // create the client with load balancing properties
+  client = RpcClientFactory.getInstance(props);
 
 Transaction interface
 ~~~~~~~~~~~~~~~~~~~~~
@@ -280,7 +320,7 @@ configuration settings:
   public class FooSink extends AbstractSink implements Configurable {
     @Override
     public void configure(Context context) {
-      some_Param = context.get("some_param", String.class);
+      some_Param = context.getString("some_param", "default_value");
       // process some_param …
     }
     @Override
@@ -336,7 +376,7 @@ data:
   public class BarSource extends AbstractSource implements Configurable, PollableSource {
     @Override
     public void configure(Context context) {
-      some_Param = context.get("some_param", String.class);
+      some_Param = context.getString("some_param", "default_value");
       // process some_param …
     }
     @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/3daeab6f/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8160ca4..86bbd1a 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -894,11 +894,11 @@ Example for agent named **agent_foo**:
   agent_foo.channels = memoryChannel-1
   agent_foo.sources.legacysource-1.type = your.namespace.YourClass
   agent_foo.sources.legacysource-1.channels = memoryChannel-1
-
+
 Scribe Source
 ~~~~~~~~~~~~~
 
-Scribe is another type of ingest system. To adopt existing Scribe ingest system,
+Scribe is another type of ingest system. To adopt existing Scribe ingest system,
 Flume should use ScribeSource based on Thrift with compatible transfering protocol.
 The deployment of Scribe please following guide from Facebook.
 Required properties are in **bold**.
@@ -1602,11 +1602,10 @@ Load balancing Sink Processor
 Load balancing sink processor provides the ability to load-balance flow over
 multiple sinks. It maintains an indexed list of active sinks on which the
 load must be distributed. Implementation supports distributing load using
-either via ``ROUND_ROBIN``, ``RANDOM``, ``ROUND_ROBIN_BACKOFF``, or
-``RANDOM_BACKOFF`` selection mechanisms. The choice of selection mechanism
-defaults to ``ROUND_ROBIN`` type, but can be overridden
-via configuration. Custom selection mechanisms are supported via custom
-classes that inherits from ``LoadBalancingSelector``.
+either via ``ROUND_ROBIN`` or ``RANDOM`` selection mechanisms.
+The choice of selection mechanism defaults to ``ROUND_ROBIN`` type,
+but can be overridden via configuration. Custom selection mechanisms are
+supported via custom classes that inherits from ``AbstractSinkSelector``.
 
 When invoked, this selector picks the next sink using its configured selection
 mechanism and invokes it. For ROUND_ROBIN and RANDOM In case the selected sink
@@ -1614,7 +1613,9 @@ fails to deliver the event, the processor picks the next available sink via
 its configured selection mechanism. This implementation does not blacklist
 the failing sink and instead continues to optimistically attempt every
 available sink. If all sinks invocations result in failure, the selector
-propagates the failure to the sink runner. The BACKOFF variants will blacklist
+propagates the failure to the sink runner.
+
+If ``backoff`` is enabled, the sink processor will blacklist
 sinks that fail, removing them for selection for a given timeout. When the
 timeout ends, if the sink is still unresponsive timeout is increased
 exponentially to avoid potentially getting stuck in long waits on unresponsive
@@ -1624,16 +1625,16 @@ sinks.
 
 Required properties are in **bold**.
 
-====================================  ===============  ===============================================================
+====================================  ===============  ==========================================================================
 Property Name                         Default          Description
-====================================  ===============  ===============================================================
-**processor.sinks**                   --               Space separated list of sinks that are participating in the group
+====================================  ===============  ==========================================================================
+**processor.sinks**                   --               Space separated list of sinks that are participating in the group
 **processor.type**                    ``default``      The component type name, needs to be ``load_balance``
+processor.backoff                     true             Should failed sinks be backed off exponentially.
 processor.selector                    ``ROUND_ROBIN``  Selection mechanism. Must be either ``ROUND_ROBIN``, ``RANDOM``
-                                                       ``ROUND_ROBIN_BACKOFF``, ``RANDOM_BACKOFF`` or custom FQDN to
-                                                       class that inherits from ``LoadBalancingSelector``
+                                                       or custom FQDN to class that inherits from ``AbstractSinkSelector``
 processor.selector.maxBackoffMillis   30000            used by backoff selectors to limit exponential backoff in miliseconds
-====================================  ===============  ===============================================================
+====================================  ===============  ==========================================================================
 
 Example for agent named **agent_foo**:
 
@@ -1642,8 +1643,10 @@ Example for agent named **agent_foo**:
   agent_foo.sinkgroups = group1
   agent_foo.sinkgroups.group1.sinks = sink1 sink2
   agent_foo.sinkgroups.group1.processor.type = load_balance
+  agent_foo.sinkgroups.group1.processor.backoff = true
   agent_foo.sinkgroups.group1.processor.selector = random
 
+
 Custom Sink Processor
 ~~~~~~~~~~~~~~~~~~~~~
 
@@ -1679,8 +1682,8 @@ are named components, here is an example of how they are created through configu
 Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves
 configurable and can be passed configuration values just like they are passed to any other configurable component.
 In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor
-are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN)
-or the alias ``TIMESTAMP``. If you have multiple collectors writing to the same HDFS path then you could also use
+are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN)
+or the alias ``TIMESTAMP``. If you have multiple collectors writing to the same HDFS path then you could also use
 the HostInterceptor.
 
 Timestamp Interceptor
