Repository: beam Updated Branches: refs/heads/master e8c557448 -> 621f20f02
Test GBK immediately followed by stateful ParDo Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d9fe885 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d9fe885 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d9fe885 Branch: refs/heads/master Commit: 5d9fe885b2581a98aeb1d470229b733eda52d1cd Parents: e8c5574 Author: Kenneth Knowles <[email protected]> Authored: Fri Jul 14 08:02:00 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jul 14 13:05:08 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/ParDoTest.java | 37 ++++++++++++++++++++ 1 file changed, 37 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5d9fe885/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index fa4949e..142dff8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -2596,6 +2596,43 @@ public class ParDoTest implements Serializable { pipeline.run(); } + /** + * Tests a GBK followed immediately by a {@link ParDo} that uses timers. This checks a common + * case where both GBK and the user code share a timer delivery bundle. 
+ */ + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + public void testGbkFollowedByUserTimers() throws Exception { + + DoFn<KV<String, Iterable<Integer>>, Integer> fn = + new DoFn<KV<String, Iterable<Integer>>, Integer>() { + + public static final String TIMER_ID = "foo"; + + @TimerId(TIMER_ID) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + context.output(3); + } + + @OnTimer(TIMER_ID) + public void onTimer(OnTimerContext context) { + context.output(42); + } + }; + + PCollection<Integer> output = + pipeline + .apply(Create.of(KV.of("hello", 37))) + .apply(GroupByKey.<String, Integer>create()) + .apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3, 42); + pipeline.run(); + } + @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class}) public void testEventTimeTimerAlignBounded() throws Exception {
