On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs.she...@gmail.com> wrote:

> Hi Lukasz,
>
> Thanks for your response.
>
> > Each call to split may return a different set of sub sources but they always represent the entire original source.
>
> Inconsistent sets of sub-sources prevent runners/engines from calling the split API in a distributed manner during runtime.

Correct. split() is applied to a single argument, so there's nothing to execute in parallel here. It executes sequentially, and produces a number of sources that can then be executed in parallel. It's pretty similar to executing a DoFn on a single element.
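Here is a minimal sketch of that point, assuming a toy bounded source over a range of offsets. It does not use the Beam SDK: `OffsetRangeSource`, its `split(long desiredBundleSize)` method and the `coversExactly` helper are hypothetical illustrations. split() runs once against a single source. The pieces it returns are disjoint and together cover the original range, which is what allows them to be read in parallel afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOnceSketch {

    // Hypothetical bounded source over the half-open offset range [start, end).
    record OffsetRangeSource(long start, long end) {

        // Sequentially splits this source into contiguous sub-sources of at most
        // desiredBundleSize offsets each. The pieces are disjoint and together
        // cover [start, end) exactly, i.e. they "represent the original source".
        List<OffsetRangeSource> split(long desiredBundleSize) {
            if (desiredBundleSize <= 0) {
                throw new IllegalArgumentException("desiredBundleSize must be positive");
            }
            List<OffsetRangeSource> parts = new ArrayList<>();
            for (long s = start; s < end; s += desiredBundleSize) {
                parts.add(new OffsetRangeSource(s, Math.min(s + desiredBundleSize, end)));
            }
            return parts;
        }
    }

    // Returns true if the parts are non-empty, contiguous and cover the original exactly.
    static boolean coversExactly(OffsetRangeSource original, List<OffsetRangeSource> parts) {
        long expectedStart = original.start();
        for (OffsetRangeSource p : parts) {
            if (p.start() != expectedStart || p.end() <= p.start()) {
                return false; // gap, overlap, or empty piece
            }
            expectedStart = p.end();
        }
        return expectedStart == original.end();
    }

    public static void main(String[] args) {
        OffsetRangeSource source = new OffsetRangeSource(0, 100);
        // split() runs once, on one machine, against one source...
        List<OffsetRangeSource> parts = source.split(40);
        System.out.println(parts);
        System.out.println(coversExactly(source, parts));
        // ...and only the resulting sub-sources would be handed to parallel readers.
    }
}
```

Running `main` prints three sub-sources covering [0, 40), [40, 80) and [80, 100), followed by `true`.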
> Besides, the splitAtFraction(double fraction) is only available in BoundedSources. How do you perform dynamic splitting for UnboundedSources?

There is no analogous API for unbounded sources.

> Another question: will Source transforms eventually become deprecated and be replaced by the SplittableParDo?

Yes; this is already the case in the Portability framework.

> Thanks,
> Shen
>
> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Contractually, the sources returned by splitting must represent the original source. Each call to split may return a different set of sub sources but they always represent the entire original source.
>>
>> Note that Dataflow effectively calls split during translation and then later calls APIs on sources to perform dynamic splitting [1].
>>
>> Note that this is being replaced with SplittableDoFn. It is worthwhile to look at this doc [2] and presentation [3].
>>
>> 1: https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>> 2: https://s.apache.org/splittable-do-fn
>> 3: https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>
>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li <cs.she...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is the split API in Bounded/UnboundedSource guaranteed to return the same result if invoked in different parallel instances in a distributed environment?
>>>
>>> For example, assume the original source can split into 3 sub-sources. Say the runner creates 3 parallel source operator instances (perhaps running on different servers) and uses each instance to handle 1 of the 3 sub-sources. In this case, if each operator instance invokes the split method in a distributed manner, will they get the same split result?
>>>
>>> My understanding is that the current API does not guarantee that the 3 operator instances will receive the same split result. It is possible that 1 of the 3 instances receives 4 sub-sources and the other two receive 3. Or, even if they all get 3 sub-sources, there could be gaps and overlaps in the data streams. If so, shall we add an API to indicate whether a source can split at runtime?
>>>
>>> One way to avoid this problem is to split the source at translation time and pass the sub-sources directly to the operator instances. But this is not ideal. The server that runs the translation might not have access to the source (DB, KV, MQ, etc.). Or the application may want to dynamically change the source parallel width at runtime. Hence, the runner/engine sometimes has to split the source during runtime in a distributed environment.
>>>
>>> Thanks,
>>> Shen
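The gap/overlap scenario Shen describes can be reproduced with a toy model. This sketch is independent of the Beam SDK. `OffsetRange`, `splitInto` and `gapsAndOverlaps` are hypothetical. The 3/4/3 piece counts are an assumed example of inconsistent split results, not observed Beam behavior. Each of three instances keeps the piece at its own index from its own split call. The sketch compares that with splitting once and handing piece i to instance i.

```java
import java.util.ArrayList;
import java.util.List;

public class IndependentSplitHazard {

    // Hypothetical half-open offset range [start, end).
    record OffsetRange(long start, long end) {

        // Splits the range into n contiguous, near-equal pieces covering [start, end).
        List<OffsetRange> splitInto(int n) {
            List<OffsetRange> parts = new ArrayList<>();
            long size = end - start;
            for (int i = 0; i < n; i++) {
                parts.add(new OffsetRange(start + size * i / n, start + size * (i + 1) / n));
            }
            return parts;
        }
    }

    // Returns {offsets in [0, total) read by no instance, extra reads of offsets read more than once}.
    static long[] gapsAndOverlaps(List<OffsetRange> assigned, int total) {
        int[] reads = new int[total];
        for (OffsetRange r : assigned) {
            for (long o = r.start(); o < r.end(); o++) {
                reads[(int) o]++;
            }
        }
        long gaps = 0, overlaps = 0;
        for (int c : reads) {
            if (c == 0) gaps++;
            if (c > 1) overlaps += c - 1;
        }
        return new long[] {gaps, overlaps};
    }

    public static void main(String[] args) {
        OffsetRange source = new OffsetRange(0, 120);
        int instances = 3;

        // Independent splitting: instance i calls split itself and keeps piece i.
        // Assumption: instance 1 happens to get 4 pieces while instances 0 and 2 get 3.
        int[] piecesSeen = {3, 4, 3};
        List<OffsetRange> independent = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            independent.add(source.splitInto(piecesSeen[i]).get(i));
        }

        // Split once (e.g. at translation time or by a single coordinator)
        // and hand piece i to instance i.
        List<OffsetRange> splitOnce = source.splitInto(instances);

        long[] a = gapsAndOverlaps(independent, 120);
        long[] b = gapsAndOverlaps(splitOnce, 120);
        System.out.println("independent: gaps=" + a[0] + " overlaps=" + a[1]);
        System.out.println("split once:  gaps=" + b[0] + " overlaps=" + b[1]);
    }
}
```

Running `main` prints `independent: gaps=20 overlaps=10` and `split once:  gaps=0 overlaps=0`. With independent calls, offsets 60 to 79 are never read and offsets 30 to 39 are read twice. A single split has neither problem.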