Hi Kenn,

Thanks, this was very helpful. I got the side input translation working
now, although I want to go back and see if the View.asXYZ expansions can be
simplified.

But before that I need to tackle PAssert, which is the next blocker for
getting many of the integration tests working. I see that the PAsserts
generate TimestampedValueInGlobalWindow with no triggers, so the grouping
step (PAssert$0/GroupGlobally/GatherAllOutputs/GroupByKey) accumulates
state but never emits anything.

    PCollection<Integer> pcollection = pipeline.apply(Create.of(...));
    PAssert.that(pcollection).empty();
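
To make the stall concrete, here is a minimal plain-Java model of what I
think is going on. It is not Beam code: the class, its methods, and
END_OF_GLOBAL_WINDOW are hypothetical names for illustration only, and the
firing rule (emit only once the watermark reaches the end of the global
window) is my assumption about the default behavior, not something I have
confirmed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Beam code) of a grouping step over a single global window. */
public class GlobalWindowGroupingSketch {
    // Hypothetical stand-in for the end of the global window.
    public static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE;

    private final Map<String, List<Integer>> buffered = new LinkedHashMap<>();
    private boolean fired = false;

    /** Buffers the element per key; nothing is emitted here. */
    public void processElement(String key, int value) {
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /**
     * Assumed firing rule: output is produced only once the watermark
     * reaches the end of the global window, and only once.
     */
    public Map<String, List<Integer>> advanceWatermark(long watermark) {
        if (fired || watermark < END_OF_GLOBAL_WINDOW) {
            return new LinkedHashMap<>(); // still accumulating, no output
        }
        fired = true;
        Map<String, List<Integer>> out = new LinkedHashMap<>(buffered);
        buffered.clear();
        return out;
    }

    /** Number of keys that still hold buffered state. */
    public int bufferedKeys() {
        return buffered.size();
    }

    public static void main(String[] args) {
        GlobalWindowGroupingSketch gbk = new GlobalWindowGroupingSketch();
        gbk.processElement("k", 1);
        gbk.processElement("k", 2);
        // Watermark is still inside the global window: state grows, no output.
        System.out.println(gbk.advanceWatermark(1000L));                // {}
        // Watermark reaches the end of the global window: the group is emitted.
        System.out.println(gbk.advanceWatermark(END_OF_GLOBAL_WINDOW)); // {k=[1, 2]}
    }
}
```

If that assumption holds, the runner would have to advance the watermark to
the end of the global window once the bounded input is exhausted; otherwise
the GroupByKey inside PAssert never fires.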

Is there a good place to look for a basic understanding of PAssert and what
the runner needs to support?

Thanks,
Thomas



On Thu, Sep 15, 2016 at 11:51 AM, Kenneth Knowles <[email protected]>
wrote:

> Hi Thomas,
>
> The side inputs 1-pager is a forward-looking document for the design of
> side inputs in Beam once the portability layers are completed. The current
> SDK and implementations do not quite respect the same abstraction
> boundaries, even though they are similar.
>
> Here are some specifics about that 1-pager that I hope will help you right
> now:
>
>  - The purple cylinder that says "Runner materializes" corresponds to the
> CreatePCollectionView transform. Eventually this should not appear in the
> SDK or the pipeline representation, but today that is where you put your
> logic to write to some runner-specific storage medium, etc.
>  - This "Runner materializes" / "CreatePCollectionView" is consistent with
> streaming, of course. When new data arrives, the runner makes the new side
> input value available. Most of the View.asXYZ transforms have a GroupByKey
> within them, so the triggering on the side input PCollection will regulate
> this.
>  - The red "RPC" boundary in the diagram will be part of the cross-language
> Fn API. For today, that layer is not present, and it is the Java class
> ViewFn on each PCollectionView<ViewT>. It takes an
> Iterable<WindowedValue<ElemT>> and produces a ViewT.
>  - If we were to use the existing ViewFns without modification, the
> primitive "access_pattern" would be "iterable", not "multimap". Thus, the
> access pattern does not support fetching an individual KV record
> efficiently when the side input is large (when it is small, the map can be
> built in memory and cached). As we move forwards, this should change.
>
> And here are answers beyond the side input 1-pager:
>
>  - The problem of expiry of the side input data is [BEAM-260]. The solution
> is pretty easy, but I have been waiting to send out my proposal to solve it
> with a [WindowMappingFn] since we have so many proposals already in flight.
> I am sharing it now, here, since you brought it up again.
>  - A good, though very large, reference is the recent addition of side
> inputs to the Flink runner in [PR #737] by Aljoscha. In particular, it adds
> [SideInputHandler] as a runner-independent way to build side inputs on top
> of StateInternals. I suspect you would benefit from using this.
>
> I hope this helps!
>
> Kenn
>
> [BEAM-260] https://issues.apache.org/jira/browse/BEAM-260
> [WindowMappingFn] https://s.apache.org/beam-windowmappingfn-1-pager
> [PR #737] https://github.com/apache/incubator-beam/pull/737
> [SideInputHandler] https://github.com/apache/incubator-beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
>
> On Thu, Sep 15, 2016 at 10:12 AM, Thomas Weise <[email protected]> wrote:
>
> > Hi,
> >
> > I'm working on the Apex runner (
> > https://github.com/apache/incubator-beam/pull/540) and based on the
> > integration test results my next target is support for PCollectionView.
> >
> > I looked at the side inputs doc (
> > https://s.apache.org/beam-side-inputs-1-pager) and see that a suggested
> > implementation approach is RPC.
> >
> > Apex is a streaming engine where individual records flow through the
> > pipeline and operators process data once it becomes available. Hence I'm
> > also looking at side inputs as a stream vs. a call to fetch a specific
> > record. But that would also require a ParDo operator to hold on to the
> > side input state until it is no longer needed (based on expiry of the
> > window)?
> >
> > I would appreciate your thoughts on this. Is there a good streaming based
> > implementation to look at for reference? Also, any suggestions to break
> > the support for side inputs into multiple tasks that can be taken up
> > independently?
> >
> > Thanks!
> > Thomas
> >
>
