Hi Thomas,

The side inputs 1-pager is a forward-looking document for the design of side inputs in Beam once the portability layers are completed. The current SDK and implementations do not quite respect the same abstraction boundaries, even though they are similar.
Here are some specifics about that 1-pager that I hope will help you right now:

- The purple cylinder labeled "Runner materializes" corresponds to the CreatePCollectionView transform. Eventually this should not appear in the SDK or the pipeline representation, but today that is where you put your logic to write to some runner-specific storage medium, etc.
- This "Runner materializes" / "CreatePCollectionView" step is consistent with streaming, of course. When new data arrives, the runner makes the new side input value available. Most of the View.asXYZ transforms contain a GroupByKey, so the triggering on the side input PCollection regulates when new values become available.
- The red "RPC" boundary in the diagram will be part of the cross-language Fn API. Today that layer is not present; its role is played by the Java class ViewFn on each PCollectionView<ViewT>, which takes an Iterable<WindowedValue<ElemT>> and produces a ViewT.
- If we were to use the existing ViewFns without modification, the primitive "access_pattern" would be "iterable", not "multimap". Thus the access pattern does not support fetching an individual KV record efficiently when the side input is large (when it is small, the map can be built in memory and cached). This should change as we move forward.

And here are answers beyond the side input 1-pager:

- The problem of expiring side input data is [BEAM-260]. The solution is pretty easy, but I have been waiting to send out my proposal to solve it with a [WindowMappingFn] since we have so many proposals already in flight. I am sharing it now, here, since you brought it up again.
- A good, though very large, reference is the recent addition of side inputs to the Flink runner in [PR #737] by Aljoscha. In particular, it adds [SideInputHandler] as a runner-independent way to build side inputs on top of StateInternals. I suspect you would benefit from using it.

I hope this helps!
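To make the ViewFn, caching and expiry points concrete, here is a minimal, self-contained Java sketch of what a runner-side store for a single PCollectionView might look like. It does not use the Beam SDK: Window, SideInputStore and the java.util.function.Function standing in for ViewFn are hypothetical simplifications. It also assumes each side input pane carries the full accumulated contents for its window (so a newer pane replaces the older one), and that side input windows map one-to-one onto main input windows (no WindowMappingFn lookback).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SideInputStoreSketch {

  /** Hypothetical stand-in for a window: a [start, end) interval in millis. */
  record Window(long start, long end) {}

  /**
   * Hypothetical runner-side store for one side input. Contents are kept per window; the view
   * is built from the Iterable by a ViewFn-like function and cached until new data arrives.
   */
  static final class SideInputStore<ElemT, ViewT> {
    private final Function<Iterable<ElemT>, ViewT> viewFn;
    private final long allowedLatenessMillis;
    private final Map<Window, List<ElemT>> contents = new HashMap<>();
    private final Map<Window, ViewT> cachedViews = new HashMap<>();

    SideInputStore(Function<Iterable<ElemT>, ViewT> viewFn, long allowedLatenessMillis) {
      this.viewFn = viewFn;
      this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Assumes the pane holds the full accumulated contents, so it replaces older data. */
    void addPane(Window window, List<ElemT> pane) {
      contents.put(window, new ArrayList<>(pane));
      cachedViews.remove(window); // new data must be visible on the next access
    }

    boolean isReady(Window window) {
      return contents.containsKey(window);
    }

    /** Builds the view from the whole Iterable on first access, then serves it from cache. */
    ViewT get(Window window) {
      if (!isReady(window)) {
        throw new IllegalStateException("No side input data yet for " + window);
      }
      return cachedViews.computeIfAbsent(window, w -> viewFn.apply(contents.get(w)));
    }

    /** Drops windows whose end plus allowed lateness is at or behind the watermark. */
    int expire(long watermarkMillis) {
      int before = contents.size();
      contents.keySet().removeIf(w -> w.end() + allowedLatenessMillis <= watermarkMillis);
      cachedViews.keySet().removeIf(w -> !contents.containsKey(w));
      return before - contents.size();
    }
  }

  /** A map-shaped view built from an iterable of entries, loosely analogous to View.asMap. */
  static Map<String, Integer> toMap(Iterable<Map.Entry<String, Integer>> entries) {
    Map<String, Integer> map = new HashMap<>();
    for (Map.Entry<String, Integer> e : entries) {
      map.put(e.getKey(), e.getValue());
    }
    return map;
  }

  public static void main(String[] args) {
    SideInputStore<Map.Entry<String, Integer>, Map<String, Integer>> store =
        new SideInputStore<>(SideInputStoreSketch::toMap, 0L);
    Window w1 = new Window(0, 60_000);
    store.addPane(w1, List.of(Map.entry("a", 1), Map.entry("b", 2)));
    System.out.println(store.get(w1).get("b")); // 2
    store.addPane(w1, List.of(Map.entry("a", 1), Map.entry("b", 3)));
    System.out.println(store.get(w1).get("b")); // 3
    System.out.println(store.expire(60_000)); // 1
    System.out.println(store.isReady(w1)); // false
  }
}
```

Caching the built view keeps small, map-shaped side inputs cheap to read, but every rebuild still walks the whole Iterable, which is why this approach does not scale to large side inputs.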
Kenn

[BEAM-260] https://issues.apache.org/jira/browse/BEAM-260
[WindowMappingFn] https://s.apache.org/beam-windowmappingfn-1-pager
[PR #737] https://github.com/apache/incubator-beam/pull/737
[SideInputHandler] https://github.com/apache/incubator-beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java

On Thu, Sep 15, 2016 at 10:12 AM, Thomas Weise wrote:
> Hi,
>
> I'm working on the Apex runner (https://github.com/apache/incubator-beam/pull/540) and based on the integration test results my next target is support for PCollectionView.
>
> I looked at the side inputs doc (https://s.apache.org/beam-side-inputs-1-pager) and see that a suggested implementation approach is RPC.
>
> Apex is a streaming engine where individual records flow through the pipeline and operators process data once it becomes available. Hence I'm also looking at side inputs as a stream vs. a call to fetch a specific record. But that would also require a ParDo operator to hold on to the side input state until it is no longer needed (based on expiry of the window)?
>
> I would appreciate your thoughts on this. Is there a good streaming based implementation to look at for reference? Also, any suggestions to break the support for side inputs into multiple tasks that can be taken up independently?
>
> Thanks!
> Thomas
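On treating side inputs as a stream in an operator: one possible pattern is to buffer ("push back") main-input elements whose side input window is not ready yet, and replay them when side input data for that window arrives. The Java sketch below illustrates that pattern under simplifying assumptions; it is not a description of the Flink runner code in PR #737. MainElem, PushbackRunner and the long window ids are hypothetical, windows are assumed to be already mapped from main input to side input, and it omits the case where the side input watermark passes the end of a window with no data, which a real runner would presumably handle by releasing buffered elements against an empty or default view.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class PushbackSketch {

  /** Hypothetical main-input element, tagged with its (already mapped) side input window id. */
  record MainElem(String value, long windowId) {}

  /** Hypothetical operator that holds main input until the side input for its window is ready. */
  static final class PushbackRunner<ViewT> {
    private final Map<Long, ViewT> sideInputs = new HashMap<>();
    private final List<MainElem> pushedBack = new ArrayList<>();
    private final BiFunction<MainElem, ViewT, String> doFn;
    final List<String> output = new ArrayList<>();

    PushbackRunner(BiFunction<MainElem, ViewT, String> doFn) {
      this.doFn = doFn;
    }

    void processMain(MainElem e) {
      ViewT view = sideInputs.get(e.windowId());
      if (view == null) {
        pushedBack.add(e); // side input not ready for this window: buffer the element
      } else {
        output.add(doFn.apply(e, view));
      }
    }

    void processSide(long windowId, ViewT view) {
      sideInputs.put(windowId, view);
      // Replay buffered elements; those whose window is still not ready get buffered again.
      List<MainElem> retry = new ArrayList<>(pushedBack);
      pushedBack.clear();
      for (MainElem e : retry) {
        processMain(e);
      }
    }

    int pushedBackCount() {
      return pushedBack.size();
    }
  }

  public static void main(String[] args) {
    PushbackRunner<Integer> runner =
        new PushbackRunner<>((e, multiplier) -> e.value() + "x" + multiplier);
    runner.processMain(new MainElem("a", 1L));
    System.out.println(runner.pushedBackCount()); // 1
    runner.processSide(1L, 10);
    System.out.println(runner.output); // [ax10]
    runner.processMain(new MainElem("b", 1L));
    System.out.println(runner.output); // [ax10, bx10]
  }
}
```

Buffering main input like this is the price of a push-based model: memory grows with how far the main input runs ahead of the side input, so it pairs naturally with per-window expiry of both buffers.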
