Hi Thomas,

The side inputs 1-pager is a forward-looking document for the design of
side inputs in Beam once the portability layers are completed. The current
SDK and implementations do not quite respect the same abstraction
boundaries, even though they are similar.

Here are some specifics about that 1-pager that I hope will help you right
now:

 - The purple cylinder that says "Runner materializes" corresponds to the
CreatePCollectionView transform. Eventually this should not appear in the
SDK or the pipeline representation, but today that is where you put your
logic to write to some runner-specific storage medium, etc.
 - This "Runner materializes" / "CreatePCollectionView" is consistent with
streaming, of course. When new data arrives, the runner makes the new side
input value available. Most of the View.asXYZ transforms have a GroupByKey
within them, so the triggering on the side input PCollection will regulate
this.
 - The red "RPC" boundary in the diagram will be part of the cross-language
Fn API. For today, that layer is not present, and it is the Java class
ViewFn on each PCollectionView<ViewT>. It takes an
Iterable<WindowedValue<ElemT>> and produces a ViewT.
 - If we were to use the existing ViewFns without modification, the
primitive "access_pattern" would be "iterable", not "multimap". Thus, the
access pattern does not support fetching an individual KV record
efficiently when the side input is large (when it is small, the map can be
built in memory and cached). As we move forwards, this should change.

And here are answers beyond the side input 1-pager:

 - The problem of expiry of the side input data is [BEAM-260]. The solution
is pretty easy, but I have been waiting to send out my proposal to solve it
with a [WindowMappingFn] since we have so many proposals already in flight.
I am sharing it now, here, since you brought it up again.
 - A good, though very large, reference is the recent addition of side
inputs to the Flink runner in [PR #737] by Aljoscha. In particular, it adds
[SideInputHandler] as a runner-independent way to build side inputs on top
of StateInternals. I suspect you would benefit from using this.

I hope this helps!

Kenn

[BEAM-260] https://issues.apache.org/jira/browse/BEAM-260
[WindowMappingFn] https://s.apache.org/beam-windowmappingfn-1-pager
<https://s.apache.org/beam-windowmappingfn-1-pager>[PR #737]
https://github.com/apache/incubator-beam/pull/737
[SideInputHandler] https://github.com/apache/incubator-beam/blob/master/
runners/core-java/src/main/java/org/apache/beam/runners/
core/SideInputHandler.java

On Thu, Sep 15, 2016 at 10:12 AM, Thomas Weise <[email protected]> wrote:

> Hi,
>
> I'm working on the Apex runner (
> https://github.com/apache/incubator-beam/pull/540) and based on the
> integration test results my next target is support for PCollectionView.
>
> I looked at the side inputs doc (
> https://s.apache.org/beam-side-inputs-1-pager) and see that a suggested
> implementation approach is RPC.
>
> Apex is a streaming engine where individual records flow through the
> pipeline and operators process data once it becomes available. Hence I'm
> also looking at side inputs as a stream vs. a call to fetch a specific
> record. But that would also require a ParDo operator to hold on to the side
> input state until it is no longer needed (based on expiry of the window)?
>
> I would appreciate your thoughts on this. Is there a good streaming based
> implementation to look at for reference? Also, any suggestions to break the
> support for side inputs into multiple tasks that can be taken up
> independently?
>
> Thanks!
> Thomas
>

Reply via email to