Would like to revive this thread one more time.

At this point I'm pretty certain that Spark can't support this out of the
box and we're gonna have to make changes to Spark.

Holden, could you advise who would be some Spark experts (yourself included
:) ) who could advise what kind of Spark change would both support this AND
be useful to the regular Spark community (non-Beam) so that it has a chance
of finding support? E.g. is there any plan in Spark regarding adding timers
similar to Flink's or Beam's timers, maybe we could help out with that?

+Kenneth Knowles <k...@google.com> because timers suffer from the same
problem.

On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> (resurrecting thread as I'm back from leave)
>
> I looked at this mode, and indeed as Reuven points out it seems that it
> affects execution details, but doesn't offer any new APIs.
> Holden - your suggestions of piggybacking an unbounded-per-element SDF on
> top of an infinite stream would work if 1) there was just 1 element and 2)
> the work was guaranteed to be infinite.
>
> Unfortunately, both of these assumptions are insufficient. In particular:
>
> - 1: The SDF is applied to a PCollection; the PCollection itself may be
> unbounded; and the unbounded work done by the SDF happens for every
> element. E.g. we might have a Kafka topic on which names of Kafka topics
> arrive, and we may end up concurrently reading a continuously growing
> number of topics.
> - 2: The work per element is not necessarily infinite, it's just *not
> guaranteed to be finite* - the SDF is allowed at any moment to say "Okay,
> this restriction is done for real" by returning stop() from the
> @ProcessElement method. Continuing the Kafka example, e.g., it could do
> that if the topic/partition being watched is deleted. Having an infinite
> stream as a driver of this process would require being able to send a
> signal to the stream to stop itself.
>
> Is it looking like there's any other way this can be done in Spark as-is,
> or are we going to have to make changes to Spark to support this?
>
> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> I mean the new mode is very much in the Dataset not the DStream API
>> (although you can use the Dataset API with the old modes too).
>>
>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>
>>> But this new mode isn't a semantic change, right? It's moving away from
>>> micro batches into something that looks a lot like what Flink does -
>>> continuous processing with asynchronous snapshot boundaries.
>>>
>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Hopefully the new "continuous processing mode" in Spark will enable SDF
>>>> implementation (and real streaming)?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>>
>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>> kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears 
>>>>>>>>>>>> to have no
>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe 
>>>>>>>>>>>> we can
>>>>>>>>>>>> somehow dynamically create a new DStream for every initial 
>>>>>>>>>>>> restriction,
>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood 
>>>>>>>>>>>> actually
>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a 
>>>>>>>>>>>> timer-capable runner
>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>
>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>
>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a 
>>>>>>>>>> list
>>>>>>>>>> of "master" Kafka topics, on which there are published additional 
>>>>>>>>>> Kafka
>>>>>>>>>> topics to read.
>>>>>>>>>>
>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>> PCollection<String> records =
>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>
>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>> infinite output for every input:
>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>> result of reading a text file)
>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>> unbounded number of streams being read concurrently, each of the 
>>>>>>>>>> streams
>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>
>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>
>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>
>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run 
>>>>>>>> it.
>>>>>>>>
>>>>>>>>
>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>>>>> DoFns in my head.
>>>>>>>
>>>>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>
>>>>>>> The more I think about the jumps required to make the “simple” union
>>>>>>> approach work, the more it seems just using the statemapping for 
>>>>>>> steaming
>>>>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>>>>> somewhat expensive so it would probably make sense to benchmark to see 
>>>>>>> if
>>>>>>> it meets our needs.
>>>>>>>
>>>>>> So the problem is, I don't think this can be made to work using
>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>> output for an input element, directly or not.
>>>>>>
>>>>> So, provided there is an infinite input (eg pick a never ending queue
>>>>> stream), and each call produces a finite output, we would have an infinite
>>>>> number of calls.
>>>>>
>>>>>>
>>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>>> and a timer can set another timer and thus end up doing an infinite 
>>>>>> amount
>>>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>>>> that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>
>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>> triggered at each interval and handle our own persistence.
>>>>>
>>>>>>
>>>>>>
>>>>>>> But these still are both DStream based rather than Dataset which we
>>>>>>> might want to support (depends on what direction folks take with the
>>>>>>> runners).
>>>>>>>
>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>>> queue stream like thing for dstreams requires), but the datasource APIs 
>>>>>>> are
>>>>>>> a bit influx so if we ended up doing things at the edge of what’s 
>>>>>>> allowed
>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>
>>>>>>>
>>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>>> inside of that.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> More generally this does raise an important question if we want
>>>>>>>>>>> to target datasets instead of rdds/DStreams in which case i would 
>>>>>>>>>>> need to
>>>>>>>>>>> do some more poking.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>
>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it 
>>>>>>>>>>> doesn’t
>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>>>>> track of the computation so far instead of outputting V each 
>>>>>>>>>>>>>>>> time? (also
>>>>>>>>>>>>>>>> the progress so far is probably of a different type R rather 
>>>>>>>>>>>>>>>> than V).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some 
>>>>>>>>>>>>>>>>> sketchy thinking
>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>>>> the ones with T and V into seperate collections re-run until 
>>>>>>>>>>>>>>>>> finished
>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start 
>>>>>>>>>>>>>>>>> and improve
>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this
>>>>>>>>>>>>>>>>> is worse than I remember because its been awhile since I 
>>>>>>>>>>>>>>>>> thought about this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>
>>>>
>>
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>

Reply via email to