But this new mode isn't a semantic change, right? It's moving away from micro-batches toward something that looks a lot like what Flink does - continuous processing with asynchronous snapshot boundaries.
On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:

> Hopefully the new "continuous processing mode" in Spark will enable SDF implementation (and real streaming)?
>
> Thanks,
> Thomas
>
> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming. Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (This is of course less efficient than what a timer-capable runner would do, and I have doubts about the fault tolerance.)
>>>>>>>>
>>>>>>>> So on the streaming side we could simply do it with a fixed number of levels of DStreams. It's not great, but it would work.
>>>>>>>
>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner. Imagine the following case: a file contains a list of "master" Kafka topics, on which there are published additional Kafka topics to read.
>>>>>>>
>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>
>>>>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>>>>> - Applying it to a finite set of inputs (in this case, to the result of reading a text file)
>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, where each of the streams is itself unbounded too)
>>>>>>>
>>>>>>> Does the multi-level solution you have in mind work for this case? I suppose the second case is harder, so we can focus on that.
>>>>>>
>>>>>> So none of those are a SplittableDoFn, right?
>>>>>
>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn, and we're trying to figure out how to make Spark run it.
>>>>
>>>> Ah ok, sorry, I saw that and for some reason parsed them as old-style DoFns in my head.
>>>>
>>>> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code, since it's a bit convoluted.
>>>>
>>>> The more I think about the jumps required to make the "simple" union approach work, the more it seems just using the state mapping for streaming is probably more reasonable. Although the state tracking in Spark can be somewhat expensive, so it would probably make sense to benchmark to see if it meets our needs.
>>>
>>> So the problem is, I don't think this can be made to work using mapWithState.
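[Editor's note: Eugene's pipeline example above can be modeled in plain Python - this is not Beam or Spark code, and the topic names and contents are invented finite stand-ins. It shows the shape that makes SDF hard for a runner: each application of the read function turns one input element into a whole stream of outputs.]

```python
# Stand-in "Kafka": a master topic listing nested topics, each with records.
# In the real case each of these streams would be unbounded.
FAKE_KAFKA = {
    "masters": ["topic-a", "topic-b"],
    "topic-a": ["a-rec-0", "a-rec-1", "a-rec-2"],
    "topic-b": ["b-rec-0", "b-rec-1", "b-rec-2"],
}

def read_from_kafka(topic):
    """Stand-in for the splittable DoFn: one input element -> a stream of outputs."""
    for record in FAKE_KAFKA[topic]:
        yield record

def flat_apply(fn, inputs):
    """ParDo-like application: flatten every element's output stream into one collection."""
    for element in inputs:
        yield from fn(element)

master_topics = ["masters"]                                   # finite input set
nested_topics = list(flat_apply(read_from_kafka, master_topics))
records = list(flat_apply(read_from_kafka, nested_topics))    # streams of streams
print(nested_topics)
print(records)
```

The second application is the hard case Eugene points at: the set of streams being read is itself produced by a stream, so neither the number of readers nor any single reader's output is bounded.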
>>> It doesn't allow a mapping function that emits infinite output for an input element, directly or not.
>>
>> So, provided there is an infinite input (e.g. pick a never-ending queue stream), and each call produces a finite output, we would have an infinite number of calls.
>>
>>> Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.
>>
>> So we can do an infinite queue stream, which would allow us to be triggered at each interval and handle our own persistence.
>>
>>>> But these are still both DStream-based rather than Dataset-based, which we might want to support (depends on what direction folks take with the runners).
>>>>
>>>> If we wanted to do this in the Dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the data source APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed there's a good chance we'd have to rewrite it a few times.
>>>>
>>>>>> Assuming that we have a given DStream, though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
>>>>>>>>
>>>>>>>> More generally, this does raise an important question of whether we want to target Datasets instead of RDDs/DStreams, in which case I would need to do some more poking.
>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
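[Editor's note: Holden's "infinite queue stream" idea - infinitely many triggers, each making a bounded call that persists its own state - can be sketched roughly as follows. This is plain Python with hypothetical names, not Spark API: each micro-invocation resumes from a persisted restriction, emits a finite chunk, and leaves a residual, so unbounded output accumulates across calls even though every single call is finite.]

```python
def process_chunk(restriction, budget=2):
    """One micro-invocation of the work: consume up to `budget` items from the
    restriction, returning (finite outputs, residual restriction or None if done)."""
    start, end = restriction
    stop = min(start + budget, end)
    outputs = [f"record-{i}" for i in range(start, stop)]
    residual = (stop, end) if stop < end else None
    return outputs, residual

# `state` stands in for what would be checkpointed between trigger intervals.
# A finite end (5) keeps the demo terminating; a true stream would never finish.
state = (0, 5)
emitted = []
while state is not None:          # in streaming, the trigger fires forever
    outputs, state = process_chunk(state)
    emitted.extend(outputs)
print(emitted)
```

The fault-tolerance question Eugene raises maps onto how reliably `state` (the residual restriction) is persisted between intervals; if a trigger's checkpoint is lost, the chunk is reprocessed.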
>>>>>>>>
>>>>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so that if a partition is re-executed it doesn't end up as a straggler.
>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for state.)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far instead of outputting V each time? (Also, the progress so far is probably of a different type R rather than V.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V), where if the computation finishes V will be populated, otherwise T will be.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For determining how long to run, we'd run up to either K seconds or until we hear a signal on a port.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Once we're done running, we take the result and filter the ones with T and V into separate collections, re-run until finished, and then union the results.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated, and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been a while since I thought about this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
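[Editor's note: Holden's back-of-the-envelope design above can be sketched in outline like this. Plain Python with invented names, one possible reading of the proposal: each bounded pass emits either a finished output V or a continuation T, finished results are unioned across passes, and continuations are re-run until nothing is left. The K-seconds/port-signal cutoff is modeled as a fixed per-pass work budget.]

```python
def run_pass(pending, budget=2):
    """One bounded run over the pending work.
    Returns (continuations, finished): the T and V sides of the (T, V) pairs."""
    continuations, finished = [], []
    for work_remaining, payload in pending:
        done = min(work_remaining, budget)
        if work_remaining - done == 0:
            finished.append(payload)                            # V: computation done
        else:
            continuations.append((work_remaining - done, payload))  # T: re-run later
    return continuations, finished

pending = [(1, "x"), (3, "y"), (5, "z")]   # hypothetical work items (cost, payload)
results = []
while pending:                   # filter into T/V collections, re-run until finished
    pending, finished = run_pass(pending)
    results.extend(finished)     # union of each pass's finished outputs
print(results)
```

Reuven's follow-up maps onto this sketch directly: instead of carrying the raw continuation T between passes, the pending list could hold accumulated progress of some third type R, tracked via a state mapping function.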