But this new mode isn't a semantic change, right? It's moving away from
micro batches into something that looks a lot like what Flink does -
continuous processing with asynchronous snapshot boundaries.

On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:

> Hopefully the new "continuous processing mode" in Spark will enable SDF
> implementation (and real streaming)?
>
> Thanks,
> Thomas
>
>
> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>>
>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no 
>>>>>>>>> way
>>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>> somehow dynamically create a new DStream for every initial 
>>>>>>>>> restriction,
>>>>>>>>> said DStream being obtained using a Receiver that under the hood 
>>>>>>>>> actually
>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable 
>>>>>>>>> runner
>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>
>>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>>> of levels on DStreams. It’s not great but it would work.
>>>>>>>>
>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>> "master" Kafka topics, on which there are published additional Kafka 
>>>>>>> topics
>>>>>>> to read.
>>>>>>>
>>>>>>> PCollection<String> masterTopics =
>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>> PCollection<String> nestedTopics =
>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>> PCollection<String> records =
>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>
>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>> infinite output for every input:
>>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>>> of reading a text file)
>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>>> number of streams being read concurrently, each of the streams 
>>>>>>> themselves
>>>>>>> is unbounded too)
>>>>>>>
>>>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>>>> suppose the second case is harder, so we can focus on that.
>>>>>>>
>>>>>> So none of those are a splittabledofn right?
>>>>>>
>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>
>>>>>
>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>> DoFns in my head.
>>>>
>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>> source because of some queue stream limitations), which invites some
>>>> reliability challenges. This might be at the point where I should send a
>>>> diagram/some sample code since it’s a bit convoluted.
>>>>
>>>> The more I think about the jumps required to make the “simple” union
>>>> approach work, the more it seems just using the statemapping for steaming
>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>> somewhat expensive so it would probably make sense to benchmark to see if
>>>> it meets our needs.
>>>>
>>> So the problem is, I don't think this can be made to work using
>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>> output for an input element, directly or not.
>>>
>> So, provided there is an infinite input (eg pick a never ending queue
>> stream), and each call produces a finite output, we would have an infinite
>> number of calls.
>>
>>>
>>> Dataflow and Flink, for example, had timer support even before SDFs, and
>>> a timer can set another timer and thus end up doing an infinite amount of
>>> work in a fault tolerant way - so SDF could be implemented on top of that.
>>> But AFAIK spark doesn't have a similar feature, hence my concern.
>>>
>> So we can do an inifinite queue stream which would allow us to be
>> triggered at each interval and handle our own persistence.
>>
>>>
>>>
>>>> But these still are both DStream based rather than Dataset which we
>>>> might want to support (depends on what direction folks take with the
>>>> runners).
>>>>
>>>> If we wanted to do this in the dataset world looking at a custom
>>>> sink/source would also be an option, (which is effectively what a custom
>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>
>>>>
>>>>>> Assuming that we have a given dstream though in Spark we can get the
>>>>>> underlying RDD implementation for each microbatch and do our work inside 
>>>>>> of
>>>>>> that.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> More generally this does raise an important question if we want to
>>>>>>>> target datasets instead of rdds/DStreams in which case i would need to 
>>>>>>>> do
>>>>>>>> some more poking.
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>
>>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end 
>>>>>>>> up as
>>>>>>>> a straggler.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>>> state)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>> track of the computation so far instead of outputting V each 
>>>>>>>>>>>>> time? (also
>>>>>>>>>>>>> the progress so far is probably of a different type R rather than 
>>>>>>>>>>>>> V).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy 
>>>>>>>>>>>>>> thinking
>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>> and if the computation finishes T will be populated otherwise
>>>>>>>>>>>>>> V will be
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds
>>>>>>>>>>>>>> or listen for a signal on a port
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and 
>>>>>>>>>>>>>> improve
>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>>>> worse than I remember because its been awhile since I thought 
>>>>>>>>>>>>>> about this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>

Reply via email to