Thanks Thomas,
I saw your commit on the PR.
I will do a test and review later today or tomorrow.
Regards
JB
On 10/02/2016 12:29 AM, Thomas Weise wrote:
Kenn,
Thanks for the pointer WRT the source watermark. It wasn't set to
MAX_TIMESTAMP upon end of input and hence the global window was never
emitted. Got the assertions almost working now, just need to propagate the
exceptions to the unit test driver and then we should be ready for a new
revision of the PR.
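[Editor's note: the watermark behavior Thomas describes can be sketched without the Beam SDK. In this toy model (the class and method names are illustrative, not Beam API), a grouping stage buffers values and only emits the global-window aggregate once the source watermark advances to the maximum timestamp at end of input:]

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Beam API): a grouping stage that holds the global-window
// aggregate until the watermark passes the end of the global window.
class GlobalWindowBuffer {
    // The global window "ends" at the maximum possible timestamp.
    static final long MAX_TIMESTAMP = Long.MAX_VALUE;

    private final List<Integer> buffered = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    void add(int value) {
        buffered.add(value);
    }

    // A bounded source must advance its watermark to MAX_TIMESTAMP when the
    // input is exhausted; otherwise this never fires and nothing is emitted.
    List<Integer> advanceWatermarkTo(long newWatermark) {
        watermark = newWatermark;
        if (watermark >= MAX_TIMESTAMP) {
            List<Integer> out = new ArrayList<>(buffered);
            buffered.clear();
            return out; // the final, complete aggregate for the global window
        }
        return new ArrayList<>(); // window not yet complete: emit nothing
    }
}
```

[Skipping the final watermark advance reproduces the symptom described above: state accumulates but the grouped output is never emitted.]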
Thomas
On Tue, Sep 27, 2016 at 10:17 PM, Thomas Weise <[email protected]>
wrote:
I have GroupByKey working, here is a unit test:
https://github.com/tweise/incubator-beam/blob/BEAM-261.sideinputs/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java#L65L110
For the earlier PAssert example, PAssert.GroupGlobally will assign the global window and remove any triggering. The following GroupByKey then won't emit an aggregate. I'm trying to figure out what I'm missing.
PCollection<Integer> pcollection = pipeline.apply(Create.of(...));
PAssert.that(pcollection).empty();
Thomas
On Tue, Sep 27, 2016 at 5:02 PM, Kenneth Knowles <[email protected]>
wrote:
Hi Thomas,
Great news about the side inputs! The only thing you should need for most
PAsserts to work is GroupByKey. A few PAsserts require side inputs, so if
you got those working then you should have everything you need for all the
PAsserts.
The lack of triggering in PAsserts like the one you mention is because they rely on the behavior that the aggregation for a window is emitted when the window expires and is garbage collected. All the values for the window end up in a single aggregation, so that aggregation is the final value and can be tested. This happens as part of the GroupAlsoByWindowViaWindowSetDoFn (the logic itself is part of ReduceFnRunner), so if you have state and timers working, you should see output.
If this doesn't seem to be happening, maybe you can give some more details?
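[Editor's note: the expiry behavior Kenn describes can be illustrated in plain Java. This is a rough sketch, not the actual ReduceFnRunner code; the class name and the explicit allowed-lateness parameter are made up for the example:]

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of "emit on window expiry": values for a window are held in
// state until the watermark passes the window's end plus allowed lateness,
// at which point the single, final aggregate is emitted and state is GC'd.
class ExpiringWindowState {
    private final long windowEnd;
    private final long allowedLateness;
    private final List<Integer> values = new ArrayList<>();
    private boolean expired = false;

    ExpiringWindowState(long windowEnd, long allowedLateness) {
        this.windowEnd = windowEnd;
        this.allowedLateness = allowedLateness;
    }

    void add(int value) {
        if (!expired) {
            values.add(value);
        }
    }

    // Called when a timer fires as the watermark advances. Returns the final
    // aggregate exactly once, at expiry; null means "nothing to emit yet".
    List<Integer> onWatermark(long watermark) {
        if (!expired && watermark > windowEnd + allowedLateness) {
            expired = true;
            List<Integer> out = new ArrayList<>(values);
            values.clear(); // garbage-collect the per-window state
            return out;
        }
        return null;
    }
}
```

[Because every value for the window lands in that one aggregation at expiry, a PAssert can treat the single emitted output as the final value.]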
Kenn
On Tue, Sep 27, 2016 at 7:09 PM Thomas Weise <[email protected]> wrote:
Hi Kenn,
Thanks, this was very helpful. I got the side input translation working now, although I want to go back and see if the View.asXYZ expansions can be simplified.
But before that I need to tackle PAssert, which is the next blocker for me to get many of the integration tests working. I see that the PAsserts generate TimestampedValueInGlobalWindow with no triggers, so grouping will accumulate state but not emit anything (PAssert$0/GroupGlobally/GatherAllOutputs/GroupByKey).
PCollection<Integer> pcollection = pipeline.apply(Create.of(...));
PAssert.that(pcollection).empty();
Is there a good place to look for a basic understanding of PAssert and what the runner needs to support?
Thanks,
Thomas
On Thu, Sep 15, 2016 at 11:51 AM, Kenneth Knowles <[email protected]> wrote:
Hi Thomas,
The side inputs 1-pager is a forward-looking document for the design of side inputs in Beam once the portability layers are completed. The current SDK and implementations do not quite respect the same abstraction boundaries, even though they are similar.
Here are some specifics about that 1-pager that I hope will help you right now:
- The purple cylinder that says "Runner materializes" corresponds to the CreatePCollectionView transform. Eventually this should not appear in the SDK or the pipeline representation, but today that is where you put your logic to write to some runner-specific storage medium, etc.
- This "Runner materializes" / "CreatePCollectionView" is consistent with streaming, of course. When new data arrives, the runner makes the new side input value available. Most of the View.asXYZ transforms have a GroupByKey within them, so the triggering on the side input PCollection will regulate this.
- The red "RPC" boundary in the diagram will be part of the cross-language Fn API. For today, that layer is not present, and it is the Java class ViewFn on each PCollectionView<ViewT>. It takes an Iterable<WindowedValue<ElemT>> and produces a ViewT.
- If we were to use the existing ViewFns without modification, the primitive "access_pattern" would be "iterable", not "multimap". Thus, the access pattern does not support fetching an individual KV record efficiently when the side input is large (when it is small, the map can be built in memory and cached). As we move forwards, this should change.
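[Editor's note: the "iterable" access pattern can be made concrete with a minimal plain-Java sketch. The interface and class names below are made up for illustration; the real interface is Beam's ViewFn on PCollectionView<ViewT>. The runner hands the view function the full iterable of materialized values, and a map-style view folds it into an in-memory Map, which is why individual-key lookups are only cheap when the side input fits in memory:]

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for Beam's ViewFn (names are illustrative, not Beam API).
interface ToyViewFn<ElemT, ViewT> {
    ViewT apply(Iterable<ElemT> materializedValues);
}

class ViewFnSketch {
    // A View.asMap-style view under the "iterable" access pattern: the whole
    // iterable is consumed to build the map in memory (then cached). There is
    // no way to fetch a single key without scanning everything first.
    static ToyViewFn<Map.Entry<String, Integer>, Map<String, Integer>> asMap() {
        return values -> {
            Map<String, Integer> map = new HashMap<>();
            for (Map.Entry<String, Integer> kv : values) {
                map.put(kv.getKey(), kv.getValue());
            }
            return map;
        };
    }
}
```

[A "multimap" access pattern would instead let the runner serve individual keys from its own storage, which is what would make large side inputs efficient.]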
And here are answers beyond the side input 1-pager:
- The problem of expiry of the side input data is [BEAM-260]. The solution is pretty easy, but I have been waiting to send out my proposal to solve it with a [WindowMappingFn] since we have so many proposals already in flight. I am sharing it now, here, since you brought it up again.
- A good, though very large, reference is the recent addition of side inputs to the Flink runner in [PR #737] by Aljoscha. In particular, it adds [SideInputHandler] as a runner-independent way to build side inputs on top of StateInternals. I suspect you would benefit from using this.
I hope this helps!
Kenn
[BEAM-260] https://issues.apache.org/jira/browse/BEAM-260
[WindowMappingFn] https://s.apache.org/beam-windowmappingfn-1-pager
[PR #737]
https://github.com/apache/incubator-beam/pull/737
[SideInputHandler] https://github.com/apache/incubator-beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
On Thu, Sep 15, 2016 at 10:12 AM, Thomas Weise <[email protected]>
wrote:
Hi,
I'm working on the Apex runner (https://github.com/apache/incubator-beam/pull/540) and based on the integration test results my next target is support for PCollectionView.
I looked at the side inputs doc (https://s.apache.org/beam-side-inputs-1-pager) and see that a suggested implementation approach is RPC.
Apex is a streaming engine where individual records flow through the pipeline and operators process data once it becomes available. Hence I'm also looking at side inputs as a stream vs. a call to fetch a specific record. But that would also require a ParDo operator to hold on to the side input state until it is no longer needed (based on expiry of the window)?
I would appreciate your thoughts on this. Is there a good streaming-based implementation to look at for reference? Also, any suggestions to break the support for side inputs into multiple tasks that can be taken up independently?
Thanks!
Thomas
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com