Jon,
I have completed the code to send the Acks back by reusing the connection used
for Event transportation.
The basic design can be briefly described as follows (I will prepare the design
document):
1. Add a hostList array to Event, so the Event records each host it passes
through.
2. Start two more threads for each FlumeNode: AckDistributor and AckReceiver.
3. When the Event arrives at its destination and the sink file is closed, copy
the hostList to the Ack.
4. The Ack is immediately sent to the AckDistributor by the collector.
5. The AckDistributor sends the Acks either to the next node or to the local
WALManager, according to the hostList in the Ack.
6. The AckReceiver waits for Acks on the same connection used for Events.
7. The EventSource (ThriftEventSource) captures the connection used for
Events, then passes it to the AckDistributor, which reuses it to send Acks back.
It is a bit tricky to capture the Thrift connection and reuse it.
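To make step 5 concrete, here is a rough Java sketch of the routing decision
(class and method names are mine, not Flume's): the AckDistributor pops the
last recorded hop from the Ack's hostList and either forwards the Ack there
or, once the list is exhausted, hands it to the local WALManager.

```java
import java.util.Deque;

// Hypothetical sketch of step 5 above; AckRoute and nextDestination are
// illustrative names, not actual Flume classes.
class AckRoute {
    // hostList is ordered from the originating agent to the last hop
    // before the collector, e.g. [agentA, relayB].
    // Returns the host to forward the Ack to, or the local WALManager
    // marker once the route is exhausted.
    static String nextDestination(Deque<String> hostList, String localWalManager) {
        if (hostList.isEmpty()) {
            // The Ack has reached the originating agent: complete it
            // against the local WAL instead of forwarding further.
            return localWalManager;
        }
        // Send the Ack back to the most recently recorded hop.
        return hostList.removeLast();
    }
}
```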
I have implemented the above scheme and tested it; it worked well in my tests.
I have tested the following cases:
1. Simple one-hop agent --> collector case.
2. Multi-hop
I traced the Ack at each hop; they are passed back correctly. At the agent
side, I found that the logs are correct.
3. Fan-out
It works well with several E2E fan-outs at the collector side.
I am a little surprised to see that fan-out with all E2E sinks
(collectorSink) works well now. The existing Flume does not work with this
configuration, and there is a bug tracker entry for this problem: "FanOut
Collector does not support multiple collectorSink attributes", FLUME-265,
https://issues.apache.org/jira/browse/FLUME-265
In my Ack enhancement, I found that duplicate acks received by the agent
are simply discarded. Further enhancement is still required, but in a
sense, my implementation has incidentally fixed this bug.
I am preparing the document and patch, and will send them to you ASAP. Starting
this weekend, we will have a long summer vacation.
Cheers,
Kun
-----Original Message-----
From: Wang, Yongkun | Yongkun | DU
Sent: Thursday, July 07, 2011 7:08 PM
To: 'Jonathan Hsieh'
Cc: Flume Development; Marthinussen, Terje; Sugino, Junsei | Jun | DU;
Primozich, Neil | Neil | DU
Subject: RE: Send acks without getting the master involved
Jon,
I would like to have a brief summary of all the designs:
1. Push acks from the collector over a new connection. The agent is the server.
- Pros: acks can be sent back immediately when they are ready.
- Cons: needs a new connection.
2. Piggy-back acks via the agent.
- Pros: minimum resource requirement.
- Cons: becomes complicated in the multi-hop case. When the agent stops
sending events, the acks must wait until the agent opens a connection and sends
events again before they can be piggy-backed.
3. Push acks from the collector over the agent's connection. The agent is the
client, and its event connection is reused. A combination/compromise of 1 and 2.
- Pros: no new connection required; the collector can push acks as soon as
they are ready.
- Cons: one concern is that once the agent closes the connection, acks have
to wait until the agent opens it again. But I studied the Flume source code
and traced it at runtime, and found that the connection is closed only when
the agent's source/sink is reconfigured; that is, the connection seems always
available. Another concern is that reusing the connection over Thrift is not
easy: we need to store a map of agent<->socket on the server (collector) side
(add a hash map in TSaneThreadPoolServer).
4. Use UDP for (1).
- Pros: no connection required.
- Cons: if ack loss happens, agents will resend the events, and duplicates
will occur.
Now I am focusing on (3).
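As a rough illustration of the agent<->socket map mentioned under (3) -- a
sketch only, with names of my own choosing; the real change would live inside
TSaneThreadPoolServer and hold Thrift transports rather than a generic type:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the agent<->socket map from option (3); the
// transport type is left generic since the real one would be a Thrift
// connection object.
class AckConnectionRegistry<T> {
    private final Map<String, T> byAgent = new ConcurrentHashMap<>();

    // Called by the event source when an agent connects.
    void register(String agentHost, T transport) {
        byAgent.put(agentHost, transport);
    }

    // Called when pushing an ack back; null means the agent currently
    // has no live connection and the ack must wait for a reconnect.
    T lookup(String agentHost) {
        return byAgent.get(agentHost);
    }

    // Called when the agent's connection closes (e.g. on reconfiguration).
    void unregister(String agentHost) {
        byAgent.remove(agentHost);
    }
}
```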
About design (1),
>This seems to say that if there is more than one agent, maybe all acks for the
>agents should share one port?
Yes, all agents on one host should share one port for acks. There is a
standalone ack process (or thread) per host that listens for the acks of all
the agents on that host. When acks arrive, the ack process notifies the agents
to move the pending acks to the complete queue and delete the WAL log.
My current implementation is that each agent starts a thread to listen for
acks, so it works well with one agent per host.
If there is more than one agent, each agent starts an ack thread, so more than
one thread tries to bind to the same port, and a port conflict occurs because
only one server socket can bind to a port at a time.
This needs to be enhanced as described above, using one process (thread) to
handle the acks for all agents on the same host.
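A minimal sketch of that per-host listener idea (all names here are
hypothetical): one thread owns the ack port and routes each incoming ack id to
whichever local agent registered it, so there is no bind conflict, and an
unknown or duplicate ack id is simply dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the shared per-host ack listener: agents
// register the ack ids they are waiting for, and the single listener
// thread looks up which agent to notify for each incoming ack.
class HostAckDispatcher {
    // ack id -> local agent waiting for it
    private final Map<String, String> agentByAck = new ConcurrentHashMap<>();

    // Called by each local agent when it ships a WAL group.
    void expect(String agent, String ackId) {
        agentByAck.put(ackId, agent);
    }

    // Called by the listener thread for each incoming ack. Returns the
    // agent to notify, or null for an unknown or duplicate ack, which
    // is simply dropped.
    String dispatch(String ackId) {
        return agentByAck.remove(ackId);
    }
}
```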
>Another note: It sounds like you have working code -- I'd love to see it.
I have the implementation based on 0.9.3.
Design (1) is implemented, and most of the modifications to the source have
been described in a previous message. It works well now for most of the cases,
except multiple agents on one host.
I can prepare a patch (maybe the whole flume-0.9.3 tarball) for (1) and submit
it to you for preview. Your comments and feedback would be very helpful to me.
About (2), piggy-back by agent, my implementation goes as follows:
thrift code:
oneway void append( 1:ThriftFlumeEvent evt )
--> set<string> appendWithPiggyBack( 1:ThriftFlumeEvent evt )
The returned set contains the acks piggied back, if any are available on the
collector.
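To illustrate how an agent might consume the returned set, here is a sketch
with a simulated collector; only the appendWithPiggyBack() name comes from the
Thrift change above, everything else is hypothetical stand-in code rather
than a real Thrift client.

```java
import java.util.HashSet;
import java.util.Set;

// Simulated collector for illustration: real code would make the
// appendWithPiggyBack RPC over Thrift. Only the method name comes from
// the IDL change above; everything else is hypothetical.
class PiggyBackCollector {
    private final Set<String> readyAcks = new HashSet<>();

    // The collector marks an ack ready once the sink file is closed.
    void ackReady(String ackId) {
        readyAcks.add(ackId);
    }

    // Stand-in for appendWithPiggyBack(evt): deliver the event and
    // drain whatever acks are ready into the RPC return value.
    Set<String> appendWithPiggyBack(String evt) {
        Set<String> piggy = new HashSet<>(readyAcks);
        readyAcks.clear();
        return piggy;
    }
}
```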
Now I am working on design (3).
Regards,
Kun
-----Original Message-----
From: Jonathan Hsieh [mailto:[email protected]]
Sent: Sunday, July 03, 2011 4:05 AM
To: Wang, Yongkun | Yongkun | DU
Cc: Flume Development; Marthinussen, Terje; Sugino, Junsei | Jun | DU;
Primozich, Neil | Neil | DU
Subject: Re: Send acks without getting the master involved
Kun,
The concerns make sense and the back-and-forth is good. My goal is to make
sure that other potential designs and problems are considered as well.
Technically, I agree with the points you've brought up, and I buy that having
the collector send info back to the agent is reasonable. My main concern is
not so much the communication patterns of the "passiveness" you bring up
-- it is the extra resource consumption and operational complexity from new
connections. There are, however, some workarounds that could be implemented to
eventually address these concerns.
Another note: It sounds like you have working code -- I'd love to see it. This
would start off as a branch, and we can review and commit code there. The
prerequisites for merging to trunk would be 1) making it an option (keeping
the old, known-to-basically-work implementation), and 2) documenting setup and
explaining pros and cons. As long as it's an option we could put it into trunk;
once it's been tested well enough, we could potentially make it the default! A
distilled version of these conversations would eventually end up in the
documentation.
More comments inline.
Jon.
On Thu, Jun 23, 2011 at 3:39 AM, Wang, Yongkun | Yongkun | DU
<[email protected]> wrote:
Jon,
I am happy to receive your detailed reply. The Flume 0.9.4 release must have
taken a lot of your time, and I am glad that it went out successfully.
I like the idea of piggy-backing the acks on the same connection. But
I have the following concerns:
1. It seems that event transportation is a "one-way" append.
We may need to make it bi-directional. I am not sure whether it is
possible, in one Thrift RPC call, to piggy-back the acks when sending the
event data, or how many classes would be affected.
I'm suggesting that another call could be added that isn't "one-way". The main
resource consumption concern that may affect scalability is adding more
potentially long-lived connections. Adding functions doesn't cost much.
2. Piggy-backing is a passive way of getting the acks, which means that
the acks may be left along the way in a multi-hop flow. That is, if the
agent stops sending, some acks on the next^n hop can no longer be
piggy-backed.
I don't buy the passive argument -- see the answer above. It's the agents that
need the ack -- so if they go down and don't come back up, I can see there is
some wasted network traffic. This seems minor.
However, I can see a major weakness with the piggy-backing. A multi-hop
situation with piggy-backing could be significantly more complicated -- you'd
potentially have to keep a chain with the route, or somehow tell the agent
which collector processed the group. I prefer simple over complicated,
so this potentially disqualifies the piggy-backing approach.
My original design is to have the final collectors send acks directly
back. This needs a new connection. At the agent side, a thread is started as a
server listening on a configured port for acks. Once the collector closes
the HDFS files, it connects to the agent's ack server, using the host name
obtained from the Event and the configured ack port, and sends the acks
directly back to the agent without going through the intermediate nodes.
I think my main contention is the new connection. I can think of a few
alternatives -- using a direct UDP message or going to a known place (which is
the current master approach). I'm pretty convinced that the UDP message
approach seems reasonable and really is an optimization that addresses my
concern but could be done later.
I have tested this implementation; it works well for most of the cases,
including some advanced functions such as fan-out on the collector side.
In some cases this implementation would be problematic. For example,
one host with more than one agent (a ServerSocket cannot bind to a port that
is already bound). If there are firewalls between the agent and the final
collector, the connection may not be set up (the ack port may be blocked by
the firewall).
This seems to say that if there is more than one agent, maybe all acks for the
agents should share one port?
It seems that you guys don't like using the agent as a server with a new
connection for acks, and neither do I.
I would like to hear and discuss more details about the piggy-back
design.
Regards,
Kun
-----Original Message-----
From: Jonathan Hsieh [mailto:[email protected]]
Sent: Thursday, June 23, 2011 2:40 AM
To: Wang, Yongkun | Yongkun | DU
Cc: Flume Development; Marthinussen, Terje; Sugino, Junsei | Jun | DU;
Primozich, Neil | Neil | DU
Subject: Re: Send acks without getting the master involved
Kun,
Sorry it took me so long to get back to this -- I had started a
response but didn't finish it in one sitting.
I generally like this, but just wanted to open up some questions
(essentially the same, but trying to avoid creating yet another server and yet
another connection).
My first instinct is to piggy-back the agent's ack retrieval on the same
connection to the collector instead of having a client contact the agent.
Several folks have concerns about the directionality of connections (they
prefer the agents to be clients that initiate requests or connections, and
don't like them to be servers). Have you considered adding an extra RPC method
to the server underneath the RPC source that can be called periodically to ask
about particular acks? What's your opinion of this?
On Tue, Jun 14, 2011 at 3:25 AM, Wang, Yongkun | Yongkun | DU
<[email protected]> wrote:
Jon,
I found a class, ThriftAckedEventSink, which sends the event and
then waits for the acks. It is nicely implemented with a sliding window
(non-blocking, default 4-byte frame size). It should work perfectly for
sending the data and then getting the acks on the same connection, without
getting the master involved.
But I find that this sink (ThriftAckedEventSink) is marked as
deprecated in the sink factory (SinkFactoryImpl) and is no longer used
(ThriftEventSink is used instead; acks go through the master).
I would like to know your thinking behind the deprecation of
this class.
I'm fine with un-deprecating it if it has a reason to exist. Maybe just
give it a different name if its RPC calls change?
The ThriftAckedEventSink is good for one hop, but doesn't handle a
multi-hop ack situation and isn't really useful for the delayed acks we need
unless something is built on top of it (which hopefully we'll do!). In this
particular case, we need to send the ack after HDFS has closed/flushed the file
(not just after it is received by the collector or by HDFS).
This could be done at the application level. We could potentially use
this call's return value to pass back end-to-end ack info (it would, however,
be an ack of previous sets).
At some point I'd like to have this RPC connection send batches of
messages (instead of individual events) with a one-hop group ack. It would
keep the same source and sink APIs as everything else, but the RPC calls
underneath would be different.
If the ThriftAckedEventSink cannot be enabled for some reason, I have
considered having the collector send the acks directly to the agent. The
design is described as follows:
- agent:
a) Disable the action of checking acks during the heartbeat.
ok.
b) Move the check function from the master to the agent's ack manager.
When this function is called by the collector, it moves the corresponding acks
from the pending ack queue to the complete queue.
I'm a little concerned about opening another server on an agent.
- master:
delete the master ack manager
I think I prefer disabling it (or having a parallel implementation so that one
can fall back to the older mode) until this new mechanism is reasonably robust.
- collector:
a) In the collector sink, change the ack set to an ack map,
"rollAckMap": a hash map from ack to the event's host;
>Where do we get the host? I think we should be able to get this from
the special ack message's host field.
From the Event, via Event.getHost().
b) When checking the ack checksum, append not only the
acks but also their hosts to "rollAckMap";
> I think you mean track the acks and the hosts, right?
Yes, the hosts where the events come from.
c) When closing the sink file, call the agent's check function,
sending the acks to the corresponding hosts in "rollAckMap" (with the
collector's source port).
>So the sink file here is the hdfs file, right?
Yes.
>There are the acks that are waiting to be flushed. Currently when
flushed, these are sent to a master. I think you are suggesting to push these
acks into a "completed" set instead of the master. Is this right?
Yes
The corresponding modifications to the source code are briefly
described as follows, FYI.
-agent:
a) Disable/Comment the ackcheck.checkAcks() in LivenessManager;
Prefer disable / allow for alternate implementation that can be
selected at config time. If the interface needs to be changed, that is ok.
b) In WALAckManager, disable/comment the checkAcks() that
contacts the master to check the required acks in the pending queue;
add checkAck(ackid), which moves the ackid from the pending
ack queue to the complete queue (done).
>maybe rename checkAck(ackid) to receiveAck(ackid)?
OK; this was simply my implementation for testing.
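A toy sketch of that pending-to-complete move (receiveAck is the renamed
checkAck; the field types here are illustrative, not WALAckManager's actual
queues):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the agent-side change in WALAckManager:
// receiveAck(ackid) moves an ack id from the pending queue to the
// complete queue, after which the corresponding WAL log can be deleted.
class WalAckState {
    private final Set<String> pending = new HashSet<>();
    private final Set<String> complete = new HashSet<>();

    // Called when a WAL group is shipped and starts waiting for an ack.
    void track(String ackId) {
        pending.add(ackId);
    }

    // Called when an ack arrives from the collector. Returns false for
    // an unknown or duplicate ack id.
    boolean receiveAck(String ackId) {
        if (pending.remove(ackId)) {
            complete.add(ackId);
            return true;
        }
        return false;
    }

    boolean isComplete(String ackId) {
        return complete.contains(ackId);
    }
}
```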
-master:
Delete/comment MasterAckManager
disable (or deprecate for the time being)
- collector:
a) In CollectorSink, change rollAckSet to rollAckMap (HashMap),
to hold not only the acks, but the corresponding host.
ok
b) In AckAccumulator, change rollAckSet.add(group) to
rollAckMap.put(group, host), and change end(group) to end(group, host);
in AckChecksumChecker, in the append() method, change
listener.end(k) to listener.end(k, e.getHost());
ok
c) In RollDetectDeco, in flushRollAcks(), for each ack id in
rollAckMap, call the agent's checkAck(ackid) via Thrift/Avro, using the
corresponding host and the collector's source port (a new connection for each
host).
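The rollAckSet -> rollAckMap change in a), b), and c) could be sketched
roughly like this (the method names follow the description above, but the
surrounding Flume classes are omitted and the overall shape is mine):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the collector-side rollAckMap: acks are
// recorded with the host they came from, and on sink close they are
// grouped by host so the collector makes one checkAck call per agent.
class RollAckMap {
    private final Map<String, String> hostByAck = new HashMap<>();

    // Replaces rollAckSet.add(group): end(group, host).
    void end(String group, String host) {
        hostByAck.put(group, host);
    }

    // Called from flushRollAcks() when the sink file closes: returns
    // the ack ids grouped by the host they must be sent back to.
    Map<String, List<String>> flushRollAcks() {
        Map<String, List<String>> byHost = new HashMap<>();
        for (Map.Entry<String, String> e : hostByAck.entrySet()) {
            byHost.computeIfAbsent(e.getValue(), h -> new ArrayList<>())
                  .add(e.getKey());
        }
        hostByAck.clear();
        return byHost;
    }
}
```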
Let's discuss this connection.
Regards,
Kun
-----Original Message-----
From: Jonathan Hsieh [mailto:[email protected]]
Sent: Tuesday, May 31, 2011 2:10 AM
To: Wang, Yongkun "DU"
Cc: Flume Development
Subject: Re: Send acks without getting the master involved
Kun,
It's not clear what "pushing the acks by collector" means. It's
really important that we understand and basically agree (it makes reviews way
easier), so I'm going to ask you to elaborate on some of the seemingly most
basic things.
Who is the server and who is the client? Would this be going over
the same connection that exists between an agent and a collector, or is this a
new connection?
When you say the acks in TCP, do you mean the acks in the initial
handshake, or the sequence-numbered acks during normal transmission for flow
control?
Jon.
On Mon, May 30, 2011 at 2:53 AM, Wang, Yongkun "DU"
<[email protected]> wrote:
Jon,
I feel that having the collector push the acks looks more
straightforward, like the acks in TCP/IP.
Regards,
Kun
-----Original Message-----
From: Wang, Yongkun "DU"
Sent: Friday, May 27, 2011 5:58 PM
To: 'Jonathan Hsieh'
Cc: Flume Development
Subject: RE: Send acks without getting the master involved
Jon,
Thank you for the correction.
Inside the CollectorSink, a hash structure rollAckSet
holds the acks temporarily in memory. First the AckChecksumChecker
checks the acks, then a helper class RollDetectDeco does flushRollAcks()
when close() is triggered, and the acks are sent to the master by
CollectorAckListener.
I am OK with the design of pulling the acks by agents
periodically. My concern is that the pulling period may influence the
number of lost acks when the collector goes down.
In contrast, if the collector pushes the acks right after
the sink closes, the acks can be pushed back immediately. In this case, if
the collector goes down, the number of lost acks is determined by the time
before the sink closes.
Let's focus on the design of pulling the acks by agents.
The function that sends acks to the master inside flushRollAcks() can be
commented out first, and a method for the agent to pull is needed in the
collector.
Regards,
Kun
-----Original Message-----
From: Jonathan Hsieh [mailto:[email protected]]
Sent: Thursday, May 26, 2011 11:12 PM
To: Wang, Yongkun "DU"
Cc: Flume Development
Subject: Re: Send acks without getting the master involved
Kun,
Let me add a subtle correction.
On Wed, May 25, 2011 at 2:32 AM, Wang, Yongkun "DU"
<[email protected]> wrote:
Jon,
Thank you very much for the links. I found them on
the wiki of the Flume GitHub repository.
I have been reading the source code these days.
The current system generally works as described
below (please correct me if something is wrong):
On the agent side, the HeartbeatThread inside
the LivenessManager periodically (every heartbeat period) checks the acks
through the WALAckManager;
Yes
On the collector side, the CollectorSink uses the
CollectorAckListener to send the ack groups to master;
The collector side has two parts -- an AckChecker, which
first collects valid/checksummed acks and puts them into an in-memory holding
place, and a RollStateDeco, which pushes the ack group info to the master
when the current file/sink closes cleanly (thus guaranteeing data delivery and
durability).
On the master side, the MasterClientServer holds
the acks by FlumeMaster and MasterAckManager.
Yes
My design is to move some functions of the
MasterAckManager to the agent side, then expose a method for the collector to
append the acks.
My first stab here is to have the collector just hold
onto the acks and have the agent side periodically pull back ack events as
return values when it ships data. This works fine in the default topology
(agent->collector) but would have problems with anything more complicated.
Since agent->collector is the main use case, I'm OK with a solution that only
addresses this case, as long as we have a reasonable story for evolving to
handle more complicated cases.
It seems that the collector doesn't know the
addresses of the agents. An enhancement may be required so that the event
carries the network information of the agents.
The events currently contain the source host of the data.
You could add the host of the machine that generated the initial acks. Also,
per connection, the RPC sources (Thrift for sure, Avro most likely) should be
able to get the host, or at least the host IP, of the machine connecting to it.
I created an entry about this improvement in JIRA:
https://issues.cloudera.org/browse/FLUME-640
Great! As we agree on different parts of the design, we
can put information there.
Best regards,
Kun
-----Original Message-----
From: Jonathan Hsieh [mailto:[email protected]]
Sent: Wednesday, May 25, 2011 3:18 AM
To: Wang, Yongkun "DU"
Cc: Flume Development
Subject: Re: Send acks without getting the master
involved
Here are some basic links (some pieces need
updates):
https://github.com/cloudera/flume/wiki/Development-documentation
https://github.com/cloudera/flume/wiki/HowToContribute
Let's talk about the design at a high level before we
go down into the guts!
Jon.
On Tue, May 17, 2011 at 12:25 AM, Wang, Yongkun
"DU" <[email protected]> wrote:
Jon,
Thank you very much for your reply. This
cheers me up!
Could you please tell me the procedure I
should follow to start the development?
I would like to have discussions with Flume
experts.
Thanks again.
Kun
-----Original Message-----
From: Jonathan Hsieh
[mailto:[email protected]]
Sent: Tuesday, May 17, 2011 1:07 AM
To: Wang, Yongkun "DU"
Cc: Flume Development
Subject: Re: Send acks without getting the
master involved
[bcc flume-user, +flume-dev]
Kun,
A redesign of the acks has been mentioned
several times and is on our wishlist. I think several of us would really love
to have something happen here. If you are interested in taking this on, and
since this is a large change, I think it would be best if we discussed the
potential design first and got basic agreement on how it would work before
large code chunks start appearing.
We definitely want to help!
Thanks,
Jon.
On Mon, May 16, 2011 at 3:56 AM, Wang,
Yongkun "DU" <[email protected]> wrote:
hi,
In the E2E mode, if the master is
down, the acks will be lost. It seems that even multiple masters cannot
handle this single-point-of-failure issue.
I am considering doing some
development on Flume to send the acks back without getting the master involved.
The acks could be pushed back by
collectors after the CollectorSink, or pulled by agents periodically.
A passive way may be to subclass
some classes, such as the CollectorSink, and make a plugin to achieve this;
an aggressive way would be some
modifications to the Flume source to provide this as an enhanced E2E mode.
I am still studying Flume. Any
suggestions or feedback are appreciated.
Best regards,
Kun
--
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// [email protected]