[
https://issues.apache.org/jira/browse/SPARK-2201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14081935#comment-14081935
]
sunsc commented on SPARK-2201:
------------------------------
The problem with the original implementation is that the configured host:port
is static and allows only one address. Whenever the host or port changes, the
Flume agent has to be restarted to reload the configuration.
To solve this, one option is to put a virtual address instead of a real
address in the Flume configuration, and to introduce an address router that
can resolve a virtual address to all of its real addresses and raise events
when a real address is added to or removed from the virtual address.
I found the router can be implemented easily with ZooKeeper. In that scenario:
1. When it starts, a Spark receiver selects a free port and creates an
ephemeral node at the path /path/to/logicalhost/host:port in ZooKeeper.
If 3 receivers start, three nodes (host1:port1, host2:port2, host3:port3)
will be created under /path/to/logicalhost (see the registration sketch
after this list);
2. On the Flume agent side, the sink fetches the children
(host1:port1, host2:port2, host3:port3) of /path/to/logicalhost and caches
them in a ClientPool.
When append is called, it selects a client from the ClientPool in a
round-robin manner and calls client.append to send the events (see the
ClientPool sketch after this list).
3. If any receiver crashes or starts, its ephemeral node is removed or added,
and the ClientPool removes or adds the corresponding client, since it
watches those children events.
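For concreteness, here is a minimal sketch of the receiver-side registration
(step 1) using the Curator client directly; the quorum string, class name, and
path are placeholders, not the actual patch:
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.net.InetAddress;
import java.net.ServerSocket;

public class ReceiverRegistration {
    // Placeholder quorum and path; in a real patch these come from configuration.
    private static final String ZK_QUORUM = "zk1:2181,zk2:2181,zk3:2181";
    private static final String LOGICAL_HOST_PATH = "/path/to/logicalhost";

    public static void main(String[] args) throws Exception {
        // Pick a free port by binding to port 0 and reading back the kernel's
        // choice (simplified: a real receiver would bind its Avro server to
        // the port directly to avoid the close/re-bind race).
        int port;
        try (ServerSocket probe = new ServerSocket(0)) {
            port = probe.getLocalPort();
        }
        String hostPort = InetAddress.getLocalHost().getHostAddress() + ":" + port;

        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                ZK_QUORUM, new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // EPHEMERAL: the node vanishes when the receiver's session dies, which
        // is what lets the sink notice crashed receivers (step 3).
        zk.create()
          .creatingParentsIfNeeded()
          .withMode(CreateMode.EPHEMERAL)
          .forPath(LOGICAL_HOST_PATH + "/" + hostPort);

        // ... start the receiver's Avro server on `port` ...
    }
}
{code}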
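And a sketch of the sink-side ClientPool (steps 2 and 3) built on Curator's
PathChildrenCache recipe; it tracks host:port strings rather than real Avro
clients to keep the example short, and the class shape is illustrative:
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class ClientPool {
    private final List<String> addresses = new CopyOnWriteArrayList<>();
    private final AtomicInteger next = new AtomicInteger();
    private final PathChildrenCache cache;

    public ClientPool(CuratorFramework zk, String logicalHostPath) throws Exception {
        cache = new PathChildrenCache(zk, logicalHostPath, false);
        cache.getListenable().addListener((client, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:      // a receiver registered itself
                    addresses.add(lastSegment(event.getData().getPath()));
                    break;
                case CHILD_REMOVED:    // a receiver crashed or stopped
                    addresses.remove(lastSegment(event.getData().getPath()));
                    break;
                default:               // connection events are ignored in this sketch
                    break;
            }
        });
        cache.start();
    }

    // Round-robin selection; the real pool would hand back a cached Avro client.
    public String nextAddress() {
        if (addresses.isEmpty()) {
            throw new IllegalStateException("no receivers registered under logical host");
        }
        return addresses.get(Math.floorMod(next.getAndIncrement(), addresses.size()));
    }

    private static String lastSegment(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
{code}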
In my implementation:
LogicalHostRouter is the implementation of the address router; Spark and Flume
should not need to know that ZooKeeper exists behind it.
ZkProxy is an encapsulation of the ZooKeeper Curator client.
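A sketch of the shape the LogicalHostRouter abstraction might take (the names
here are illustrative, not necessarily those in the patch):
{code:java}
import java.util.List;

public interface LogicalHostRouter {
    // All real host:port addresses currently registered under a virtual address.
    List<String> getRealAddresses(String virtualAddress);

    // Register this process's real address under a virtual address (receiver side).
    void register(String virtualAddress, String realAddress);

    // Subscribe to add/remove notifications for a virtual address (sink side).
    void addListener(String virtualAddress, Listener listener);

    interface Listener {
        void onRealAddressAdded(String realAddress);
        void onRealAddressRemoved(String realAddress);
    }
}
{code}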
> Improve FlumeInputDStream's stability and make it scalable
> ----------------------------------------------------------
>
> Key: SPARK-2201
> URL: https://issues.apache.org/jira/browse/SPARK-2201
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Reporter: sunsc
>
> Currently:
> FlumeUtils.createStream(ssc, "localhost", port);
> This means that only one Flume receiver can work with a FlumeInputDStream, so
> the solution is not scalable.
> I use ZooKeeper to solve this problem.
> Spark Flume receivers register themselves under a ZooKeeper path when they
> start, and a Flume agent gets the physical hosts and pushes events to them.
> Some work needs to be done here:
> 1. Receivers create ephemeral nodes in ZooKeeper; listeners just watch those
> nodes.
> 2. When Spark FlumeReceivers start, they acquire a physical address (the
> local host's IP and an idle port) and register themselves with ZooKeeper.
> 3. A new Flume sink: in its appendEvents method, it gets the physical hosts
> and pushes data to them in a round-robin manner.
--
This message was sent by Atlassian JIRA
(v6.2#6252)