Kenn - I'm arguing that in Spark, SDF-style computation cannot be expressed at all, and neither can Beam's timers.
Spark, unlike Flink, does not have a timer facility (only state), and as far as I can tell its programming model has no other primitive that can map a finite RDD into an infinite DStream - the only way to create a new infinite DStream appears to be to write a Receiver. I cc'd you because I'm wondering whether you've already investigated this when considering whether timers can be implemented on the Spark runner.

On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:

> I don't think I understand what the limitations of timers are that you are referring to. FWIW I would say implementing other primitives like SDF is an explicit non-goal for Beam state & timers.
>
> I got lost at some point in this thread, but is it actually necessary that a bounded PCollection maps to a finite/bounded structure in Spark? Skimming, I'm not sure if the problem is that we can't transliterate Beam to Spark (this might be a good sign) or that we can't express SDF-style computation at all (seems far-fetched, but I could be convinced). Does doing a lightweight analysis and just promoting some things to be some kind of infinite representation help?
>
> Kenn
>
> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> Would like to revive this thread one more time.
>>
>> At this point I'm pretty certain that Spark can't support this out of the box, and we're going to have to make changes to Spark.
>>
>> Holden, could you advise who some Spark experts would be (yourself included :) ) who could suggest what kind of Spark change would both support this AND be useful to the regular (non-Beam) Spark community, so that it has a chance of finding support? E.g., is there any plan in Spark regarding adding timers similar to Flink's or Beam's timers? Maybe we could help out with that.
>>
>> +Kenneth Knowles <k...@google.com> because timers suffer from the same problem.
>>
>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> (resurrecting thread as I'm back from leave)
>>>
>>> I looked at this mode, and indeed, as Reuven points out, it seems that it affects execution details but doesn't offer any new APIs.
>>> Holden - your suggestion of piggybacking an unbounded-per-element SDF on top of an infinite stream would work if 1) there was just one element and 2) the work was guaranteed to be infinite.
>>>
>>> Unfortunately, neither of these assumptions holds. In particular:
>>>
>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be unbounded; and the unbounded work done by the SDF happens for every element. E.g., we might have a Kafka topic on which names of Kafka topics arrive, and we may end up concurrently reading a continuously growing number of topics.
>>> - 2: The work per element is not necessarily infinite, it's just *not guaranteed to be finite* - the SDF is allowed at any moment to say "Okay, this restriction is done for real" by returning stop() from the @ProcessElement method. Continuing the Kafka example, it could do that if the topic/partition being watched is deleted. Having an infinite stream as a driver of this process would require being able to send a signal to the stream to stop itself.
>>>
>>> Is it looking like there's any other way this can be done in Spark as-is, or are we going to have to make changes to Spark to support this?
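
A minimal sketch of the @ProcessElement contract described above, assuming roughly the Beam Java SDF API of this period (OffsetRange, OffsetRangeTracker, ProcessContinuation); pollNext and topicDeleted are hypothetical stand-ins for real Kafka plumbing:

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.joda.time.Duration;

    // Reads one Kafka topic per input element, potentially forever.
    @DoFn.UnboundedPerElement
    class ReadFromKafkaFn extends DoFn<String, String> {

      @GetInitialRestriction
      public OffsetRange getInitialRestriction(String topic) {
        return new OffsetRange(0, Long.MAX_VALUE); // effectively unbounded
      }

      @NewTracker
      public OffsetRangeTracker newTracker(OffsetRange range) {
        return new OffsetRangeTracker(range);
      }

      @ProcessElement
      public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
        String topic = c.element();
        long offset = tracker.currentRestriction().getFrom();
        while (true) {
          if (topicDeleted(topic)) {
            return ProcessContinuation.stop(); // the work turned out to be finite after all
          }
          String record = pollNext(topic, offset);
          if (record == null) {
            // Nothing available right now; ask the runner to call us again later.
            return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(5));
          }
          if (!tracker.tryClaim(offset)) {
            return ProcessContinuation.stop(); // the runner split off the remainder
          }
          c.output(record);
          offset++;
        }
      }

      private String pollNext(String topic, long offset) { return null; }  // hypothetical
      private boolean topicDeleted(String topic) { return false; }         // hypothetical
    }

The two return paths are the crux of the argument: resume() keeps each call finite while the total work stays unbounded, and stop() ends the restriction for real - a Spark translation has to support both.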
>>>
>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>
>>>> I mean the new mode is very much in the Dataset API, not the DStream API (although you can use the Dataset API with the old modes too).
>>>>
>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> But this new mode isn't a semantic change, right? It's moving away from micro-batches into something that looks a lot like what Flink does - continuous processing with asynchronous snapshot boundaries.
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> Hopefully the new "continuous processing mode" in Spark will enable SDF implementation (and real streaming)?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>
>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming. Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (This is of course less efficient than what a timer-capable runner would do, and I have doubts about the fault tolerance.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed number of levels of DStreams. It's not great, but it would work.
>>>>>>>>>>>>
>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner. Imagine the following case: a file contains a list of "master" Kafka topics, on which the names of additional Kafka topics to read are published.
>>>>>>>>>>>>
>>>>>>>>>>>> PCollection<String> masterTopics = p.apply(TextIO.read().from(masterTopicsFile));
>>>>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>>
>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case, the result of reading a text file)
>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, where each of the streams is itself unbounded too)
>>>>>>>>>>>>
>>>>>>>>>>>> Does the multi-level solution you have in mind work for this case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>
>>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>>
>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn, and we're trying to figure out how to make Spark run it.
>>>>>>>>>
>>>>>>>>> Ah ok, sorry - I saw that and for some reason parsed them as old-style DoFns in my head.
>>>>>>>>>
>>>>>>>>> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code, since it's a bit convoluted.
>>>>>>>>>
>>>>>>>>> The more I think about the jumps required to make the "simple" union approach work, the more it seems that just using the state mapping for streaming is probably more reasonable. Although the state tracking in Spark can be somewhat expensive, so it would probably make sense to benchmark to see if it meets our needs.
>>>>>>>>
>>>>>>>> So the problem is, I don't think this can be made to work using mapWithState. It doesn't allow a mapping function that emits infinite output for an input element, directly or not.
>>>>>>>
>>>>>>> So, provided there is an infinite input (e.g. pick a never-ending queue stream), and each call produces a finite output, we would have an infinite number of calls.
>>>>>>>
>>>>>>>> Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.
>>>>>>>
>>>>>>> So we can do an infinite queue stream, which would allow us to be triggered at each interval and handle our own persistence.
>>>>>>>
>>>>>>>>> But these are still both DStream-based rather than Dataset-based, which we might want to support (depends on what direction folks take with the runners).
>>>>>>>>>
>>>>>>>>> If we wanted to do this in the Dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the data source APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed there's a good chance we'd have to rewrite it a few times.
>>>>>>>>>
>>>>>>>>>>> Assuming that we have a given DStream, though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
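
To make the timer point above concrete, a minimal self-rescheduling loop, sketched under the assumption of the Beam Java state/timers API (TimerSpecs, @OnTimer); the actual chunk of work is a placeholder:

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // Timers are per key, so the input must be keyed.
    class SelfReschedulingFn extends DoFn<KV<String, String>, String> {

      @TimerId("loop")
      private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(ProcessContext c, @TimerId("loop") Timer loop) {
        loop.offset(Duration.standardSeconds(10)).setRelative(); // start the loop
      }

      @OnTimer("loop")
      public void onLoop(OnTimerContext c, @TimerId("loop") Timer loop) {
        c.output("one bounded chunk of work"); // placeholder
        // A timer may set another timer, so checkpointed work can continue forever.
        loop.offset(Duration.standardSeconds(10)).setRelative();
      }
    }

And a sketch of the never-ending queue stream idea in Spark's Java streaming API: queueStream with an empty queue plus a default RDD fires every batch interval, and mapWithState persists per-topic progress, yielding infinitely many finite calls; readChunk is a hypothetical bounded poll:

    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    class QueueStreamDriverSketch {

      static JavaMapWithStateDStream<String, String, Long, List<String>> run(
          JavaStreamingContext jssc, List<String> topics) {
        // An "infinite" driver: the queue stays empty, so every batch interval
        // falls back to the default RDD of topic names.
        Queue<JavaRDD<String>> empty = new LinkedList<>();
        JavaDStream<String> ticks =
            jssc.queueStream(empty, false, jssc.sparkContext().parallelize(topics));

        // Each interval, each topic does one bounded chunk and persists its offset.
        Function3<String, Optional<String>, State<Long>, List<String>> oneChunk =
            (topic, ignored, offsetState) -> {
              long from = offsetState.exists() ? offsetState.get() : 0L;
              List<String> records = readChunk(topic, from); // hypothetical bounded poll
              offsetState.update(from + records.size());
              return records;
            };

        return ticks
            .mapToPair(t -> new Tuple2<>(t, t))
            .mapWithState(StateSpec.function(oneChunk));
      }

      static List<String> readChunk(String topic, long from) { // hypothetical
        return Collections.emptyList();
      }
    }

This matches the "infinite number of finite calls" framing; what it still does not give is a single call that emits unbounded output, which is exactly the mapWithState limitation raised above.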
>>>>>>>>>>>>>
>>>>>>>>>>>>> More generally, this does raise an important question if we want to target Datasets instead of RDDs/DStreams, in which case I would need to do some more poking.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so that if a partition is re-executed it doesn't end up as a straggler.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for state.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far, instead of outputting V each time? (Also, the progress so far is probably of a different type R rather than V.)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V); if the computation finishes, V will be populated, otherwise T will be.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd run up to either K seconds or listen for a signal on a port.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Once we're done running, we take the result, filter the ones with T and the ones with V into separate collections, re-run until finished, and then union the results.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated, and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been a while since I thought about this.
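
A rough batch-mode reading of that back-of-the-envelope design, assuming Spark's Java RDD API; runForAtMostKSeconds is a hypothetical function that either finishes an element (the V side of the pair is set) or returns the leftover input (the T side):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    class ResumableMapperSketch {

      // Hypothetical: runs one element for up to K seconds. Exactly one side of
      // the pair is non-null: T (leftover input) if unfinished, V if finished.
      static Tuple2<String, String> runForAtMostKSeconds(String t) {
        return new Tuple2<>(null, t); // placeholder "work" that always finishes
      }

      static JavaRDD<String> runToCompletion(JavaSparkContext jsc, JavaRDD<String> initial) {
        JavaRDD<String> pending = initial;        // T = String here, illustrative
        JavaRDD<String> results = jsc.emptyRDD(); // V = String too, for simplicity
        while (!pending.isEmpty()) {
          // Cache so the timed work isn't redone by the two filters below.
          JavaRDD<Tuple2<String, String>> ran =
              pending.map(ResumableMapperSketch::runForAtMostKSeconds).cache();
          // Finished elements join the results; unfinished ones get re-run.
          results = results.union(ran.filter(p -> p._2() != null).map(Tuple2::_2));
          pending = ran.filter(p -> p._1() != null).map(Tuple2::_1);
        }
        return results;
      }
    }

This is the "filter into separate collections, re-run until finished, then union" loop as stated; its driver-side while loop is what the later queue stream and state mapping discussion tries to replace for streaming.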