Hi Paris,

I like the proposed changes to the iteration API. I think they clean things
up in the Java API without imposing any strict restrictions (this was never
a problem in the Scala API).

The termination algorithm based on the proposed scoped loops seems to be
fairly simple and looks good :)

Cheers,
Gyula

Paris Carbone <par...@kth.se> wrote (on Mon, Nov 14, 2016, 8:50):

> That would be great Shi! Let's take that offline.
>
> Anyone else interested in the iteration changes? It would be nice to
> incorporate these into v1.2 if possible, so I am counting on your reviews
> as soon as possible.
>
> cheers,
> Paris
>
> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg <xiaogang....@alibaba-inc.com> wrote:
>
> Hi Paris
>
> Unfortunately, the project is not public yet.
> But I can provide you with a primitive implementation of the update protocol
> in the paper. It’s implemented in Storm. Since the protocol assumes that the
> communication channels between different tasks are dual, I think it’s not
> easy to adapt it to Flink.
>
> Regards
> Xiaogang
>
>
> On Nov 12, 2016, at 3:03 AM, Paris Carbone <par...@kth.se> wrote:
>
> Hi Shi,
>
> Naiad/Timely Dataflow and other projects use global coordination, which is
> very convenient for asynchronous progress tracking in general, but it has
> some downsides in production systems that count on in-flight
> transactional control mechanisms and rollback recovery guarantees. This is
> why we generally prefer decentralized approaches (despite their own
> downsides).
>
> Regarding synchronous/structured iterations, this is a bit off topic and
> they are a bit of a different story as you already know.
> We maintain a graph streaming (gelly-streams) library on Flink that you
> might find interesting [1]. Vasia, another Flink committer, is also working
> on it, among others.
> You can keep an eye on it since we are planning to use this project as a
> showcase for a new way of doing structured and fixpoint iterations on
> streams in the future.
>
> P.S. Many thanks for sharing your publication; it was an interesting read.
> Do you happen to have your source code public? We could most certainly use
> it in a benchmark soon.
>
> [1] https://github.com/vasia/gelly-streaming
>
>
> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
> Hi, Fouad
>
> Thank you for the explanation. Now the centralized method seems correct to
> me.
> The passing of StatusUpdate events will lead to synchronous iterations, and
> we are using the information in each iteration to terminate the
> computation.
>
> Actually, I prefer the centralized method because in many applications,
> convergence may depend on some global statistics.
> For example, a PageRank program may terminate the computation when 99% of
> the vertices have converged.
> I think learning programs that cannot reach the fixed point (because they
> oscillate around it) can benefit a lot from such features.
> The decentralized method makes it hard to support such convergence
> conditions.
>
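A minimal sketch of the kind of global convergence condition described above,
assuming every vertex reports the absolute change of its rank in the latest
iteration. The function name, the `tolerance` and `min_percent` parameters and
the input layout are hypothetical and not part of any Flink API. The point is
only that the decision needs a statistic over all vertices, which a single
operator cannot derive from its local state.

```python
def enough_vertices_converged(rank_deltas, tolerance=1e-6, min_percent=99):
    """Return True when at least `min_percent` % of the vertices have converged.

    rank_deltas maps a vertex id to the absolute change of its rank in the
    latest iteration (an assumed input format, for illustration only).
    """
    total = len(rank_deltas)
    converged = sum(1 for delta in rank_deltas.values() if delta <= tolerance)
    # Integer arithmetic avoids floating-point surprises at the threshold.
    return converged * 100 >= min_percent * total


# 100 vertices, 99 of them stable and one still oscillating.
deltas = {vertex: 0.0 for vertex in range(99)}
deltas[99] = 0.5
print(enough_vertices_converged(deltas))
```

With 99 of 100 vertices below the tolerance the condition holds, even though
the last vertex never settles.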
>
> Another concern is that Flink cannot produce periodic results in an
> iteration over infinite data streams.
> Take a concrete example. Given an edge stream constructing a graph, the
> user may need the PageRank weight of each vertex in the graphs formed at
> certain instants.
> Currently Flink does not provide any input or iteration information to
> users, which makes it hard for users to implement such real-time iterative
> applications.
> Such features are supported in both Naiad and Tornado. I think Flink should
> support them as well.
>
> What do you think?
>
> Regards
> Xiaogang
>
>
> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsay...@gmail.com>:
>
> Hi Shi,
>
> It seems that you are referring to the centralized algorithm which is no
> longer the proposed version.
> In the decentralized version (check last doc) there is no master node or
> global coordination involved.
>
> Let us keep this discussion to the decentralized one if possible.
>
> To answer your points on the previous approach, there is a catch in your
> trace at t7. Here is what is happening:
> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
> runtime (see 2.1 in the steps).
> - RS and the Heads will broadcast a StatusUpdate event and will not notify
> their status.
> - When the StatusUpdate event gets back to the head, it will notify its
> WORKING status.
>
> Hope that answers your concern.
>
> Best,
> Fouad
>
> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
> Hi Paris
>
> I have several concerns about the correctness of the termination
> protocol.
> I think the termination protocol can put an end to the computation even
> when the computation has not converged.
>
> Suppose there exists a loop context constructed by an OP operator, a Head
> operator and a Tail operator (illustrated in Figure 2 in the first
> draft).
> The stream only contains one record. OP will pass the record to its
> downstream operators 10 times. In other words, the loop should iterate 10
> times.
>
> If I understood the protocol correctly, the following event sequence may
> happen in the computation:
> t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
> system enters the Speculative Phase.
> t2. OP receives Record and emits it to TAIL.
> t3. HEAD receives the UpdateStatus event and notifies with an IDLE state.
> t4. OP receives the UpdateStatus event from HEAD and notifies with a
> WORKING state.
> t5. TAIL receives Record and emits it to HEAD.
> t6. TAIL receives the UpdateStatus event from OP and notifies with a
> WORKING state.
> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
> and notifies with an IDLE state. (Record is still in transit.)
> t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE
> state.
> t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE
> state.
> t10. HEAD receives Record from TAIL and emits it to OP.
> t11. The system puts an end to the computation.
>
> Though the computation is expected to iterate 10 times, it ends earlier.
> The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
> are not synchronized.
>
> I think the protocol follows the idea of the Chandy-Lamport algorithm to
> determine a global state.
> But the information about whether a node has processed any record since the
> last request is not STABLE.
> Hence I doubt the correctness of the protocol.
>
> To determine the termination correctly, we need some information that is
> stable.
> In timely dataflow, Naiad collects the progress made in each iteration and
> terminates the loop when little progress is made in an iteration
> (identified by the timestamp vector).
> The information is stable because the result of an iteration cannot be
> changed by the execution of later iterations.
>
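A rough sketch of why per-iteration progress is a stable signal, assuming the
amount of progress is recorded once per completed iteration and never revised
afterwards. This is not Naiad's or Tornado's actual implementation; the
function name, the `threshold` parameter and the dictionary layout are
hypothetical.

```python
def first_quiet_iteration(progress_by_iteration, threshold):
    """Return the first completed iteration whose progress is <= threshold.

    progress_by_iteration maps an iteration index to the amount of progress
    (for example, the number of updated vertices) made in that iteration.
    Returns None if no completed iteration is quiet enough yet.
    """
    for iteration in sorted(progress_by_iteration):
        if progress_by_iteration[iteration] <= threshold:
            return iteration
    return None


completed = {0: 120, 1: 30, 2: 4}
decision = first_quiet_iteration(completed, threshold=5)

# A later iteration finishing does not change the earlier decision, because
# the entries of already completed iterations are never modified.
completed[3] = 0
assert first_quiet_iteration(completed, threshold=5) == decision
```

This contrasts with a "processed anything since the last request?" flag,
whose value can change between the moment it is reported and the moment the
decision is taken.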
> A similar method is also adopted in Tornado.
> You may see my paper for more details about the termination of loops:
> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
>
> Regards
> Xiaogang
>
> 2016-11-11 3:19 GMT+08:00 Paris Carbone <par...@kth.se>:
>
> Hi again Flink folks,
>
> Here is our new proposal that addresses Job Termination - the loop fault
> tolerance proposal will follow shortly.
> As Stephan hinted, we need operators to be aware of their scope level.
>
> Thus, it is time we make loops great again! :)
>
> Part of this FLIP basically introduces a new functional, compositional API
> for defining asynchronous loops for DataStreams.
> This is coupled with a decentralized algorithm for job termination with
> loops - along the lines of what Stephan described.
> We are already working on the actual prototypes, as you can observe in the
> links of the doc.
>
> Please let us know if you like (or don't like) it and why, in this mail
> discussion.
>
> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
>
> cheers
> Paris and Fouad
>
> On 31 Oct 2016, at 12:53, Paris Carbone <par...@kth.se> wrote:
>
> Hey Stephan,
>
> Thanks for looking into it!
>
> +1 for breaking this up, will do that.
>
> I can see your point, and maybe it makes sense to introduce part of scoping
> to incorporate support for nested loops (otherwise it can’t work).
> Let us think about this a bit. We will share another draft with a more
> detailed description of the approach you are suggesting as soon as possible.
>
>
> On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:
>
> How about we break this up into two FLIPs? There are after all two
> orthogonal problems (termination, fault tolerance) with quite different
> discussion states.
>
> Concerning fault tolerance, I like the ideas.
> For the termination proposal, I would like to iterate a bit more.
>
> *Termination algorithm:*
>
> My main concern here is the introduction of a termination coordinator and
> any involvement of RPC messages when deciding termination.
> That would be such a fundamental break with the current runtime
> architecture, and it would make the currently very elegant and simple model
> much more complicated and harder to maintain. Given that Flink's runtime is
> complex enough, I would really like to avoid that.
>
> The current runtime paradigm coordinates between operators strictly via
> in-band events. RPC calls happen between operators and the master for
> triggering and acknowledging execution and checkpoints.
>
> I was wondering whether we can keep following that paradigm and still get
> most of what you are proposing here. In some sense, all we need to do is
> replace RPC calls with in-band events, and "decentralize" the coordinator
> such that every operator can make its own termination decision by itself.
>
> This is only a rough sketch; you probably need to flesh it out more.
>
> - I assume that the OP in the diagram knows that it is in a loop and that
> it is the one connected to the head and tail
>
> - When OP receives an EndOfStream event from the regular source (RS), it
> emits an "AttemptTermination" event downstream to the operators involved in
> the loop. It attaches an attempt sequence number and memorizes it
> - Tail and Head forward these events
> - When OP receives the event back with the same attempt sequence number,
> and no records came in the meantime, it shuts down and emits EndOfStream
> downstream
> - When other records came back between emitting the AttemptTermination
> event and receiving it back, then it emits a new AttemptTermination event
> with the next sequence number.
> - This should terminate as soon as the loop is empty.
>
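The steps above can be simulated in a few lines. This is only a sketch under
stated assumptions, not Flink code: the whole loop (OP, Tail, Head) is
collapsed into one FIFO feedback channel, the input records are already
queued when EndOfStream arrives, and the names `simulate_loop`, `"record"`
and `"attempt"` are made up for illustration.

```python
from collections import deque


def simulate_loop(num_records, iterations):
    """Simulate OP -> Tail -> Head -> OP with one FIFO feedback channel.

    Every record has to pass through OP `iterations` times.
    Returns (records_processed_by_op, final_attempt_sequence_number).
    """
    # Records and events share one FIFO channel, so an AttemptTermination
    # event can never overtake a record that was sent before it.
    channel = deque(("record", iterations) for _ in range(num_records))

    # EndOfStream arrives from the regular source: OP starts attempt 0.
    attempt = 0
    channel.append(("attempt", attempt))
    records_since_attempt = False
    processed = 0

    while channel:
        kind, value = channel.popleft()
        if kind == "record":
            processed += 1
            records_since_attempt = True
            if value > 1:
                # The record needs more passes: send it around the loop again.
                channel.append(("record", value - 1))
        elif value == attempt and not records_since_attempt:
            # The event came back and the loop was quiet: OP shuts down.
            return processed, attempt
        else:
            # Records arrived in the meantime: retry with the next number.
            attempt += 1
            records_since_attempt = False
            channel.append(("attempt", attempt))
    raise AssertionError("unreachable: an attempt event is always in flight")


print(simulate_loop(1, 10))
```

For a single record that must pass OP 10 times, the call returns `(10, 10)`:
all 10 passes are processed, and termination is decided on attempt number 10,
the first attempt that travels around an empty loop. The guarantee depends on
the event using the same ordered channel as the records.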
> Might this model even generalize to nested loops, where the
> "AttemptTermination" event is scoped by the loop's nesting level?
>
> Let me know what you think!
>
>
> Best,
> Stephan
>
>
> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> I am still scanning it and compiling some comments. Give me a bit ;-)
>
> Stephan
>
>
> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <par...@kth.se> wrote:
>
> Hey all,
>
> Now that many of you have already scanned the document (judging from the
> views) maybe it is time to give back some feedback!
> Did you like it? Would you suggest an improvement?
>
> I would suggest not to leave this in the void. It has to do with
> important properties that the system promises to provide.
> Fouad and I will do our best to answer your questions and discuss this
> further.
>
> cheers
> Paris
>
> On 21 Oct 2016, at 08:54, Paris Carbone <par...@kth.se> wrote:
>
> Hello everyone,
>
> Loops in Apache Flink have good potential to become a much more
> powerful feature in future versions of Apache Flink.
> There is generally high demand to make them usable and, first of all,
> production-ready for upcoming releases.
>
> As a first commitment we would like to propose FLIP-13 for consistent
> processing with Loops.
> We are also working on scoped loops for Q1 2017 which we can share if
> there is enough interest.
>
> For now, this is an improvement proposal that solves two major pending
> issues:
>
> 1) The (not so trivial) problem of correct termination of jobs with
> iterations
> 2) The applicability of the checkpointing algorithm to iterative dataflow
> graphs.
>
> We would really appreciate it if you go through the linked draft
> (motivation and proposed changes) for FLIP-13 and point out comments,
> preferably publicly in this devlist discussion before we go ahead and
> update the wiki.
>
> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
>
> cheers
>
> Paris and Fouad
>
>
