Hopefully the new "continuous processing mode" in Spark will enable SDF implementation (and real streaming)?

Thanks,
Thomas

On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have
>>>>>>>> no way at all to produce an infinite DStream from a finite RDD?
>>>>>>>> Maybe we can somehow dynamically create a new DStream for every
>>>>>>>> initial restriction, said DStream being obtained using a Receiver
>>>>>>>> that under the hood actually runs the SDF? (This is of course less
>>>>>>>> efficient than what a timer-capable runner would do, and I have
>>>>>>>> doubts about the fault tolerance.)
>>>>>>>
>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>> of levels of DStreams. It's not great but it would work.
>>>>>>
>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>> "master" Kafka topics, on which the names of additional Kafka topics
>>>>>> to read are published.
>>>>>>
>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>
>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>> infinite output for every input:
>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>> of reading a text file)
>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>> number of streams being read concurrently, each of the streams
>>>>>> itself unbounded too)
>>>>>>
>>>>>> Does the multi-level solution you have in mind work for this case?
>>>>>> I suppose the second case is harder, so we can focus on that.
>>>>>
>>>>> So none of those are a SplittableDoFn, right?
>>>>
>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>> splittable DoFn, and we're trying to figure out how to make Spark
>>>> run it.
>>>
>>> Ah ok, sorry, I saw that and for some reason parsed them as old-style
>>> DoFns in my head.
>>>
>>> To effectively allow us to union back into the "same" DStream we'd have
>>> to end up using Spark's queue streams (or their equivalent custom
>>> source, because of some queue stream limitations), which invites some
>>> reliability challenges. This might be at the point where I should send
>>> a diagram/some sample code, since it's a bit convoluted.
>>>
>>> The more I think about the jumps required to make the "simple" union
>>> approach work, the more it seems that just using the state mapping for
>>> streaming is probably more reasonable. The state tracking in Spark can
>>> be somewhat expensive, though, so it would probably make sense to
>>> benchmark to see if it meets our needs.
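
A minimal Scala sketch of what that state-mapping approach could look like
with Spark's mapWithState. Restriction, Record, runSdfSlice, and the ticks
stream are all illustrative assumptions, not existing Beam or Spark runner
code:

  import org.apache.spark.streaming.{State, StateSpec}
  import org.apache.spark.streaming.dstream.DStream

  object SdfStateSketch {
    // Illustrative placeholders: the remaining work for one element, and
    // the records one bounded slice of SDF work produces.
    case class Restriction(topic: String, offset: Long)
    case class Record(value: String)

    // Hypothetical helper: run the SDF for one bounded slice of work,
    // returning the records produced plus the residual restriction
    // (None once the restriction is exhausted).
    def runSdfSlice(r: Restriction): (Seq[Record], Option[Restriction]) = ???

    val mappingFunc =
      (topic: String, tick: Option[Unit], state: State[Restriction]) => {
        val current = state.getOption().getOrElse(Restriction(topic, 0L))
        val (records, residual) = runSdfSlice(current)
        residual match {
          case Some(r) => state.update(r) // more work left for the next batch
          case None    => state.remove()  // this element's restriction is done
        }
        records
      }

    // ticks carries one (topic, ()) entry per live restriction per
    // microbatch, so each pending restriction gets another slice of work
    // every interval.
    def outputs(ticks: DStream[(String, Unit)]): DStream[Seq[Record]] =
      ticks.mapWithState(StateSpec.function(mappingFunc))
  }

Note that each invocation can only return a finite batch of records, and
mapWithState only calls the function for keys that receive input in a given
microbatch, so something still has to feed every live restriction a tick
each interval.
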
>> So the problem is, I don't think this can be made to work using
>> mapWithState. It doesn't allow a mapping function that emits infinite
>> output for an input element, directly or not.
>
> So, provided there is an infinite input (e.g. pick a never-ending queue
> stream), and each call produces a finite output, we would have an
> infinite number of calls.
>
>> Dataflow and Flink, for example, had timer support even before SDFs,
>> and a timer can set another timer and thus end up doing an infinite
>> amount of work in a fault-tolerant way - so SDF could be implemented on
>> top of that. But AFAIK Spark doesn't have a similar feature, hence my
>> concern.
>
> So we can do an infinite queue stream, which would allow us to be
> triggered at each interval and handle our own persistence.
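
A sketch of that never-ending queue stream: with an empty queue and a
default RDD, queueStream yields one "tick" RDD every batch interval
indefinitely (the app name and local-mode config here are just
placeholders):

  import scala.collection.mutable
  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object QueueTickSketch {
    val conf = new SparkConf().setAppName("sdf-ticks").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // The queue stays empty, so queueStream falls back to defaultRDD on
    // every batch interval: an infinite stream of single-element ticks.
    val tick: RDD[Unit] = ssc.sparkContext.parallelize(Seq(()))
    val ticks = ssc.queueStream(mutable.Queue.empty[RDD[Unit]],
                                oneAtATime = true,
                                defaultRDD = tick)
  }

Each tick could then be expanded into the set of pending restrictions loaded
from our own persistence layer and fed, keyed by element, into the
mapWithState sketch above.
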
>>> But these are still both DStream-based rather than Dataset-based, which
>>> we might want to support (depends on what direction folks take with the
>>> runners).
>>>
>>> If we wanted to do this in the Dataset world, looking at a custom
>>> sink/source would also be an option (which is effectively what a custom
>>> queue-stream-like thing for DStreams requires), but the data source
>>> APIs are a bit in flux, so if we ended up doing things at the edge of
>>> what's allowed there's a good chance we'd have to rewrite it a few
>>> times.
>>>
>>>>> Assuming that we have a given DStream, though, in Spark we can get
>>>>> the underlying RDD implementation for each microbatch and do our
>>>>> work inside of that.
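
Assuming a DStream of elements paired with their current restrictions, and
reusing the hypothetical Restriction/Record/runSdfSlice names from the
sketch above, DStream.transform exposes exactly that per-microbatch RDD:

  import org.apache.spark.streaming.dstream.DStream
  import SdfStateSketch.{Restriction, Record, runSdfSlice}

  object TransformSketch {
    // transform hands us the underlying RDD of each microbatch, so the SDF
    // slice runs as ordinary RDD code; residual restrictions would still
    // have to be persisted and fed back in for the next batch.
    def processBatches(in: DStream[(String, Restriction)]): DStream[Record] =
      in.transform(rdd => rdd.flatMap { case (_, r) => runSdfSlice(r)._1 })
  }
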
>>>>>>> More generally, this does raise an important question: if we want
>>>>>>> to target Datasets instead of RDDs/DStreams, I would need to do
>>>>>>> some more poking.
>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> How would timers be implemented? By outputting and reprocessing,
>>>>>>>>> the same way you proposed for SDF?
>>>>>>>
>>>>>>> I mean the timers could be inside the mappers within the system.
>>>>>>> We could use a singleton so that if a partition is re-executed it
>>>>>>> doesn't end up as a straggler.
>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>> state.)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>> track of the computation so far, instead of outputting V each
>>>>>>>>>>>> time? (Also, the progress so far is probably of a different
>>>>>>>>>>>> type R rather than V.)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy
>>>>>>>>>>>>> thinking about this last year but didn't get very far.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>
>>>>>>>>>>>>> Implement a mapper which outputs type (T, V); if the
>>>>>>>>>>>>> computation finishes, V will be populated, otherwise T will be.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For determining how long to run, we'd either go up to K
>>>>>>>>>>>>> seconds or listen for a signal on a port.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Once we're done running, we take the result, filter the ones
>>>>>>>>>>>>> with T and the ones with V into separate collections, re-run
>>>>>>>>>>>>> until finished, and then union the results.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is maybe not a great design, but it was minimally
>>>>>>>>>>>>> complicated, and I figured terrible was a good place to start
>>>>>>>>>>>>> and improve from.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>>> worse than I remember, because it's been a while since I
>>>>>>>>>>>>> thought about this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
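
For reference, a rough batch-mode Scala sketch of the filter/re-run/union
loop from the back-of-the-envelope design above. runForSlice is a
hypothetical stand-in for the "run up to K seconds" mapper (modeled here as
an Either rather than a literal (T, V) pair), not real runner code:

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  object RerunUnionSketch {
    // Hypothetical mapper: run one element for a bounded amount of work
    // and return either the residual input (Left) or the finished output
    // (Right), i.e. the (T, V) pair with exactly one side populated.
    def runForSlice[T, V](input: T, maxSeconds: Int): Either[T, V] = ???

    def runToCompletion[T: ClassTag, V: ClassTag](
        sc: SparkContext, initial: RDD[T], maxSeconds: Int): RDD[V] = {
      var pending: RDD[T] = initial
      var done: RDD[V] = sc.emptyRDD[V]
      while (!pending.isEmpty()) {
        val step = pending.map(runForSlice[T, V](_, maxSeconds)).cache()
        done = done.union(step.collect { case Right(v) => v }) // finished
        pending = step.collect { case Left(t) => t }           // re-run these
      }
      done
    }
  }

The union lineage grows with every iteration, so in practice the
intermediate RDDs would need periodic checkpointing; that is where the
re-execution and reliability concerns raised earlier in the thread would
show up.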