(resurrecting thread as I'm back from leave)

I looked at this mode, and indeed as Reuven points out it seems that it
affects execution details, but doesn't offer any new APIs.
Holden - your suggestion of piggybacking an unbounded-per-element SDF on
top of an infinite stream would work if 1) there was just 1 element and 2)
the work was guaranteed to be infinite.

Unfortunately, neither of these assumptions holds. In particular:

- 1: The SDF is applied to a PCollection; the PCollection itself may be
unbounded; and the unbounded work done by the SDF happens for every
element. E.g. we might have a Kafka topic on which names of Kafka topics
arrive, and we may end up concurrently reading a continuously growing
number of topics.
- 2: The work per element is not necessarily infinite, it's just *not
guaranteed to be finite* - the SDF is allowed at any moment to say "Okay,
this restriction is done for real" by returning stop() from the
@ProcessElement method. Continuing the Kafka example, it could do
that if the topic/partition being watched is deleted. Having an infinite
stream as a driver of this process would require being able to send a
signal to the stream to stop itself.
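
To make the two points above concrete, here is a minimal plain-Java sketch
of what the runner has to cope with. It is not Beam or Spark code:
Continuation, TopicReader, run and demo are names I made up for
illustration, and the "rounds" only stand in for whatever scheduling unit
the runner uses. New elements keep arriving (point 1), and each element
decides on its own when its work is over (point 2).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class StopResumeSketch {
    /** Hypothetical stand-in for the stop()/resume() answer of one processing call. */
    enum Continuation { STOP, RESUME }

    /** Hypothetical per-element work: one watched topic that may get deleted. */
    static final class TopicReader {
        final String topic;
        final int deletedAfterPolls; // negative: the topic is never deleted
        int polls = 0;

        TopicReader(String topic, int deletedAfterPolls) {
            this.topic = topic;
            this.deletedAfterPolls = deletedAfterPolls;
        }

        /** Does one bounded chunk of work and says whether to be called again. */
        Continuation processOnce(List<String> out) {
            if (deletedAfterPolls >= 0 && polls >= deletedAfterPolls) {
                return Continuation.STOP; // "this restriction is done for real"
            }
            out.add(topic + "#" + polls);
            polls++;
            return Continuation.RESUME;
        }
    }

    /** Each round: admit newly arrived elements, then poll every live reader once. */
    static List<String> run(List<List<TopicReader>> arrivalsPerRound) {
        List<TopicReader> live = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (List<TopicReader> arrivals : arrivalsPerRound) {
            live.addAll(arrivals); // the set of elements being processed keeps growing
            Iterator<TopicReader> it = live.iterator();
            while (it.hasNext()) {
                if (it.next().processOnce(out) == Continuation.STOP) {
                    it.remove(); // only the element itself ends its own work
                }
            }
        }
        return out;
    }

    /** Topic "a" arrives first and is deleted after 2 polls; "b" arrives later and never ends. */
    static List<String> demo() {
        List<List<TopicReader>> rounds = new ArrayList<>();
        rounds.add(Collections.singletonList(new TopicReader("a", 2)));
        rounds.add(Collections.singletonList(new TopicReader("b", -1)));
        rounds.add(Collections.<TopicReader>emptyList());
        rounds.add(Collections.<TopicReader>emptyList());
        return run(rounds);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [a#0, a#1, b#0, b#1, b#2]: reader "a" drops out by
its own decision while "b" keeps going. An infinite driver stream with
exactly one element and no stop signal cannot express either behavior.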

Is it looking like there's any other way this can be done in Spark as-is,
or are we going to have to make changes to Spark to support this?

On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:

> I mean the new mode is very much in the Dataset not the DStream API
> (although you can use the Dataset API with the old modes too).
>
> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>
>> But this new mode isn't a semantic change, right? It's moving away from
>> micro batches into something that looks a lot like what Flink does -
>> continuous processing with asynchronous snapshot boundaries.
>>
>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hopefully the new "continuous processing mode" in Spark will enable SDF
>>> implementation (and real streaming)?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>>
>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have
>>>>>>>>>>> no way at all to produce an infinite DStream from a finite RDD?
>>>>>>>>>>> Maybe we can somehow dynamically create a new DStream for every
>>>>>>>>>>> initial restriction, said DStream being obtained using a Receiver
>>>>>>>>>>> that under the hood actually runs the SDF? (This is of course less
>>>>>>>>>>> efficient than what a timer-capable runner would do, and I have
>>>>>>>>>>> doubts about the fault tolerance.)
>>>>>>>>>>>
>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>
>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>>>> "master" Kafka topics, on which additional Kafka topics to read are
>>>>>>>>> published.
>>>>>>>>>
>>>>>>>>> // p is the Pipeline (assumed; not shown in this sketch)
>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>     p.apply(TextIO.read().from(masterTopicsFile));
>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>     masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>> PCollection<String> records =
>>>>>>>>>     nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>
>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>> infinite output for every input:
>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>> result of reading a text file)
>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>> unbounded number of streams being read concurrently, where each of
>>>>>>>>> the streams is itself unbounded too)
>>>>>>>>>
>>>>>>>>> Does the multi-level solution you have in mind work for this case?
>>>>>>>>> I suppose the second case is harder, so we can focus on that.
>>>>>>>>>
>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>
>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>
>>>>>>>
>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>>>> DoFns in my head.
>>>>>>
>>>>>> To effectively allow us to union back into the “same” DStream we’d
>>>>>> have to end up using Spark’s queue streams (or their equivalent custom
>>>>>> source because of some queue stream limitations), which invites some
>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>
>>>>>> The more I think about the jumps required to make the “simple” union
>>>>>> approach work, the more it seems just using the state mapping for
>>>>>> streaming is probably more reasonable. Although the state tracking in
>>>>>> Spark can be somewhat expensive, so it would probably make sense to
>>>>>> benchmark to see if it meets our needs.
>>>>>>
>>>>> So the problem is, I don't think this can be made to work using
>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>> output for an input element, directly or not.
>>>>>
>>>> So, provided there is an infinite input (e.g. pick a never-ending queue
>>>> stream), and each call produces a finite output, we would have an infinite
>>>> number of calls.
>>>>
>>>>>
>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>> and a timer can set another timer and thus end up doing an infinite amount
>>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>>> that. But AFAIK Spark doesn't have a similar feature, hence my concern.
>>>>>
>>>> So we can do an infinite queue stream which would allow us to be
>>>> triggered at each interval and handle our own persistence.
>>>>
>>>>>
>>>>>
>>>>>> But these still are both DStream based rather than Dataset which we
>>>>>> might want to support (depends on what direction folks take with the
>>>>>> runners).
>>>>>>
>>>>>> If we wanted to do this in the Dataset world, looking at a custom
>>>>>> sink/source would also be an option (which is effectively what a custom
>>>>>> queue-stream-like thing for DStreams requires), but the data source APIs
>>>>>> are a bit in flux, so if we ended up doing things at the edge of what’s
>>>>>> allowed there’s a good chance we’d have to rewrite it a few times.
>>>>>>
>>>>>>
>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>> inside of that.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> More generally this does raise an important question if we want
>>>>>>>>>> to target Datasets instead of RDDs/DStreams, in which case I would
>>>>>>>>>> need to do some more poking.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How would timers be implemented? By outputting and reprocessing,
>>>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>>>
>>>>>>>>>>> I mean the timers could be inside the mappers within the system.
>>>>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>> end up as a straggler.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>>>> track of the computation so far instead of outputting V each
>>>>>>>>>>>>>>> time? (Also, the progress so far is probably of a different
>>>>>>>>>>>>>>> type R rather than V.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy
>>>>>>>>>>>>>>>> thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For determining how long to run we'd either run for up to K
>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>>> the ones with T and V into separate collections, re-run until
>>>>>>>>>>>>>>>> finished, and then union the results
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start
>>>>>>>>>>>>>>>> and improve from.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this
>>>>>>>>>>>>>>>> is worse than I remember because it's been a while since I
>>>>>>>>>>>>>>>> thought about this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>