On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>> runs the SDF? (This is of course less efficient than what a timer-capable
>>>>> runner would do, and I have doubts about the fault tolerance.)
>>>>>
>>>> So on the streaming side we could simply do it with a fixed number of
>>>> levels on DStreams. It’s not great but it would work.
>>>>
>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>> the runner. Imagine the following case: a file contains a list of "master"
>>> Kafka topics, each of which publishes additional Kafka topics to read.
>>>
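>>> // `pipeline` is the enclosing Pipeline object (name assumed for illustration).
>>> PCollection<String> masterTopics =
>>>     pipeline.apply(TextIO.read().from(masterTopicsFile));
>>> PCollection<String> nestedTopics =
>>>     masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>> PCollection<String> records =
>>>     nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));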
>>>
>>> This exemplifies both use cases of a streaming SDF that emits infinite
>>> output for every input:
>>> - Applying it to a finite set of inputs (in this case to the result of
>>> reading a text file)
>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>> number of streams being read concurrently, each of which is itself
>>> unbounded)
>>>
>>> Does the multi-level solution you have in mind work for this case? I
>>> suppose the second case is harder, so we can focus on that.
>>>
>> So none of those are a SplittableDoFn, right?
>>
> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable
> DoFn and we're trying to figure out how to make Spark run it.
>
>
Ah OK, sorry, I saw that and for some reason parsed them as old-style DoFns
in my head.

To effectively allow us to union back into the “same” DStream we’d have to
end up using Spark’s queue streams (or an equivalent custom source, because
of some queue stream limitations), which invites some reliability
challenges. This might be the point where I should send a diagram/some
sample code, since it’s a bit convoluted.

The more I think about the jumps required to make the “simple” union
approach work, the more it seems that just using state mapping for streaming
is probably more reasonable. That said, state tracking in Spark can be
somewhat expensive, so it would probably make sense to benchmark it to see
if it meets our needs.
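
A rough illustration of the state-mapping idea, in plain Java rather than
Spark code: per-key state holds the progress made so far (the "R" from the
earlier note in this thread; here simply an offset), and each microbatch
resumes from it instead of re-emitting everything. The names
`StateMappedReader`, `onBatch`, and `maxPerBatch` are hypothetical, and a map
of append-only lists stands in for Kafka topics. In Spark this role would
presumably fall to something like `mapWithState` on a pair DStream, which this
sketch does not use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: keeps one offset per key between microbatches.
class StateMappedReader {
    // Per-key progress; this map plays the role of the runner-managed state.
    private final Map<String, Integer> offsets = new HashMap<>();
    // Upper bound on records emitted per key in a single microbatch.
    private final int maxPerBatch;

    StateMappedReader(int maxPerBatch) {
        this.maxPerBatch = maxPerBatch;
    }

    // Processes one microbatch: for every key, emit the next slice of its log
    // and remember where to resume. Keys are visited in sorted order so the
    // output order is deterministic.
    List<String> onBatch(Map<String, List<String>> logs) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : new TreeMap<>(logs).entrySet()) {
            List<String> log = e.getValue();
            int from = offsets.getOrDefault(e.getKey(), 0);
            // Never move backwards, even if the log is shorter than expected.
            int to = Math.max(from, Math.min(log.size(), from + maxPerBatch));
            if (from < to) {
                out.addAll(log.subList(from, to));
            }
            offsets.put(e.getKey(), to);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> logs = new HashMap<>();
        logs.put("a", new ArrayList<>(Arrays.asList("a0", "a1", "a2")));
        logs.put("b", new ArrayList<>(Arrays.asList("b0")));

        StateMappedReader reader = new StateMappedReader(2);
        System.out.println(reader.onBatch(logs)); // first microbatch
        System.out.println(reader.onBatch(logs)); // resumes key "a" from offset 2
    }
}
```

The cost mentioned above shows up here as the `offsets` map: every active key
keeps an entry for as long as its stream is being read, which is why
benchmarking the real state store would matter.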

But both of these are still DStream-based rather than Dataset-based, which
we might want to support (depends on what direction folks take with the
runners).

If we wanted to do this in the Dataset world, a custom sink/source would
also be an option (which is effectively what a custom queue-stream-like
thing for DStreams requires), but the data source APIs are a bit in flux, so
if we ended up doing things at the edge of what’s allowed there’s a good
chance we’d have to rewrite it a few times.


>> Assuming that we have a given DStream, though, in Spark we can get the
>> underlying RDD implementation for each microbatch and do our work inside of
>> that.
>>
>>>
>>>
>>>>
>>>> More generally, this does raise an important question: do we want to
>>>> target Datasets instead of RDDs/DStreams? In that case I would need to do
>>>> some more poking.
>>>>
>>>>
>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> How would timers be implemented? By outputting and reprocessing, the
>>>>>> same way you proposed for SDF?
>>>>>>
>>>> I mean the timers could be inside the mappers within the system. We could
>>>> use a singleton so if a partition is re-executed it doesn’t end up as a
>>>> straggler.
>>>>
>>>>>
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> So the timers would have to be in our own code.
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>> kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>> state)
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Could we alternatively use a state mapping function to keep track
>>>>>>>>> of the computation so far instead of outputting V each time? (also the
>>>>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about
>>>>>>>>>> this last year but didn't get very far.
>>>>>>>>>>
>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>> For input type T
>>>>>>>>>> Output type V
>>>>>>>>>>
>>>>>>>>>> Implement a mapper which outputs type (T, V),
>>>>>>>>>> and if the computation finishes T will be populated, otherwise V
>>>>>>>>>> will be.
>>>>>>>>>>
>>>>>>>>>> For determining how long to run, we'd either run for up to K seconds
>>>>>>>>>> or listen for a signal on a port.
>>>>>>>>>>
>>>>>>>>>> Once we're done running, we take the results and filter the ones
>>>>>>>>>> with T and the ones with V into separate collections, re-run until
>>>>>>>>>> finished, and then union the results.
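
A minimal sketch of the filter / re-run / union loop described above, in
plain Java with no Spark dependency. Everything here is an assumption made
for illustration: the names `Step`, `runToCompletion`, `sumSlice`, and
`BUDGET` are hypothetical, in-memory lists stand in for RDDs, and a fixed
per-pass item budget stands in for "K seconds or a signal on a port". The
message leaves the exact roles of the T and V sides open to interpretation;
the sketch assumes an unfinished element carries the state needed to resume
and a finished element carries its output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: in-memory lists stand in for RDDs.
class RerunUntilFinished {

    // Result of one mapper pass over one element; exactly one side is set.
    static final class Step<T, V> {
        final boolean finished;
        final T resume; // state to re-run from; set only when not finished
        final V output; // final value; set only when finished

        private Step(boolean finished, T resume, V output) {
            this.finished = finished;
            this.resume = resume;
            this.output = output;
        }

        static <T, V> Step<T, V> done(V output) {
            return new Step<>(true, null, output);
        }

        static <T, V> Step<T, V> again(T resume) {
            return new Step<>(false, resume, null);
        }
    }

    // Per-pass work budget; stands in for "run for up to K seconds".
    static final int BUDGET = 4;

    // Example mapper: sums the integers in [next, end), at most BUDGET per
    // pass. The state array is {next, end, partialSum}.
    static Step<long[], Long> sumSlice(long[] s) {
        long limit = Math.min(s[1], s[0] + BUDGET);
        long sum = s[2];
        for (long i = s[0]; i < limit; i++) {
            sum += i;
        }
        if (limit == s[1]) {
            return Step.done(sum);
        }
        return Step.again(new long[] {limit, s[1], sum});
    }

    // Runs the mapper over all pending elements, splits the results into
    // finished and unfinished, re-runs the unfinished ones, and accumulates
    // (unions) the finished outputs until nothing is pending.
    static <T, V> List<V> runToCompletion(List<T> inputs, Function<T, Step<T, V>> mapper) {
        List<V> finished = new ArrayList<>();
        List<T> pending = new ArrayList<>(inputs);
        while (!pending.isEmpty()) {
            List<T> next = new ArrayList<>();
            for (T element : pending) {
                Step<T, V> step = mapper.apply(element);
                if (step.finished) {
                    finished.add(step.output);
                } else {
                    next.add(step.resume);
                }
            }
            pending = next;
        }
        return finished;
    }

    public static void main(String[] args) {
        List<long[]> inputs = Arrays.asList(new long[] {0, 10, 0}, new long[] {0, 3, 0});
        System.out.println(runToCompletion(inputs, RerunUntilFinished::sumSlice));
    }
}
```

Note that this loop only terminates when every element eventually finishes,
so on its own it covers bounded work; an element that produces output forever
would keep the loop running without ever contributing to the union.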
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This is maybe not a great design but it was minimally complicated
>>>>>>>>>> and I figured terrible was a good place to start and improve from.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>> worse than I remember, because it's been a while since I thought
>>>>>>>>>> about this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
> --
Twitter: https://twitter.com/holdenkarau
