On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs.she...@gmail.com> wrote:

> Hi Lukasz,
>
> Thanks for your response.
>
> > Each call to split may return a different set of sub sources but they
> always represent the entire original source.
>
> Inconsistent sets of sub-sources prevent runners/engines from calling the
> split API in a distributed manner during runtime.
>
Correct. split() is applied to a single argument, so there's nothing to
execute in parallel here. It executes sequentially, and produces a number
of sources that can then be executed in parallel. It's pretty similar to
executing a DoFn on a single element.


> Besides, the splitAtFraction(double fraction) is only available in
> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>
There is no analogous API for unbounded sources.


>
> Another question: will Source transforms eventually become deprecated and
> be replaced by the SplittableParDo?
>
Yes; this is already the case in the Portability framework.


>
> Thanks,
> Shen
>
>
>
> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Contractually, the sources returned by splitting must represent the
>> original source. Each call to split may return a different set of sub
>> sources but they always represent the entire original source.
>>
>> Note that Dataflow does call split effectively during translation and
>> then later calls APIs on sources to perform dynamic splitting[1].
>>
>> Note, that this is being replaced with SplittableDoFn. Worthwhile to look
>> at this doc[2] and presentation[3].
>>
>> 1:
>> https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>> 2: https://s.apache.org/splittable-do-fn
>> 3:
>> https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>
>>
>>
>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li <cs.she...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Does the split API in Bounded/UnboundedSource guarantee to return the
>>> same result if invoked in different parallel instances in a distributed
>>> environment?
>>>
>>> For example, assume the original source can split into 3 sub-sources.
>>> Say the runner creates 3 parallel source operator instances (perhaps
>>> running in different servers) and uses each instance to handle 1 of the 3
>>> sub-sources. In this case, if each operator instance invokes the split
>>> method in a distributed manner, will they get the same split result?
>>>
>>> My understanding is that the current API does not guarantee the 3
>>> operator instances will receive the same split result. It is possible that
>>> 1 of the 3 instances receive 4 sub-sources and the other two receives 3.
>>> Or, even if they all get 3 sub-sources, there could be gaps and overlaps in
>>> the data streams. If so, shall we add an API to indicate that whether a
>>> source can split at runtime?
>>>
>>> One solution is to avoid this problem is to split the source at
>>> translation time and directly pass sub-sources to operator instances. But
>>> this is not ideal. The server runs the translation might not have access to
>>> the source (DB, KV, MQ, etc). Or the application may want to dynamically
>>> change the source parallel width at runtime. Hence, the runner/engine
>>> sometimes have to split the source during runtime in a distributed
>>> environment.
>>>
>>> Thanks,
>>> Shen
>>>
>>>
>

Reply via email to