Jon,

I have completed the code to send the Acks back by reusing the connection used 
for Event transportation.

The basic design can be simply described as below (I will prepare the design 
document)

1. Add a hostList array to Event, so the Event will record each host it passes 
by.
2. Start two additional threads for each FlumeNode: AckDistributor and AckReceiver.
3. When an Event arrives at its destination and the sink file is closed, copy the 
hostList to the Ack.
4. Ack will be immediately sent to AckDistributor by collector.
5. AckDistributor will send the acks either to next node or the local 
WALManager, according to the hostList in Ack.
6. AckReceiver will wait for the Ack with the same connection used for Event.
7. The EventSource (ThriftEventSource) will capture the connection used for 
Event, then pass the connection to AckDistributor to reuse it to send Ack back.
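
A minimal sketch of the routing decision in step 5, assuming the hostList holds unique host names in travel order (agent first, collector last) and that "next node" for an Ack means the previous host on the Event's path. RoutedAck, AckRouting, nextTarget and LOCAL_WAL are hypothetical names for illustration, not classes from the patch:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical ack that carries the hosts its event passed, in travel order. */
final class RoutedAck {
    final String ackId;
    final List<String> hostList; // assumed order: [agent, ..., collector]

    RoutedAck(String ackId, List<String> hostList) {
        this.ackId = ackId;
        this.hostList = Collections.unmodifiableList(new ArrayList<>(hostList));
    }
}

/** Hypothetical helper showing the decision an AckDistributor has to make. */
final class AckRouting {
    /** Marker meaning "hand the ack to the local WALManager". */
    static final String LOCAL_WAL = "<local WALManager>";

    /**
     * Returns where the node named self should send the ack: the previous
     * host on the event's path, or LOCAL_WAL when self is the first host
     * (the agent that originated the event).
     */
    static String nextTarget(RoutedAck ack, String self) {
        int i = ack.hostList.indexOf(self);
        if (i < 0) {
            throw new IllegalArgumentException(
                self + " is not on the path of " + ack.ackId);
        }
        return i == 0 ? LOCAL_WAL : ack.hostList.get(i - 1);
    }
}
```

With a hostList of [agent1, relay1, collector1], the collector would route to relay1, relay1 to agent1, and agent1 to its own WALManager.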

It is a bit tricky to capture the thrift connection and reuse it.

I have implemented the above scheme and tested it. It worked well in my tests.

I have tested the following cases:
1. Simple one hop agent --> collector case.
2. Multi-hop
    I traced the Ack at each hop; the acks are passed correctly. At the agent side, 
I found that the logs are correct.
3. Fan-out
    It works well with several E2E fanout sinks at the collector side.

    I am a little surprised to see that the fanout with all E2E sinks 
(collectorSink) works well now. The existing flume does not work with this 
configuration, and there is a bug tracking this problem: "FanOut Collector 
does not support multiple collectorSink attributes", FLUME-265, 
https://issues.apache.org/jira/browse/FLUME-265

    In my Ack enhancement, I found that the duplicate acks received by agent 
will simply be abandoned. Further enhancement is still required. But in a 
sense, my implementation has automatically fixed this bug.

I am preparing the document and patch, and will send them to you asap. From this 
weekend, we will have a long summer vacation.

Cheers,
Kun

-----Original Message-----
From: Wang, Yongkun | Yongkun | DU
Sent: Thursday, July 07, 2011 7:08 PM
To: 'Jonathan Hsieh'
Cc: Flume Development; Marthinussen, Terje; Sugino, Junsei | Jun | DU; 
Primozich, Neil | Neil | DU
Subject: RE: Send acks without getting the master involved

Jon,

I would like to have a brief summary of all the designs:

1. Push acks by collector with a new connection. Agent is server.
    - Pros: can send acks immediately back when they are ready.
    - Cons: need a new connection.

2. Piggy back acks by agent
    - Pros: minimum resource requirement
    - Cons: becomes complicated for the multi-hop case. When the agent stops sending 
events, the acks need to wait for the agent to open a connection and send events 
again so that they can be piggybacked.

3. Push acks by collector with agent's connection. Agent is client. Reuse 
agent's event connection. Combination/Compromise of 1 and 2.
    - Pros: No new connection required. can push acks by collector once they 
are ready.
    - Cons: One concern is that once agent closes the connection, acks have to 
wait until the connection is opened again by agent. But I studied flume source 
code and traced it at runtime, finding that the connection was closed only when 
reconfiguring the source/sink for agent. That is, the connection seems always 
available. Another concern may be that reusing the connection over thrift is 
not easy: we need to store a map of agent<->socket on the server (collector) side 
(add a hash map in TSaneThreadPoolServer).

4. using UDP for (1)
    - Pros: no connection required
    - Cons: if acks are lost, agents will resend the events and duplicates 
will result.

Now I am focusing on (3).
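
As a rough illustration of the agent<->socket map mentioned under (3), here is a standalone sketch that does not touch TSaneThreadPoolServer or any thrift class. AckConnectionRegistry and its methods are hypothetical names, and the type parameter C stands in for whatever socket or transport object the server would actually hold:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical registry kept on the collector side: maps an agent host to
 * the connection that agent opened for events, so acks can be pushed back
 * over the same connection.
 */
final class AckConnectionRegistry<C> {
    private final Map<String, C> byAgent = new ConcurrentHashMap<>();

    /** Called when an agent connects; a reconnect replaces the old entry. */
    void register(String agentHost, C connection) {
        byAgent.put(agentHost, connection);
    }

    /**
     * Called when a connection closes. Removes the entry only if it still
     * points at this connection, so a late close of an old connection does
     * not drop a newer one from the same agent.
     */
    void unregister(String agentHost, C connection) {
        byAgent.remove(agentHost, connection);
    }

    /** Returns the live connection for the agent, or null if none is known. */
    C lookup(String agentHost) {
        return byAgent.get(agentHost);
    }
}
```

When lookup returns null, the acks would have to wait until the agent opens the connection again, which is the first concern listed under (3).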


About design (1),

>This seems to say that if there is more than one agent, maybe all acks for the 
>agents should share one port?

Yes, all agents on one host should share one port for acks. There is a 
standalone ack process (or a thread) per host to listen for the acks for all the 
agents on that host. When acks arrive, the ack process will notify the agents 
to move pending acks to the complete queue and delete the WAL log.

My current implementation is that each agent starts a thread to listen for the acks. 
So it works well with one agent per host.
If there is more than one agent, each agent will start an ack thread, so more 
than one thread will try to bind to the same port, and a port 
conflict occurs because only one server socket can bind to the port at a time. 
I need to enhance it as described above to use one process (thread) to handle 
acks for all agents on the same host.
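
One possible shape for the shared per-host ack handler, sketched without any networking. HostAckDispatcher is a hypothetical name, and the sketch assumes an incoming ack does not say which agent it belongs to, so every registered agent is offered the ack and reports whether it was one of its pending acks:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

/**
 * Hypothetical single ack handler per host. Only this object would own the
 * ack port; the agents on the host register a callback instead of binding
 * their own server socket.
 */
final class HostAckDispatcher {
    // Each handler returns true if the ack id was pending for that agent.
    private final List<Predicate<String>> agents = new CopyOnWriteArrayList<>();

    void register(Predicate<String> agentHandler) {
        agents.add(agentHandler);
    }

    /** Offers the ack to every agent; returns how many agents claimed it. */
    int dispatch(String ackId) {
        int claimed = 0;
        for (Predicate<String> agent : agents) {
            if (agent.test(ackId)) {
                claimed++;
            }
        }
        return claimed;
    }
}
```

A return value of 0 would mean the ack was unknown or a duplicate for every agent on the host.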

>Another note: It sounds like you have working code -- I'd love to see it.

I have the implementation based on 0.9.3.

Design (1) is implemented, and most of the modifications to the source have been 
described in a previous message. It works well now for most of the cases, 
except multiple agents on one host.
I can prepare a patch (maybe the whole flume0.9.3 tar ball) for (1) and submit 
it to you for preview. It would be very helpful for me to have your comments and 
feedback.

About (2), piggyback by agent, my implementation is as follows:

    thrift code:
        oneway void append( 1:ThriftFlumeEvent evt )
    --> set<string> appendWithPiggyBack( 1:ThriftFlumeEvent evt )
    The returned set contains the piggybacked acks, if any are available on 
the collector.
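
A sketch of what the collector side of appendWithPiggyBack might look like, under the assumption that ready acks are simply drained into the return value of the next call. PiggyBackCollector and ackReady are hypothetical names, a String stands in for ThriftFlumeEvent, and the sketch ignores which agent each ack belongs to:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical collector-side handler for the piggyback design. */
final class PiggyBackCollector {
    // Acks whose data has been safely written and can be reported back.
    private final Set<String> readyAcks = new HashSet<>();

    /** Called when an ack becomes ready, e.g. after the sink file closes. */
    synchronized void ackReady(String ackId) {
        readyAcks.add(ackId);
    }

    /**
     * Accepts an event and returns every ack that became ready since the
     * previous call. Event handling itself is elided in this sketch.
     */
    synchronized Set<String> appendWithPiggyBack(String event) {
        Set<String> piggyBacked = new HashSet<>(readyAcks);
        readyAcks.clear();
        return piggyBacked;
    }
}
```

The sketch also shows the weakness listed under (2): acks stay in readyAcks until the agent sends another event.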

Now I am working on design (3).


Regards,
Kun

-----Original Message-----
From: Jonathan Hsieh [mailto:[email protected]]
Sent: Sunday, July 03, 2011 4:05 AM
To: Wang, Yongkun | Yongkun | DU
Cc: Flume Development; Marthinussen, Terje; Sugino, Junsei | Jun | DU; 
Primozich, Neil | Neil | DU
Subject: Re: Send acks without getting the master involved

Kun,

The concerns make sense and the back-and-forth is good.  My goal is to make 
sure that other potential designs and problems are considered as well.   
Technically,  I agree with the points you've brought up, and I buy that having 
the collector send info back to the agent is reasonable.  My main concern is 
not necessarily the communications patterns of the "passiveness" you bring up 
-- it is the extra resource consumption and operational complexity from new 
connections.  There are however some workarounds that could be implemented to 
eventually address these concerns.

Another note: It sounds like you have working code -- I'd love to see it.  This 
would start off as a branch and we can review and commit code there.   The 
prerequisites for merging to trunk would be 1) making it an option (keep 
the old, known-to-basically-work implementation), 2) documenting setup and 
explaining pros and cons.  As long as it's an option we could put it into trunk 
once it's been tested well enough, and we could potentially make it the default!  A 
distilled version of these conversations would eventually end up in the 
documentation.

More comments inline.


Jon.

On Thu, Jun 23, 2011 at 3:39 AM, Wang, Yongkun | Yongkun | DU 
<[email protected]> wrote:



        Jon,

        I am happy to receive your detailed reply. Flume 0.9.4 must have taken 
you a lot of time and I am glad that it was released successfully.

        I like the idea of piggybacking the acks on the same connection. But 
I have the following concerns:
        1. It seems that the event transportation is done in a "one-way" append 
manner. We may need to modify it to be bi-directional. I am not sure whether it is 
possible in one rpc call in thrift to piggyback the acks when sending the 
event data, and how many classes will be affected.



I'm suggesting that another call could be added that isn't "one-way".  The main 
resource consumption concern that may affect scalability is adding more 
potentially long lived connections.   Adding functions doesn't cost much.


        2. The piggyback is a passive way of getting the acks, which means 
that the acks may be left stranded midway in a multi-hop flow. That 
is, if the agent stops sending, then some acks on the next next^n hop cannot be 
piggybacked any more.




I don't buy the passive argument -- see the answer above.  It's the agents that 
need the ack -- so if they go down and don't come back up, I can see there is 
some wasted network traffic.  This seems minor.

However, I can see a major weakness with piggybacking.  A multi-hop 
situation with piggybacking could be significantly more complicated -- you'd 
potentially have to keep a chain with the route, or somehow tell the agent 
which collector the group got processed by.  I prefer simpler over complicated 
so this potentially disqualifies the piggybacking approach.


        My original design is to send acks directly back from the final 
collectors. This needs a new connection. At the agent side, a thread is started 
as a server listening on a configured port for acks. Once the collector closes 
the hdfs files, it will connect to the agent ack server with the host name 
obtained from the Event and the configured port for acks, sending the acks 
directly back to the agent without going through the intermediate nodes.



I think my main contention is the new connection.  I can think of a few 
alternatives -- using a direct UDP message or going to a known place (which is 
the current master approach).   I'm pretty convinced that the UDP message 
approach seems reasonable and really is an  optimization that addresses my 
concern but could be done later.


        I have tested this implementation; it works well for most of the cases, 
including some advanced functions such as the fanout on the collector side.
        In some cases this implementation would be problematic. For example, 
one host with more than one agent (only one ServerSocket can bind to a given 
port). If there are firewalls between the agent and the final collector, the 
connection may not be set up (the ack port may be blocked by the firewall).




This seems to say that if there is more than one agent, maybe all acks for the 
agents should share one port?


        It seems that you guys don't like using agent as a server for a new 
connection for acks, nor do I.
        I would like to hear and discuss more details about the piggy back 
design.


        Regards,
        Kun


        -----Original Message-----
        From: Jonathan Hsieh [mailto:[email protected]]

        Sent: Thursday, June 23, 2011 2:40 AM
        To: Wang, Yongkun | Yongkun | DU
        Cc: Flume Development; Marthinussen, Terje; Sugino, Junsei | Jun | DU; 
Primozich, Neil | Neil | DU
        Subject: Re: Send acks without getting the master involved

        Kun,



        Sorry it took me so long to get back to this -- I had started a 
response but didn't finish it in one sitting.

        I generally like this, but just wanted to open some questions up.  
(essentially the same but trying to avoid creating yet another server and yet 
another connection).

        My first instinct is to piggy back the agent getting acks on the same 
connection to the collector instead of having a client contact the agent.  
Several folks have concerns about the directionality of connections.  (they 
prefer the agents to be clients that initiate requests or connections and don't 
like them to be servers).    Have you considered adding an extra rpc method to the 
server underneath the rpc source that can be periodically called asking about 
particular acks?  What's your opinion of this?

        On Tue, Jun 14, 2011 at 3:25 AM, Wang, Yongkun | Yongkun | DU 
<[email protected]> wrote:


               Jon,

               I found a class, ThriftAckedEventSink, which sends the event 
and then waits for the acks. It is nicely implemented with a sliding window 
(nonblocking, default 4-byte frame size). It should work perfectly to send the 
data then get the acks on the same connection, without getting the master 
involved.
               But I found that this sink (ThriftAckedEventSink) is marked as 
deprecated in the sink factory (SinkFactoryImpl) and not used any more 
(ThriftEventSink is used instead, and acks go through the master).
               I would like to know your reasoning behind the deprecation of 
this class.




        I'm fine with undeprecating it if it has a reason to exist.  Maybe just 
give it a different name if its rpc calls change?

        The ThriftAckedEventSink is good for one hop, but doesn't handle a 
multihop-ack situation and isn't really useful for delayed acks necessary 
unless something is built on top of it (which hopefully we'll do!). In this 
particular case, we need to send the ack after hdfs has closed/flushed the file 
(not just after it is received by the collector or received by hdfs).

        This could be done at the application level.   We could potentially use 
this call's return value to pass back end-to-end ack info (It would however be 
an ack of previous sets).

        At some point I'd like to have this rpc connection send batches of 
messages (instead of individual events) with a one hop group ack.  It would 
keep the same source and sinks api as everything else but the rpc calls 
underneath would be different.


               If the ThriftAckedEventSink cannot be enabled for some reason, I 
considered sending the acks from the collector directly to the agent. The design is 
described as follows:

               - agent:
               a) Disable the action of checking acks during the heartbeat



        ok.


               b) Move the check function from master to agent's ack manager. 
When this function is called by the collector, it moves the corresponding acks from 
the pending acks queue to the complete queue.




        I'm a little concerned about opening another server on an agent.


               - master:
               delete the master ack manager



        I think I prefer disable (or have a parallel implementation so that one 
can fall back to the older mode) until this new mechanism is reasonably robust.


               - collector:
               a) In collector sink, change the ack set to ack map, 
"rollAckMap": a hash map of ack to the event host;



        >Where do we get the host?  I think we should be able to get this from 
the  special ack message's host field.

         From Event, Event.getHost().


               b) When doing the check of ack checksum, appending not only the 
acks, but their hosts to "rollAckMap";



        > I think you mean tracks the acks and the hosts, right?

        Yes, the hosts where the events come from.


               c) When closing the sink file, call the agent's check function, 
sending the acks to corresponding host in "rollAckMap" (with collector's source 
port).




        >So the sink file here is the hdfs file, right?

        Yes,

        >There are the acks that are waiting to be flushed.  Currently when 
flushed, these are sent to a master.  I think you  are suggesting to push these 
acks into a "completed" set instead of the master.  Is this right?

         Yes



               The corresponding modifications to the source code are briefly 
described as follows, FYI.

               -agent:
               a) Disable/Comment the ackcheck.checkAcks() in LivenessManager;



        Prefer disable / allow for alternate implementation that can be 
selected at config time.  If the interface needs to be changed, that is ok.


               b) In WALAckManager, disable/comment the checkAcks() which 
contacts master to check the required acks in pending queue;
                  Add checkAck(ackid), which will move the ackid from pending 
ack queue to the complete queue (done).



        >maybe rename checkAck(ackid) to receiveAck(ackid)?

        ok, this is simply my implementation for testing.
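
A minimal sketch of the pending-to-complete move described for checkAck(ackid), using the receiveAck name suggested above. PendingAckTracker, addPending and isDone are hypothetical names and this is not the WALAckManager code; deleting the WAL data for a completed ack is left out:

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical agent-side bookkeeping for acks pushed by the collector. */
final class PendingAckTracker {
    private final Set<String> pending = new LinkedHashSet<>();
    private final Set<String> done = new LinkedHashSet<>();

    /** Records an ack id the agent is still waiting for. */
    synchronized void addPending(String ackId) {
        pending.add(ackId);
    }

    /**
     * Moves the ack id from the pending set to the done set.
     * Returns false for an unknown or duplicate ack, which is ignored.
     */
    synchronized boolean receiveAck(String ackId) {
        if (!pending.remove(ackId)) {
            return false;
        }
        done.add(ackId);
        return true;
    }

    synchronized boolean isDone(String ackId) {
        return done.contains(ackId);
    }
}
```

Because a second receiveAck for the same id returns false and changes nothing, duplicate acks (for example from a fanout with several E2E sinks) are harmless in this sketch.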


               -master:
               Delete/comment MasterAckManager



        disable (or deprecate for the time being)


               - collector:
               a) In CollectorSink, change rollAckSet to rollAckMap (HashMap), 
to hold not only the acks, but the corresponding host.


        ok

               b) In AckAccumulator, modify rollAckSet.add(group) to 
rollAckMap.put(group, host), change end(group) to end(group, host);
                  In AckChecksumChecker, in append() method, change 
listener.end(k) to listener.end(k, e.getHost());


        ok

               c) In RollDetectDeco, in flushRollAcks(), for each ack id in 
rollAckMap, call the agent's checkAck(ackid) via the Thrift/Avro using the 
corresponding host and collector's source port (new connection for each host).



        Let's discuss this connection.
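
To make steps a) to c) on the collector concrete, here is a sketch of an ack map that is grouped by host when the sink file closes. RollAckBuffer is a hypothetical name, and the method names end and flushRollAcks only mirror the ones mentioned above; this is not the CollectorSink, AckAccumulator or RollDetectDeco code, and the actual send to each agent is left out:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical holder for acks that wait for the current sink file to close. */
final class RollAckBuffer {
    // ack group id -> host of the agent that produced the events
    private final Map<String, String> rollAckMap = new HashMap<>();

    /** Called when the checksum of an ack group has been verified. */
    synchronized void end(String group, String host) {
        rollAckMap.put(group, host);
    }

    /**
     * Called when the sink file closes. Returns the buffered acks grouped
     * by host, so one connection per host is enough, and clears the buffer.
     */
    synchronized Map<String, List<String>> flushRollAcks() {
        Map<String, List<String>> byHost = new TreeMap<>();
        for (Map.Entry<String, String> e : rollAckMap.entrySet()) {
            byHost.computeIfAbsent(e.getValue(), h -> new ArrayList<>())
                  .add(e.getKey());
        }
        rollAckMap.clear();
        return byHost;
    }
}
```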



               Regards,
               Kun




               -----Original Message-----
               From: Jonathan Hsieh [mailto:[email protected]]

               Sent: Tuesday, May 31, 2011 2:10 AM
               To: Wang, Yongkun "DU"
               Cc: Flume Development
               Subject: Re: Send acks without getting the master involved

               Kun,

               Not clear what "pushing the acks by collector" means.  It's 
really important that we understand and basically agree (makes reviews way 
easier) so I'm going to ask you to elaborate on some of the seemingly most 
basic things.

               Who is the server and who is the client?  Would this be going on 
the same connection that exists between an agent and a collector or is this a new 
connection?

               When you say the acks in TCP do you mean the acks in the initial 
handshake or sequence numbered acks during normal transmission for flow control?

               Jon.



               On Mon, May 30, 2011 at 2:53 AM, Wang, Yongkun "DU" 
<[email protected]> wrote:


                      Jon,

                      I feel that pushing the acks by collector looks more 
straightforward, like the acks in TCP/IP.


                      Regards,
                      Kun


                      -----Original Message-----
                      From: Wang, Yongkun "DU"
                      Sent: Friday, May 27, 2011 5:58 PM
                      To: 'Jonathan Hsieh'
                      Cc: Flume Development
                      Subject: RE: Send acks without getting the master involved

                      Jon,

                      Thank you for the correction.

                      Inside the CollectorSink, a hash structure rollAckSet 
holds the acks temporarily in memory. First the AckChecksumChecker 
checks the acks, then a helper class RollDetectDeco will do flushRollAcks() 
when close() is triggered, and the acks are sent to the master by 
CollectorAckListener.

                      I am ok with the design of pulling the acks by agents 
periodically. My concern is that the pulling period may influence 
the number of lost acks when the collector goes down.
                      In contrast, if the collector pushes the acks right after 
the sink closes, the acks can be pushed back immediately. In this case, the 
number of acks lost if the collector goes down is determined by the time before 
the sink closes.

                      Let's focus on the design of pulling the acks by agents. 
The function to send acks to master inside the flushRollAcks() can be commented 
out first, and a method for the agent to pull is needed in the collector.

                      Regards,

                      Kun

                      -----Original Message-----
                      From: Jonathan Hsieh [mailto:[email protected]]

                      Sent: Thursday, May 26, 2011 11:12 PM

                      To: Wang, Yongkun "DU"
                      Cc: Flume Development
                      Subject: Re: Send acks without getting the master involved

                      Kun,

                      Let me add a subtle correction.


                      On Wed, May 25, 2011 at 2:32 AM, Wang, Yongkun "DU" 
<[email protected]> wrote:


                             Jon,

                             Thank you very much for the links. I found them at 
the wiki of flume github.

                             I read the source code these days.

                             The current system generally works as described 
below: (Please correct me if something is wrong)

                             On the agent side, the HeartbeatThread thread 
inside the LivenessManager periodically (heartbeat period) checks the acks 
through the WALAckManager;


                      Yes


                             On the collector side, the CollectorSink uses the 
CollectorAckListener to send the ack groups to master;


                      The collector side has two parts -- an AckChecker which 
first collects valid/checksummed acks and puts them into an in-memory holding 
place.  Then there is a RollStateDeco that pushes ack group info to the master 
when the current file/sink closes cleanly (thus guaranteeing data delivery and 
durability).


                             On the master side, the MasterClientServer holds 
the acks by FlumeMaster and MasterAckManager.



                      Yes


                             My design is to move some functions of 
MasterAckManager to the agent side, then open a method to the collector to 
append the acks.



                      My first stab here is to have the collector just hold 
onto the acks and have the agent side periodically pull back ack events as 
return values when it ships data.  This works fine in the default topology 
(agent->collector) but would have problems in something more complicated.  
Since agent->collector is the main use case I'm ok with a solution that only 
addresses this case as long as we have a reasonable story to evolve to handle 
more complicated cases.


                             It seems that the collector doesn't know the 
address of the agents. An enhancement may be required on the event to carry the 
network information of agents.



                      The events currently contain the source host of the data. 
 You could add the host of the machine that generated the initial acks.  Also, 
per connection, the rpc sources (thrift for sure, avro most likely) should be 
able to get the host or at least the host ip of the machine connecting to it.


                             I created an entry about this improvement in JIRA:
                             https://issues.cloudera.org/browse/FLUME-640



                      Great!  As we agree on different parts of the design, we 
can put information there.


                             Best regards,

                             Kun

                             -----Original Message-----
                             From: Jonathan Hsieh [mailto:[email protected]]

                             Sent: Wednesday, May 25, 2011 3:18 AM
                             To: Wang, Yongkun "DU"
                             Cc: Flume Development
                             Subject: Re: Send acks without getting the master 
involved

                             Here are some basic links (some pieces need 
updates).

                             
https://github.com/cloudera/flume/wiki/Development-documentation
                             
https://github.com/cloudera/flume/wiki/HowToContribute

                             Let's talk about a design at a high level before we 
go down into the guts!

                             Jon.


                             On Tue, May 17, 2011 at 12:25 AM, Wang, Yongkun 
"DU" <[email protected]> wrote:


                                    Jon,

                                    Thank you very much for your reply. This 
cheers me up!

                                    Could you please tell me the procedures I 
should follow to start the development?

                                    I would like to have discussion with flume 
experts.

                                    Thanks again.
                                    Kun


                                    -----Original Message-----
                                    From: Jonathan Hsieh 
[mailto:[email protected]]
                                    Sent: Tuesday, May 17, 2011 1:07 AM
                                    To: Wang, Yongkun "DU"
                                    Cc: Flume Development
                                    Subject: Re: Send acks without getting the 
master involved

                                    [bcc flume-user, +flume-dev]

                                    Kun,

                                    A redesign of the acks has been mentioned 
several times and is on our wishlist.  I think several of us would really love 
to have something happen here.  If you are interested in taking this on, and 
since this is a large change, I think it would be best if we discussed the 
potential design first and got basic agreement on how it would work before 
large code chunks start appearing.

                                    We definitely want to help!

                                    Thanks,
                                    Jon.


                                    On Mon, May 16, 2011 at 3:56 AM, Wang, 
Yongkun "DU" <[email protected]> wrote:


                                           hi,

                                           In the E2E mode, if the master is 
down, then the acks will be lost. It seems that even multiple masters cannot 
handle this single point of failure issue.

                                           I am considering doing some 
development on flume to send the acks back without getting the master involved.
                                           The acks can be pushed back by 
collectors after CollectorSink, or pulled by agents periodically.

                                           A passive way may be to subclass 
some classes such as the CollectorSink and make a plugin to achieve this target;
                                           An aggressive way can be some 
modifications to the flume source to provide it as an enhanced E2E mode.

                                           I am still studying flume. Any 
suggestions or feedback are appreciated.

                                           Best regards,
                                           Kun




                                    --
                                    // Jonathan Hsieh (shay)
                                    // Software Engineer, Cloudera

                                    // [email protected]








