[BEAM-2447] Reintroduces DoFn.ProcessContinuation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f7f1699 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f7f1699 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f7f1699 Branch: refs/heads/DSL_SQL Commit: 4f7f16990a8fc49a9b6ae199809f0ada7dc7448d Parents: bd2a8cc Author: Eugene Kirpichov <[email protected]> Authored: Tue Jun 13 16:50:35 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:02 2017 -0700 ---------------------------------------------------------------------- .../core/construction/SplittableParDoTest.java | 10 +- ...eBoundedSplittableProcessElementInvoker.java | 35 ++++++- .../core/SplittableParDoViaKeyedWorkItems.java | 9 +- .../core/SplittableProcessElementInvoker.java | 25 ++++- ...ndedSplittableProcessElementInvokerTest.java | 45 +++++++-- .../core/SplittableParDoProcessFnTest.java | 99 ++++++++++++++++-- .../org/apache/beam/sdk/transforms/DoFn.java | 51 +++++++++- .../reflect/ByteBuddyDoFnInvokerFactory.java | 19 +++- .../sdk/transforms/reflect/DoFnInvoker.java | 4 +- .../sdk/transforms/reflect/DoFnSignature.java | 10 +- .../sdk/transforms/reflect/DoFnSignatures.java | 22 +++- .../splittabledofn/OffsetRangeTracker.java | 10 ++ .../splittabledofn/RestrictionTracker.java | 11 +- .../beam/sdk/transforms/SplittableDoFnTest.java | 100 ++++++++----------- .../transforms/reflect/DoFnInvokersTest.java | 93 +++++++++++++---- .../DoFnSignaturesProcessElementTest.java | 2 +- .../DoFnSignaturesSplittableDoFnTest.java | 83 +++++++++++++-- 17 files changed, 487 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index f4c596e..267232c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.construction; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.junit.Assert.assertEquals; import java.io.Serializable; @@ -24,8 +25,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; -import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -70,7 +69,6 @@ public class SplittableParDoTest { public void checkDone() {} } - @BoundedPerElement private static class BoundedFakeFn extends DoFn<Integer, String> { @ProcessElement public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} @@ -81,10 +79,12 @@ public class SplittableParDoTest { } } - @UnboundedPerElement private static class UnboundedFakeFn extends DoFn<Integer, String> { @ProcessElement - public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} + public ProcessContinuation processElement( + ProcessContext context, SomeRestrictionTracker tracker) { + return stop(); + } @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { 
http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 475abf2..0c956d5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -96,7 +96,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< final WindowedValue<InputT> element, final TrackerT tracker) { final ProcessContext processContext = new ProcessContext(element, tracker); - invoker.invokeProcessElement( + DoFn.ProcessContinuation cont = invoker.invokeProcessElement( new DoFnInvoker.ArgumentProvider<InputT, OutputT>() { @Override public DoFn<InputT, OutputT>.ProcessContext processContext( @@ -155,10 +155,37 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< "Access to timers not supported in Splittable DoFn"); } }); - + // TODO: verify that if there was a failed tryClaim() call, then cont.shouldResume() is false. + // Currently we can't verify this because there are no hooks into tryClaim(). + // See https://issues.apache.org/jira/browse/BEAM-2607 + RestrictionT residual = processContext.extractCheckpoint(); + if (cont.shouldResume()) { + if (residual == null) { + // No checkpoint had been taken by the runner while the ProcessElement call ran, however + // the call says that not the whole restriction has been processed. 
So we need to take + // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly + // the work that was done in the current ProcessElement call, and returns a residual + // restriction that describes exactly the work that wasn't done in the current call. + residual = tracker.checkpoint(); + } else { + // A checkpoint was taken by the runner, and then the ProcessElement call returned resume() + // without making more tryClaim() calls (since no tryClaim() calls can succeed after + // checkpoint(), and since if it had made a failed tryClaim() call, it should have returned + // stop()). + // This means that the resulting primary restriction and the taken checkpoint already + // accurately describe, respectively, the work that was and wasn't done in the current + // ProcessElement call. + // In other words, if we took a checkpoint *after* ProcessElement completed (like in the + // branch above), it would have been equivalent to this one. + } + } else { + // The ProcessElement call returned stop() - that means the tracker's current restriction + // has been fully processed by the call. A checkpoint may or may not have been captured in + // "residual"; if it was, then we'll need to process it; if not, then we don't - nothing + // special needs to be done.
+ } tracker.checkDone(); - return new Result( - processContext.extractCheckpoint(), processContext.getLastReportedWatermark()); + return new Result(residual, cont, processContext.getLastReportedWatermark()); } private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext { http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 09f3b15..6e97645 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -200,8 +200,8 @@ public class SplittableParDoViaKeyedWorkItems { /** * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction, - * and is released when the {@link DoFn.ProcessElement} call returns and there is no residual - * restriction captured by the {@link SplittableProcessElementInvoker}. + * and is released when the {@link DoFn.ProcessElement} call returns {@link + * ProcessContinuation#stop()}. 
* * <p>A hold is needed to avoid letting the output watermark immediately progress together with * the input watermark when the first {@link DoFn.ProcessElement} call for this element @@ -365,11 +365,12 @@ public class SplittableParDoViaKeyedWorkItems { if (futureOutputWatermark == null) { futureOutputWatermark = elementAndRestriction.getKey().getTimestamp(); } + Instant wakeupTime = + timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay()); holdState.add(futureOutputWatermark); // Set a timer to continue processing this element. timerInternals.setTimer( - TimerInternals.TimerData.of( - stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME)); + TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME)); } private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle( http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index ced6c01..7732df3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.core; +import static com.google.common.base.Preconditions.checkNotNull; + import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -34,20 +36,35 @@ public abstract class SplittableProcessElementInvoker< public class Result { @Nullable private final RestrictionT residualRestriction; + private final 
DoFn.ProcessContinuation continuation; private final Instant futureOutputWatermark; public Result( - @Nullable RestrictionT residualRestriction, Instant futureOutputWatermark) { + @Nullable RestrictionT residualRestriction, + DoFn.ProcessContinuation continuation, + Instant futureOutputWatermark) { + this.continuation = checkNotNull(continuation); + if (continuation.shouldResume()) { + checkNotNull(residualRestriction); + } this.residualRestriction = residualRestriction; this.futureOutputWatermark = futureOutputWatermark; } - /** If {@code null}, means the call should not resume. */ + /** + * Can be {@code null} only if {@link #getContinuation} specifies the call should not resume. + * However, the converse is not true: this can be non-null even if {@link #getContinuation} + * is {@link DoFn.ProcessContinuation#stop()}. + */ @Nullable public RestrictionT getResidualRestriction() { return residualRestriction; } + public DoFn.ProcessContinuation getContinuation() { + return continuation; + } + public Instant getFutureOutputWatermark() { return futureOutputWatermark; } @@ -57,8 +74,8 @@ public abstract class SplittableProcessElementInvoker< * Invokes the {@link DoFn.ProcessElement} method using the given {@link DoFnInvoker} for the * original {@link DoFn}, on the given element and with the given {@link RestrictionTracker}. * - * @return Information on how to resume the call: residual restriction and a - * future output watermark. + * @return Information on how to resume the call: residual restriction, a {@link + * DoFn.ProcessContinuation}, and a future output watermark. 
*/ public abstract Result invokeProcessElement( DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker); http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index b80a632..959909e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -17,11 +17,15 @@ */ package org.apache.beam.runners.core; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.util.Collection; import java.util.concurrent.Executors; @@ -42,19 +46,27 @@ import org.junit.Test; /** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. 
*/ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { private static class SomeFn extends DoFn<Integer, String> { + private final int numOutputsPerProcessCall; private final Duration sleepBeforeEachOutput; - private SomeFn(Duration sleepBeforeEachOutput) { + private SomeFn(int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) { + this.numOutputsPerProcessCall = numOutputsPerProcessCall; this.sleepBeforeEachOutput = sleepBeforeEachOutput; } @ProcessElement - public void process(ProcessContext context, OffsetRangeTracker tracker) + public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) throws Exception { - for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + for (long i = tracker.currentRestriction().getFrom(), numIterations = 1; + tracker.tryClaim(i); + ++i, ++numIterations) { Thread.sleep(sleepBeforeEachOutput.getMillis()); context.output("" + i); + if (numIterations == numOutputsPerProcessCall) { + return resume(); + } } + return stop(); } @GetInitialRestriction @@ -64,8 +76,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { } private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result - runTest(int count, Duration sleepPerElement) { - SomeFn fn = new SomeFn(sleepPerElement); + runTest(int totalNumOutputs, int numOutputsPerProcessCall, Duration sleepPerElement) { + SomeFn fn = new SomeFn(numOutputsPerProcessCall, sleepPerElement); SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker> invoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, @@ -93,14 +105,15 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { return invoker.invokeProcessElement( DoFnInvokers.invokerFor(fn), - WindowedValue.of(count, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), - new OffsetRangeTracker(new OffsetRange(0, count))); + WindowedValue.of(totalNumOutputs, 
Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + new OffsetRangeTracker(new OffsetRange(0, totalNumOutputs))); } @Test public void testInvokeProcessElementOutputBounded() throws Exception { SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(10000, Duration.ZERO); + runTest(10000, Integer.MAX_VALUE, Duration.ZERO); + assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); // Should process the first 100 elements. assertEquals(1000, residualRange.getFrom()); @@ -110,7 +123,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementTimeBounded() throws Exception { SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(10000, Duration.millis(100)); + runTest(10000, Integer.MAX_VALUE, Duration.millis(100)); + assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); // Should process ideally around 30 elements - but due to timing flakiness, we can't enforce // that precisely. Just test that it's not egregiously off. 
@@ -120,9 +134,18 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { } @Test - public void testInvokeProcessElementVoluntaryReturn() throws Exception { + public void testInvokeProcessElementVoluntaryReturnStop() throws Exception { SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = - runTest(5, Duration.millis(100)); + runTest(5, Integer.MAX_VALUE, Duration.millis(100)); + assertFalse(res.getContinuation().shouldResume()); assertNull(res.getResidualRestriction()); } + + @Test + public void testInvokeProcessElementVoluntaryReturnResume() throws Exception { + SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res = + runTest(10, 5, Duration.millis(100)); + assertTrue(res.getContinuation().shouldResume()); + assertEquals(new OffsetRange(5, 10), res.getResidualRestriction()); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index 1cd1275..7449af3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.core; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItems; @@ -365,16 +368,71 @@ public class 
SplittableParDoProcessFnTest { assertEquals(null, tester.getWatermarkHold()); } - /** - * A splittable {@link DoFn} that generates the sequence [init, init + total). - */ + /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */ + private static class SelfInitiatedResumeFn extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) { + c.output(c.element().toString()); + return resume().withResumeDelay(Duration.standardSeconds(5)); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer elem) { + return new SomeRestriction(); + } + } + + @Test + public void testResumeSetsTimer() throws Exception { + DoFn<Integer, String> fn = new SelfInitiatedResumeFn(); + Instant base = Instant.now(); + ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = + new ProcessFnTester<>( + base, + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(SomeRestriction.class), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION); + + tester.startElement(42, new SomeRestriction()); + assertThat(tester.takeOutputElements(), contains("42")); + + // Should resume after 5 seconds: advancing by 3 seconds should have no effect. + assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertTrue(tester.takeOutputElements().isEmpty()); + + // 6 seconds should be enough should invoke the fn again. + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertThat(tester.takeOutputElements(), contains("42")); + + // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect. + assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertTrue(tester.takeOutputElements().isEmpty()); + + // 6 seconds should again be enough. 
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3))); + assertThat(tester.takeOutputElements(), contains("42")); + } + + /** A splittable {@link DoFn} that generates the sequence [init, init + total). */ private static class CounterFn extends DoFn<Integer, String> { + private final int numOutputsPerCall; + + public CounterFn(int numOutputsPerCall) { + this.numOutputsPerCall = numOutputsPerCall; + } + @ProcessElement - public void process(ProcessContext c, OffsetRangeTracker tracker) { - for (long i = tracker.currentRestriction().getFrom(); - tracker.tryClaim(i); ++i) { + public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(), numIterations = 0; + tracker.tryClaim(i); ++i, ++numIterations) { c.output(String.valueOf(c.element() + i)); + if (numIterations == numOutputsPerCall) { + return resume(); + } } + return stop(); } @GetInitialRestriction @@ -383,10 +441,35 @@ public class SplittableParDoProcessFnTest { } } + public void testResumeCarriesOverState() throws Exception { + DoFn<Integer, String> fn = new CounterFn(1); + Instant base = Instant.now(); + ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester = + new ProcessFnTester<>( + base, + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(OffsetRange.class), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION); + + tester.startElement(42, new OffsetRange(0, 3)); + assertThat(tester.takeOutputElements(), contains("42")); + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + assertThat(tester.takeOutputElements(), contains("43")); + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + assertThat(tester.takeOutputElements(), contains("44")); + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + // After outputting all 3 items, should not output anything more. 
+ assertEquals(0, tester.takeOutputElements().size()); + // Should also not ask to resume. + assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + } + @Test public void testCheckpointsAfterNumOutputs() throws Exception { int max = 100; - DoFn<Integer, String> fn = new CounterFn(); + DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE); Instant base = Instant.now(); int baseIndex = 42; @@ -428,7 +511,7 @@ public class SplittableParDoProcessFnTest { // But bound bundle duration - the bundle should terminate. Duration maxBundleDuration = Duration.standardSeconds(1); // Create an fn that attempts to 2x output more than checkpointing allows. - DoFn<Integer, String> fn = new CounterFn(); + DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE); Instant base = Instant.now(); int baseIndex = 42; http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index a2e5c16..1b809c2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import com.google.auto.value.AutoValue; import java.io.Serializable; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -545,11 +546,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}. * <li>It <i>may</i> define a {@link GetRestrictionCoder} method. * <li>The type of restrictions used by all of these methods must be the same. 
+ * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to + * indicate whether there is more work to be done for the current element. * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as * {@link BoundedWindow}. * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or * {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with - * either of these, it's assumed to be {@link BoundedPerElement}. + * either of these, it's assumed to be {@link BoundedPerElement} if its {@link + * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it + * returns a {@link ProcessContinuation}. * </ul> * * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods. @@ -677,8 +682,48 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD @Experimental(Kind.SPLITTABLE_DO_FN) public @interface UnboundedPerElement {} - /** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */ - public class ProcessContinuation {} + // This can't be put into ProcessContinuation itself due to the following problem: + // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html + private static final ProcessContinuation PROCESS_CONTINUATION_STOP = + new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO); + + /** + * When used as a return value of {@link ProcessElement}, indicates whether there is more work to + * be done for the current element. + * + * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call + * on the {@link RestrictionTracker}, then the call MUST return {@link #stop()}. + */ + @Experimental(Kind.SPLITTABLE_DO_FN) + @AutoValue + public abstract static class ProcessContinuation { + /** Indicates that there is no more work to be done for the current element. 
*/ + public static ProcessContinuation stop() { + return PROCESS_CONTINUATION_STOP; + } + + /** Indicates that there is more work to be done for the current element. */ + public static ProcessContinuation resume() { + return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO); + } + + /** + * If false, the {@link DoFn} promises that there is no more work remaining for the current + * element, so the runner should not resume the {@link ProcessElement} call. + */ + public abstract boolean shouldResume(); + + /** + * A minimum duration that should elapse between the end of this {@link ProcessElement} call and + * the {@link ProcessElement} call continuing processing of the same element. By default, zero. + */ + public abstract Duration resumeDelay(); + + /** Builder method to set the value of {@link #resumeDelay()}. */ + public ProcessContinuation withResumeDelay(Duration resumeDelay) { + return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay); + } + } /** * Finalize the {@link DoFn} construction to prepare for processing. 
http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 8378204..cf96c9b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -49,7 +49,6 @@ import net.bytebuddy.implementation.bytecode.Throw; import net.bytebuddy.implementation.bytecode.assign.Assigner; import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing; import net.bytebuddy.implementation.bytecode.assign.TypeCasting; -import net.bytebuddy.implementation.bytecode.constant.NullConstant; import net.bytebuddy.implementation.bytecode.constant.TextConstant; import net.bytebuddy.implementation.bytecode.member.FieldAccess; import net.bytebuddy.implementation.bytecode.member.MethodInvocation; @@ -641,6 +640,17 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { * {@link ProcessElement} method. */ private static final class ProcessElementDelegation extends DoFnMethodDelegation { + private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD; + + static { + try { + PROCESS_CONTINUATION_STOP_METHOD = + new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop")); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to locate ProcessContinuation.stop()"); + } + } + private final DoFnSignature.ProcessElementMethod signature; /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. 
*/ @@ -677,7 +687,12 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { @Override protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { - return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE); + if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) { + return new StackManipulation.Compound( + MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE); + } else { + return MethodReturn.of(targetMethod.getReturnType().asErasure()); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 3b22fda..8b41fee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -54,8 +54,8 @@ public interface DoFnInvoker<InputT, OutputT> { * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}. * * @param extra Factory for producing extra parameter objects (such as window), if necessary. - * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a> - * tracking the complete removal of {@link DoFn.ProcessContinuation}. + * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link + * DoFn.ProcessContinuation#stop()} if it returns {@code void}. 
*/ DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra); http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 6eeed8e..bfad69e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; @@ -433,16 +434,21 @@ public abstract class DoFnSignature { @Nullable public abstract TypeDescriptor<? extends BoundedWindow> windowT(); + /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */ + public abstract boolean hasReturnValue(); + static ProcessElementMethod create( Method targetMethod, List<Parameter> extraParameters, TypeDescriptor<?> trackerT, - @Nullable TypeDescriptor<? extends BoundedWindow> windowT) { + @Nullable TypeDescriptor<? 
extends BoundedWindow> windowT, + boolean hasReturnValue) { return new AutoValue_DoFnSignature_ProcessElementMethod( targetMethod, Collections.unmodifiableList(extraParameters), trackerT, - windowT); + windowT, + hasReturnValue); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 1b27e66..de57c3b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms.reflect; +import static com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; @@ -440,6 +442,8 @@ public class DoFnSignatures { * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link * DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of * these must be specified. + * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is + * unbounded. Otherwise (if it returns {@code void}), assume it is bounded. * <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated * {@link DoFn.UnboundedPerElement}, this is an error. * </ol> @@ -465,7 +469,10 @@ public class DoFnSignatures { } if (processElement.isSplittable()) { if (isBounded == null) { - isBounded = PCollection.IsBounded.BOUNDED; + isBounded = + processElement.hasReturnValue() + ? 
PCollection.IsBounded.UNBOUNDED + : PCollection.IsBounded.BOUNDED; } } else { errors.checkArgument( @@ -474,6 +481,7 @@ public class DoFnSignatures { + ((isBounded == PCollection.IsBounded.BOUNDED) ? DoFn.BoundedPerElement.class.getSimpleName() : DoFn.UnboundedPerElement.class.getSimpleName())); + checkState(!processElement.hasReturnValue(), "Should have been inferred splittable"); isBounded = PCollection.IsBounded.BOUNDED; } return isBounded; @@ -710,8 +718,10 @@ public class DoFnSignatures { TypeDescriptor<?> outputT, FnAnalysisContext fnContext) { errors.checkArgument( - void.class.equals(m.getReturnType()), - "Must return void"); + void.class.equals(m.getReturnType()) + || DoFn.ProcessContinuation.class.equals(m.getReturnType()), + "Must return void or %s", + DoFn.ProcessContinuation.class.getSimpleName()); MethodAnalysisContext methodContext = MethodAnalysisContext.create(); @@ -751,7 +761,11 @@ public class DoFnSignatures { } return DoFnSignature.ProcessElementMethod.create( - m, methodContext.getExtraParameters(), trackerT, windowT); + m, + methodContext.getExtraParameters(), + trackerT, + windowT, + DoFn.ProcessContinuation.class.equals(m.getReturnType())); } private static void checkParameterOneOf( http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 62c10a7..4987409 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import 
static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.MoreObjects; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; @@ -100,4 +101,13 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> { lastAttemptedOffset + 1, range.getTo()); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("range", range) + .add("lastClaimedOffset", lastClaimedOffset) + .add("lastAttemptedOffset", lastAttemptedOffset) + .toString(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index 27ef68f..8cb0a6b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -31,10 +31,13 @@ public interface RestrictionTracker<RestrictionT> { RestrictionT currentRestriction(); /** - * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible. - * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work: - * the old value of {@link #currentRestriction} is equivalent to the new value and the return - * value of this method combined. Must be called at most once on a given object. 
+ * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible: + * after this method returns, the tracker MUST refuse all future claim calls, and {@link + * #checkDone} MUST succeed. + * + * <p>Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the + * work: the old value of {@link #currentRestriction} is equivalent to the new value and the + * return value of this method combined. Must be called at most once on a given object. */ RestrictionT checkpoint(); http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index cb60f9a..d2d2529 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -19,10 +19,10 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions; -import static org.hamcrest.Matchers.greaterThan; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.common.collect.Ordering; @@ -33,7 +33,6 @@ import java.util.List; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import 
org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; @@ -74,10 +73,16 @@ public class SplittableDoFnTest implements Serializable { static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> { @ProcessElement - public void process(ProcessContext c, OffsetRangeTracker tracker) { - for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(), numIterations = 0; + tracker.tryClaim(i); + ++i, ++numIterations) { c.output(KV.of(c.element(), (int) i)); + if (numIterations % 3 == 0) { + return resume(); + } } + return stop(); } @GetInitialRestriction @@ -206,10 +211,10 @@ public class SplittableDoFnTest implements Serializable { private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> { private static final int MAX_INDEX = 98765; - private final TupleTag<Integer> numProcessCalls; + private final int numClaimsPerCall; - private SDFWithMultipleOutputsPerBlock(TupleTag<Integer> numProcessCalls) { - this.numProcessCalls = numProcessCalls; + private SDFWithMultipleOutputsPerBlock(int numClaimsPerCall) { + this.numClaimsPerCall = numClaimsPerCall; } private static int snapToNextBlock(int index, int[] blockStarts) { @@ -222,15 +227,20 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public void processElement(ProcessContext c, OffsetRangeTracker tracker) { + public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); - c.output(numProcessCalls, 1); - for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) { + 
for (int i = trueStart, numIterations = 1; + tracker.tryClaim(blockStarts[i]); + ++i, ++numIterations) { for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { c.output(index); } + if (numIterations == numClaimsPerCall) { + return resume(); + } } + return stop(); } @GetInitialRestriction @@ -242,26 +252,10 @@ public class SplittableDoFnTest implements Serializable { @Test @Category({ValidatesRunner.class, UsesSplittableParDo.class}) public void testOutputAfterCheckpoint() throws Exception { - TupleTag<Integer> main = new TupleTag<>(); - TupleTag<Integer> numProcessCalls = new TupleTag<>(); - PCollectionTuple outputs = - p.apply(Create.of("foo")) - .apply( - ParDo.of(new SDFWithMultipleOutputsPerBlock(numProcessCalls)) - .withOutputTags(main, TupleTagList.of(numProcessCalls))); - PAssert.thatSingleton(outputs.get(main).apply(Count.<Integer>globally())) + PCollection<Integer> outputs = p.apply(Create.of("foo")) + .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock(3))); + PAssert.thatSingleton(outputs.apply(Count.<Integer>globally())) .isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX); - // Verify that more than 1 process() call was involved, i.e. that there was checkpointing. 
- PAssert.thatSingleton( - outputs.get(numProcessCalls).setCoder(VarIntCoder.of()).apply(Sum.integersGlobally())) - .satisfies( - new SerializableFunction<Integer, Void>() { - @Override - public Void apply(Integer input) { - assertThat(input, greaterThan(1)); - return null; - } - }); p.run(); } @@ -341,12 +335,12 @@ public class SplittableDoFnTest implements Serializable { extends DoFn<Integer, KV<String, Integer>> { private static final int MAX_INDEX = 98765; private final PCollectionView<String> sideInput; - private final TupleTag<Integer> numProcessCalls; + private final int numClaimsPerCall; public SDFWithMultipleOutputsPerBlockAndSideInput( - PCollectionView<String> sideInput, TupleTag<Integer> numProcessCalls) { + PCollectionView<String> sideInput, int numClaimsPerCall) { this.sideInput = sideInput; - this.numProcessCalls = numProcessCalls; + this.numClaimsPerCall = numClaimsPerCall; } private static int snapToNextBlock(int index, int[] blockStarts) { @@ -359,15 +353,20 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public void processElement(ProcessContext c, OffsetRangeTracker tracker) { + public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); - c.output(numProcessCalls, 1); - for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) { + for (int i = trueStart, numIterations = 1; + tracker.tryClaim(blockStarts[i]); + ++i, ++numIterations) { for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index)); } + if (numIterations == numClaimsPerCall) { + return resume(); + } } + return stop(); } @GetInitialRestriction @@ -400,15 +399,14 @@ public class SplittableDoFnTest implements Serializable { .apply("window 2", 
Window.<String>into(FixedWindows.of(Duration.millis(2)))) .apply("singleton", View.<String>asSingleton()); - TupleTag<KV<String, Integer>> main = new TupleTag<>(); - TupleTag<Integer> numProcessCalls = new TupleTag<>(); - PCollectionTuple res = + PCollection<KV<String, Integer>> res = mainInput.apply( - ParDo.of(new SDFWithMultipleOutputsPerBlockAndSideInput(sideInput, numProcessCalls)) - .withSideInputs(sideInput) - .withOutputTags(main, TupleTagList.of(numProcessCalls))); + ParDo.of( + new SDFWithMultipleOutputsPerBlockAndSideInput( + sideInput, 3 /* numClaimsPerCall */)) + .withSideInputs(sideInput)); PCollection<KV<String, Iterable<Integer>>> grouped = - res.get(main).apply(GroupByKey.<String, Integer>create()); + res.apply(GroupByKey.<String, Integer>create()); PAssert.that(grouped.apply(Keys.<String>create())) .containsInAnyOrder("a:0", "a:1", "b:2", "b:3"); @@ -427,22 +425,6 @@ public class SplittableDoFnTest implements Serializable { return null; } }); - - // Verify that more than 1 process() call was involved, i.e. that there was checkpointing. - PAssert.thatSingleton( - res.get(numProcessCalls) - .setCoder(VarIntCoder.of()) - .apply(Sum.integersGlobally().withoutDefaults())) - // This should hold in all windows, but verifying a particular window is sufficient. - .inOnlyPane(new IntervalWindow(new Instant(0), new Instant(1))) - .satisfies( - new SerializableFunction<Integer, Void>() { - @Override - public Void apply(Integer input) { - assertThat(input, greaterThan(1)); - return null; - } - }); p.run(); // TODO: also test coverage when some of the windows of the side input are not ready. 
http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 3edb194..2098c66 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms.reflect; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; +import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; @@ -89,8 +91,8 @@ public class DoFnInvokersTest { when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext); } - private void invokeProcessElement(DoFn<String, String> fn) { - DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); + private DoFn.ProcessContinuation invokeProcessElement(DoFn<String, String> fn) { + return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); } private void invokeOnTimer(String timerId, DoFn<String, String> fn) { @@ -119,7 +121,7 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c) throws Exception {} } MockFn mockFn = mock(MockFn.class); - invokeProcessElement(mockFn); + assertEquals(stop(), invokeProcessElement(mockFn)); verify(mockFn).processElement(mockProcessContext); } @@ -140,7 +142,7 @@ public class DoFnInvokersTest { public void testDoFnWithProcessElementInterface() throws Exception { IdentityUsingInterfaceWithProcessElement fn = 
mock(IdentityUsingInterfaceWithProcessElement.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).processElement(mockProcessContext); } @@ -161,14 +163,14 @@ public class DoFnInvokersTest { @Test public void testDoFnWithMethodInSuperclass() throws Exception { IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).process(mockProcessContext); } @Test public void testDoFnWithMethodInSubclass() throws Exception { IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).process(mockProcessContext); } @@ -179,7 +181,7 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c, IntervalWindow w) throws Exception {} } MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).processElement(mockProcessContext, mockWindow); } @@ -203,7 +205,7 @@ public class DoFnInvokersTest { throws Exception {} } MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).processElement(mockProcessContext, mockState); } @@ -229,11 +231,35 @@ public class DoFnInvokersTest { public void onTimer() {} } MockFn fn = mock(MockFn.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).processElement(mockProcessContext, mockTimer); } @Test + public void testDoFnWithReturn() throws Exception { + class MockFn extends DoFn<String, String> { + @DoFn.ProcessElement + public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) + throws Exception { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(String element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker 
newTracker(SomeRestriction restriction) { + return null; + } + } + MockFn fn = mock(MockFn.class); + when(fn.processElement(mockProcessContext, null)).thenReturn(resume()); + assertEquals(resume(), invokeProcessElement(fn)); + } + + @Test public void testDoFnWithStartBundleSetupTeardown() throws Exception { class MockFn extends DoFn<String, String> { @ProcessElement @@ -288,7 +314,9 @@ public class DoFnInvokersTest { /** Public so Mockito can do "delegatesTo()" in the test below. */ public static class MockFn extends DoFn<String, String> { @ProcessElement - public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {} + public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) { + return null; + } @GetInitialRestriction public SomeRestriction getInitialRestriction(String element) { @@ -340,7 +368,7 @@ public class DoFnInvokersTest { .splitRestriction( eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any()); when(fn.newTracker(restriction)).thenReturn(tracker); - fn.processElement(mockProcessContext, tracker); + when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume()); assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault())); assertEquals(restriction, invoker.invokeGetInitialRestriction("blah")); @@ -356,6 +384,8 @@ public class DoFnInvokersTest { }); assertEquals(Arrays.asList(part1, part2, part3), outputs); assertEquals(tracker, invoker.invokeNewTracker(restriction)); + assertEquals( + resume(), invoker.invokeProcessElement( new FakeArgumentProvider<String, String>() { @Override @@ -367,7 +397,7 @@ public class DoFnInvokersTest { public RestrictionTracker<?> restrictionTracker() { return tracker; } - }); + })); } private static class RestrictionWithDefaultTracker @@ -441,7 +471,7 @@ public class DoFnInvokersTest { assertEquals("foo", output); } }); - invoker.invokeProcessElement(mockArgumentProvider); + assertEquals(stop(), 
invoker.invokeProcessElement(mockArgumentProvider)); assertThat( invoker.invokeNewTracker(new RestrictionWithDefaultTracker()), instanceOf(DefaultTracker.class)); @@ -531,14 +561,14 @@ public class DoFnInvokersTest { @Test public void testLocalPrivateDoFnClass() throws Exception { PrivateDoFnClass fn = mock(PrivateDoFnClass.class); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); verify(fn).processThis(mockProcessContext); } @Test public void testStaticPackagePrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext); } @@ -546,28 +576,28 @@ public class DoFnInvokersTest { public void testInnerPackagePrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext); } @Test public void testStaticPrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext); } @Test public void testInnerPrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(stop(), invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext); } @Test public void testAnonymousInnerDoFn() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass()); - invokeProcessElement(fn); + assertEquals(stop(), 
invokeProcessElement(fn)); DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext); } @@ -604,6 +634,31 @@ public class DoFnInvokersTest { } @Test + public void testProcessElementExceptionWithReturn() throws Exception { + thrown.expect(UserCodeException.class); + thrown.expectMessage("bogus"); + DoFnInvokers.invokerFor( + new DoFn<Integer, Integer>() { + @ProcessElement + public ProcessContinuation processElement( + @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) { + throw new IllegalArgumentException("bogus"); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } + }) + .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>()); + } + + @Test public void testStartBundleException() throws Exception { DoFnInvoker<Integer, Integer> invoker = DoFnInvokers.invokerFor( http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java index d321f54..44ae5c4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest { @Test public void testBadReturnType() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Must return void"); + thrown.expectMessage("Must return void or ProcessContinuation"); 
analyzeProcessElementMethod( new AnonymousMethod() { http://git-wip-us.apache.org/repos/asf/beam/blob/4f7f1699/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 07b3348..08af65e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -52,7 +52,8 @@ import org.junit.runners.JUnit4; public class DoFnSignaturesSplittableDoFnTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private static class SomeRestriction {} + private abstract static class SomeRestriction + implements HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {} private abstract static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {} @@ -60,6 +61,20 @@ public class DoFnSignaturesSplittableDoFnTest { private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {} @Test + public void testReturnsProcessContinuation() throws Exception { + DoFnSignature.ProcessElementMethod signature = + analyzeProcessElementMethod( + new AnonymousMethod() { + private DoFn.ProcessContinuation method( + DoFn<Integer, String>.ProcessContext context) { + return null; + } + }); + + assertTrue(signature.hasReturnValue()); + } + + @Test public void testHasRestrictionTracker() throws Exception { DoFnSignature.ProcessElementMethod signature = analyzeProcessElementMethod( @@ -100,11 +115,6 @@ public class DoFnSignaturesSplittableDoFnTest { public SomeRestriction getInitialRestriction(Integer element) { return null; 
} - - @NewTracker - public SomeRestrictionTracker newTracker(SomeRestriction restriction) { - return null; - } } @BoundedPerElement @@ -130,6 +140,55 @@ public class DoFnSignaturesSplittableDoFnTest { .isBoundedPerElement()); } + private static class BaseFnWithoutContinuation extends DoFn<Integer, String> { + @ProcessElement + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + } + + private static class BaseFnWithContinuation extends DoFn<Integer, String> { + @ProcessElement + public ProcessContinuation processElement( + ProcessContext context, SomeRestrictionTracker tracker) { + return null; + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return null; + } + } + + @Test + public void testSplittableBoundednessInferredFromReturnValue() throws Exception { + assertEquals( + PCollection.IsBounded.BOUNDED, + DoFnSignatures.getSignature(BaseFnWithoutContinuation.class).isBoundedPerElement()); + assertEquals( + PCollection.IsBounded.UNBOUNDED, + DoFnSignatures.getSignature(BaseFnWithContinuation.class).isBoundedPerElement()); + } + + @Test + public void testSplittableRespectsBoundednessAnnotation() throws Exception { + @BoundedPerElement + class BoundedFnWithContinuation extends BaseFnWithContinuation {} + + assertEquals( + PCollection.IsBounded.BOUNDED, + DoFnSignatures.getSignature(BoundedFnWithContinuation.class).isBoundedPerElement()); + + @UnboundedPerElement + class UnboundedFnWithContinuation extends BaseFnWithContinuation {} + + assertEquals( + PCollection.IsBounded.UNBOUNDED, + DoFnSignatures.getSignature(UnboundedFnWithContinuation.class).isBoundedPerElement()); + } @Test public void testUnsplittableIsBounded() throws Exception { class UnsplittableFn extends DoFn<Integer, String> { @@ -172,8 +231,10 @@ public class DoFnSignaturesSplittableDoFnTest { public void 
testSplittableWithAllFunctions() throws Exception { class GoodSplittableDoFn extends DoFn<Integer, String> { @ProcessElement - public void processElement( - ProcessContext context, SomeRestrictionTracker tracker) {} + public ProcessContinuation processElement( + ProcessContext context, SomeRestrictionTracker tracker) { + return null; + } @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -198,6 +259,7 @@ public class DoFnSignaturesSplittableDoFnTest { DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class); assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); + assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); @@ -214,7 +276,9 @@ public class DoFnSignaturesSplittableDoFnTest { public void testSplittableWithAllFunctionsGeneric() throws Exception { class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> { @ProcessElement - public void processElement(ProcessContext context, TrackerT tracker) {} + public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) { + return null; + } @GetInitialRestriction public RestrictionT getInitialRestriction(Integer element) { @@ -242,6 +306,7 @@ public class DoFnSignaturesSplittableDoFnTest { SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass()); assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); + assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, 
signature.splitRestriction().restrictionT().getRawType());
