Hi Paris, I like the proposed changes to the iteration API; this cleans things up in the Java API without adding any strict restriction, I think (it was never a problem in the Scala API).
The termination algorithm based on the proposed scoped loops seems to be fairly simple and looks good :)

Cheers,
Gyula

Paris Carbone <par...@kth.se> wrote (on Mon, 14 Nov 2016, 8:50):

> That would be great Shi! Let's take that offline.
>
> Anyone else interested in the iteration changes? It would be nice to
> incorporate these into v1.2 if possible, so I count on your review asap.
>
> cheers,
> Paris
>
> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang....@alibaba-inc.com> wrote:
>
> Hi Paris
>
> Unfortunately, the project is not public yet.
> But I can provide you with a primitive implementation of the update protocol in
> the paper. It's implemented in Storm. Since the protocol assumes the
> communication channels between different tasks are dual, I think it's not
> easy to adapt it to Flink.
>
> Regards
> Xiaogang
>
> On 12 Nov 2016, at 3:03 AM, Paris Carbone <par...@kth.se> wrote:
>
> Hi Shi,
>
> Naiad/Timely Dataflow and other projects use global coordination, which is
> very convenient for asynchronous progress tracking in general, but it has
> some downsides in production systems that count on in-flight
> transactional control mechanisms and rollback recovery guarantees. This is
> why we generally prefer decentralized approaches (despite their own
> downsides).
>
> Regarding synchronous/structured iterations, this is a bit off topic, and
> they are a bit of a different story, as you already know.
> We maintain a graph streaming library (gelly-streams) on Flink that you
> might find interesting [1]. Vasia, another Flink committer, is also working
> on that, among others.
> You can keep an eye on it, since we are planning to use this project as a
> showcase for a new way of doing structured and fixpoint iterations on
> streams in the future.
>
> P.S. Many thanks for sharing your publication, it was an interesting read.
> Do you happen to have your source code public?
We could most certainly use it in a benchmark soon.
>
> [1] https://github.com/vasia/gelly-streaming
>
> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
> Hi, Fouad
>
> Thank you for the explanation. Now the centralized method seems correct to me.
> The passing of StatusUpdate events will lead to synchronous iterations, and
> we are using the information in each iteration to terminate the computation.
>
> Actually, I prefer the centralized method because in many applications the
> convergence may depend on some global statistics.
> For example, a PageRank program may terminate the computation when 99% of the
> vertices have converged.
> I think those learning programs which cannot reach the fixed point
> (oscillating around the fixed point) can benefit a lot from such features.
> The decentralized method makes it hard to support such convergence conditions.
>
> Another concern is that Flink cannot produce periodical results in an
> iteration over infinite data streams.
> Take a concrete example. Given an edge stream constructing a graph, the
> user may need the PageRank weight of each vertex in the graphs formed at
> certain instants.
> Currently Flink does not provide any input or iteration information to
> users, making it hard for them to implement such real-time iterative
> applications.
> Such features are supported in both Naiad and Tornado. I think Flink should
> support them as well.
>
> What do you think?
>
> Regards
> Xiaogang
>
> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsay...@gmail.com>:
>
> Hi Shi,
>
> It seems that you are referring to the centralized algorithm, which is no
> longer the proposed version.
> In the decentralized version (check the last doc) there is no master node or
> global coordination involved.
>
> Let us keep this discussion to the decentralized one if possible.
>
> To answer your points on the previous approach, there is a catch in your
> trace at t7. Here is what is happening:
> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
> runtime (see 2.1 in the steps).
> - RS and the Heads will broadcast the StatusUpdate event and will not notify
> their status.
> - When the StatusUpdate event gets back to the head, it will notify its
> WORKING status.
>
> Hope that answers your concern.
>
> Best,
> Fouad
>
> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
> Hi Paris
>
> I have several concerns about the correctness of the termination protocol.
> I think the termination protocol may put an end to the computation even when
> the computation has not converged.
>
> Suppose there exists a loop context constructed by an OP operator, a Head
> operator and a Tail operator (illustrated in Figure 2 in the first draft).
> The stream only contains one record. OP will pass the record to its
> downstream operators 10 times. In other words, the loop should iterate 10
> times.
>
> If I understood the protocol correctly, the following event sequence may
> happen in the computation:
> t1: RS emits Record to OP. Since RS has reached the "end-of-stream", the
> system enters the Speculative Phase.
> t2: OP receives Record and emits it to TAIL.
> t3: HEAD receives the UpdateStatus event, and notifies with an IDLE state.
> t4: OP receives the UpdateStatus event from HEAD, and notifies with a
> WORKING state.
> t5: TAIL receives Record and emits it to HEAD.
> t6: TAIL receives the UpdateStatus event from OP, and notifies with a
> WORKING state.
> t7: The system starts a new attempt. HEAD receives the UpdateStatus event
> and notifies with an IDLE state. (Record is still in transit.)
> t8: OP receives the UpdateStatus event from HEAD and notifies with an IDLE
> state.
> t9:
TAIL receives the UpdateStatus event from OP and notifies with an IDLE
> state.
> t10: HEAD receives Record from TAIL and emits it to OP.
> t11: The system puts an end to the computation.
>
> Though the computation is expected to iterate 10 times, it ends earlier.
> The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
> are not synchronized.
>
> I think the protocol follows the idea of the Chandy-Lamport algorithm to
> determine a global state.
> But the information of whether a node has processed any record since the
> last request is not STABLE.
> Hence I doubt the correctness of the protocol.
>
> To determine termination correctly, we need some information that is stable.
> In timely dataflow, Naiad collects the progress made in each iteration and
> terminates the loop when no further progress is made in an iteration
> (identified by the timestamp vector).
> The information is stable because the result of an iteration cannot be
> changed by the execution of later iterations.
>
> A similar method is also adopted in Tornado.
> You may see my paper for more details about the termination of loops:
> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
>
> Regards
> Xiaogang
>
> 2016-11-11 3:19 GMT+08:00 Paris Carbone <par...@kth.se>:
>
> Hi again Flink folks,
>
> Here is our new proposal that addresses Job Termination - the loop fault
> tolerance proposal will follow shortly.
> As Stephan hinted, we need operators to be aware of their scope level.
>
> Thus, it is time we make loops great again! :)
>
> Part of this FLIP basically introduces a new functional, compositional API
> for defining asynchronous loops for DataStreams.
> This is coupled with a decentralized algorithm for job termination with
> loops - along the lines of what Stephan described.
> We are already working on the actual prototypes, as you can observe in the
> links of the doc.
>
> Please let us know if you like (or don't like) it, and why, in this mail
> discussion.
>
> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
>
> cheers
> Paris and Fouad
>
> On 31 Oct 2016, at 12:53, Paris Carbone <par...@kth.se> wrote:
>
> Hey Stephan,
>
> Thanks for looking into it!
>
> +1 for breaking this up, will do that.
>
> I can see your point, and maybe it makes sense to introduce part of scoping
> to incorporate support for nested loops (otherwise it can't work).
> Let us think about this a bit. We will share another draft with a more
> detailed description of the approach you are suggesting asap.
>
> On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:
>
> How about we break this up into two FLIPs? There are, after all, two
> orthogonal problems (termination, fault tolerance) with quite different
> discussion states.
>
> Concerning fault tolerance, I like the ideas.
> For the termination proposal, I would like to iterate a bit more.
>
> *Termination algorithm:*
>
> My main concern here is the introduction of a termination coordinator and
> any involvement of RPC messages when deciding termination.
> That would be such a fundamental break with the current runtime
> architecture, and it would make the currently very elegant and simple model
> much more complicated and harder to maintain. Given that Flink's runtime is
> complex enough, I would really like to avoid that.
>
> The current runtime paradigm coordinates between operators strictly via
> in-band events. RPC calls happen between operators and the master for
> triggering and acknowledging execution and checkpoints.
>
> I was wondering whether we can keep following that paradigm and still get
> most of what you are proposing here. In some sense, all we need to do is
> replace RPC calls with in-band events, and "decentralize" the coordinator
> such that every operator can make its own termination decision by itself.
>
> This is only a rough sketch, you probably need to flesh it out more.
>
> - I assume that the OP in the diagram knows that it is in a loop and that
> it is the one connected to the head and tail.
> - When OP receives an EndOfStream event from the regular source (RS), it
> emits an "AttemptTermination" event downstream to the operators involved in
> the loop. It attaches an attempt sequence number and memorizes it.
> - Tail and Head forward these events.
> - When OP receives the event back with the same attempt sequence number,
> and no records came in the meantime, it shuts down and emits EndOfStream
> downstream.
> - When other records came back between emitting the AttemptTermination
> event and receiving it back, it emits a new AttemptTermination event with
> the next sequence number.
> - This should terminate as soon as the loop is empty.
>
> Might this model even generalize to nested loops, where the
> "AttemptTermination" event is scoped by the loop's nesting level?
>
> Let me know what you think!
>
> Best,
> Stephan
>
> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> I am still scanning it and compiling some comments.
Give me a bit ;-)
>
> Stephan
>
> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <par...@kth.se> wrote:
>
> Hey all,
>
> Now that many of you have already scanned the document (judging from the
> views), maybe it is time to give back some feedback!
> Did you like it? Would you suggest an improvement?
>
> I would suggest not leaving this in the void. It has to do with important
> properties that the system promises to provide.
> Fouad and I will do our best to answer your questions and discuss this
> further.
>
> cheers
> Paris
>
> On 21 Oct 2016, at 08:54, Paris Carbone <par...@kth.se> wrote:
>
> Hello everyone,
>
> Loops in Apache Flink have good potential to become a much more powerful
> feature in future versions of Apache Flink.
> There is generally high demand to make them usable and, first of all,
> production-ready for upcoming releases.
>
> As a first commitment we would like to propose FLIP-13 for consistent
> processing with loops.
> We are also working on scoped loops for Q1 2017, which we can share if
> there is enough interest.
>
> For now, this is an improvement proposal that solves two pending major
> issues:
>
> 1) The (not so trivial) problem of correct termination of jobs with
> iterations
> 2) The applicability of the checkpointing algorithm to iterative dataflow
> graphs.
>
> We would really appreciate it if you go through the linked draft
> (motivation and proposed changes) for FLIP-13 and point out comments,
> preferably publicly in this dev-list discussion, before we go ahead and
> update the wiki.
>
> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>
> cheers
>
> Paris and Fouad
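[Editor's note] To make the decentralized "AttemptTermination" idea Stephan sketches above concrete, here is a minimal single-threaded simulation. This is our own simplification under stated assumptions, not Flink code: the whole loop (OP, Head, Tail and the channels between them) is collapsed into a single FIFO queue, and all class and method names (`TerminationSketch`, `simulate`, `Item`) are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy simulation of the attempt-based termination protocol sketched above. */
public class TerminationSketch {

    /** One in-flight item: either a record that will re-circulate `value`
     *  more times, or an AttemptTermination marker carrying sequence `value`. */
    static final class Item {
        final boolean marker;
        final int value;
        Item(boolean marker, int value) { this.marker = marker; this.value = value; }
    }

    /**
     * Simulates OP plus the loop channel as one FIFO. A single record enters
     * from RS and re-circulates `recordHops` more times; on end-of-stream OP
     * emits an AttemptTermination marker. Returns the total number of attempts
     * emitted before termination succeeds.
     */
    static int simulate(int recordHops) {
        Deque<Item> channel = new ArrayDeque<>();
        channel.add(new Item(false, recordHops)); // the single record from RS
        int attempt = 0;
        boolean recordSeen = false;
        channel.add(new Item(true, attempt));     // EndOfStream: first attempt
        while (true) {
            Item it = channel.poll();
            if (it.marker) {
                // Marker came back: terminate only if no record was seen
                // since this attempt was emitted, and the sequence matches.
                if (it.value == attempt && !recordSeen) {
                    return attempt + 1;
                }
                // Records were in flight: retry with the next sequence number.
                attempt++;
                recordSeen = false;
                channel.add(new Item(true, attempt));
            } else {
                recordSeen = true;
                if (it.value > 0) {
                    channel.add(new Item(false, it.value - 1)); // re-circulate
                }
            }
        }
    }

    public static void main(String[] args) {
        // A record that loops 10 more times: attempts keep retrying while it
        // is in flight, and termination is declared only once the loop drains.
        System.out.println(simulate(10));
    }
}
```

Note how this addresses the race in Xiaogang's t1-t11 trace: because the marker travels in-band through the same FIFO as the record, it can never overtake an in-flight record, so the "loop empty" decision is never made while data is still circulating.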
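[Editor's note] Xiaogang's "stable information" argument can likewise be sketched with a toy progress tracker in the spirit of Naiad-style progress tracking. This is a hand-rolled simplification with invented names (`ProgressTracker`, `recordProduced`, `recordConsumed`), not Naiad's or Tornado's actual implementation: outstanding records are counted per iteration number, and termination is declared only when no iteration has pending work.

```java
import java.util.TreeMap;

/** Toy per-iteration progress counter, loosely modeled on progress vectors. */
public class ProgressTracker {
    // Maps iteration number -> count of records still pending at that iteration.
    private final TreeMap<Integer, Integer> pending = new TreeMap<>();

    /** A record was emitted into the given iteration. */
    public void recordProduced(int iteration) {
        pending.merge(iteration, 1, Integer::sum);
    }

    /** A record at the given iteration was fully processed. */
    public void recordConsumed(int iteration) {
        pending.merge(iteration, -1, Integer::sum);
        if (pending.get(iteration) == 0) {
            pending.remove(iteration);
        }
    }

    /**
     * Assuming consuming a record at iteration i can only produce records at
     * iterations >= i, an empty map is stable: no later event can reintroduce
     * pending work, so termination can be declared safely.
     */
    public boolean canTerminate() {
        return pending.isEmpty();
    }

    /** The frontier: the lowest iteration that still has work, or -1 if none. */
    public int lowestActiveIteration() {
        return pending.isEmpty() ? -1 : pending.firstKey();
    }
}
```

The contrast with the status-update protocol is the point of the sketch: a "was this node idle since the last request?" flag can flip back, whereas the per-iteration counts only drain forward, which is the stability property the thread argues a correct termination decision needs.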