Hopefully the new "continuous processing mode" in Spark will enable SDF
implementation (and real streaming)?

Thanks,
Thomas


On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

>
> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>>
>>
>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>> kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no 
>>>>>>>> way
>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>> said DStream being obtained using a Receiver that under the hood 
>>>>>>>> actually
>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable 
>>>>>>>> runner
>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>
>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>> of levels on DStreams. It’s not great but it would work.
>>>>>>>
>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>>>>> the runner. Imagine the following case: a file contains a list of 
>>>>>> "master"
>>>>>> Kafka topics, on which there are published additional Kafka topics to 
>>>>>> read.
>>>>>>
>>>>>> PCollection<String> masterTopics = TextIO.read().from(
>>>>>> masterTopicsFile)
>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(
>>>>>> ReadFromKafkaFn))
>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(
>>>>>> ReadFromKafkaFn))
>>>>>>
>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>> infinite output for every input:
>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>> of reading a text file)
>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>> number of streams being read concurrently, each of the streams themselves
>>>>>> is unbounded too)
>>>>>>
>>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>>> suppose the second case is harder, so we can focus on that.
>>>>>>
>>>>> So none of those are a splittabledofn right?
>>>>>
>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>
>>>>
>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>> DoFns in my head.
>>>
>>> To effectively allow us to union back into the “same” DStream  we’d have
>>> to end up using Sparks queue streams (or their equivalent custom source
>>> because of some queue stream limitations), which invites some reliability
>>> challenges. This might be at the point where I should send a diagram/some
>>> sample code since it’s a bit convoluted.
>>>
>>> The more I think about the jumps required to make the “simple” union
>>> approach work, the more it seems just using the statemapping for steaming
>>> is probably more reasonable. Although the state tracking in Spark can be
>>> somewhat expensive so it would probably make sense to benchmark to see if
>>> it meets our needs.
>>>
>> So the problem is, I don't think this can be made to work using
>> mapWithState. It doesn't allow a mapping function that emits infinite
>> output for an input element, directly or not.
>>
> So, provided there is an infinite input (eg pick a never ending queue
> stream), and each call produces a finite output, we would have an infinite
> number of calls.
>
>>
>> Dataflow and Flink, for example, had timer support even before SDFs, and
>> a timer can set another timer and thus end up doing an infinite amount of
>> work in a fault tolerant way - so SDF could be implemented on top of that.
>> But AFAIK spark doesn't have a similar feature, hence my concern.
>>
> So we can do an inifinite queue stream which would allow us to be
> triggered at each interval and handle our own persistence.
>
>>
>>
>>> But these still are both DStream based rather than Dataset which we
>>> might want to support (depends on what direction folks take with the
>>> runners).
>>>
>>> If we wanted to do this in the dataset world looking at a custom
>>> sink/source would also be an option, (which is effectively what a custom
>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>> there’s a good chance we’d have to rewrite it a few times.
>>>
>>>
>>>>> Assuming that we have a given dstream though in Spark we can get the
>>>>> underlying RDD implementation for each microbatch and do our work inside 
>>>>> of
>>>>> that.
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> More generally this does raise an important question if we want to
>>>>>>> target datasets instead of rdds/DStreams in which case i would need to 
>>>>>>> do
>>>>>>> some more poking.
>>>>>>>
>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>
>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end 
>>>>>>> up as
>>>>>>> a straggler.
>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>> state)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>> track of the computation so far instead of outputting V each time? 
>>>>>>>>>>>> (also
>>>>>>>>>>>> the progress so far is probably of a different type R rather than 
>>>>>>>>>>>> V).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy 
>>>>>>>>>>>>> thinking
>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>
>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>> and if the computation finishes T will be populated otherwise
>>>>>>>>>>>>> V will be
>>>>>>>>>>>>>
>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>>>>>> listen for a signal on a port
>>>>>>>>>>>>>
>>>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and 
>>>>>>>>>>>>> improve
>>>>>>>>>>>>> from.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>>> worse than I remember because its been awhile since I thought 
>>>>>>>>>>>>> about this.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>

Reply via email to