This is an automated email from the ASF dual-hosted git repository. scott pushed a commit to branch release-2.10.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.10.0 by this push: new b352df3 [BEAM-6352] Revert PR#6467 to fix Watch transform from swegner/revert_pr6467 new bfd6893 Merge pull request #7575: [BEAM-6352] Revert PR#6467 to fix Watch transform b352df3 is described below commit b352df3976c6a687213b611a9a47b6e6093a2dc5 Author: Kenn Knowles <k...@kennknowles.com> AuthorDate: Fri Jan 18 13:33:16 2019 -0800 [BEAM-6352] Revert PR#6467 to fix Watch transform from swegner/revert_pr6467 --- .../runners/apex/translation/ParDoTranslator.java | 8 +- .../construction/SplittableParDoNaiveBounded.java | 5 +- .../core/construction/PTransformMatchersTest.java | 3 +- .../core/construction/SplittableParDoTest.java | 7 +- runners/core-java/build.gradle | 1 - ...TimeBoundedSplittableProcessElementInvoker.java | 30 ++-- .../core/SplittableParDoViaKeyedWorkItems.java | 17 +-- .../core/SplittableProcessElementInvoker.java | 7 +- ...BoundedSplittableProcessElementInvokerTest.java | 35 +++-- .../runners/core/SplittableParDoProcessFnTest.java | 37 ++--- .../SplittableProcessElementsEvaluatorFactory.java | 23 +-- .../flink/FlinkStreamingTransformTranslators.java | 7 +- .../wrappers/streaming/SplittableDoFnOperator.java | 4 +- .../dataflow/DataflowPipelineTranslatorTest.java | 4 +- .../java/org/apache/beam/sdk/transforms/DoFn.java | 16 ++- .../java/org/apache/beam/sdk/transforms/Watch.java | 2 +- .../beam/sdk/transforms/reflect/DoFnInvoker.java | 2 +- .../sdk/transforms/reflect/DoFnSignatures.java | 29 +++- .../splittabledofn/ByteKeyRangeTracker.java | 12 +- .../splittabledofn/OffsetRangeTracker.java | 10 +- .../splittabledofn/RestrictionTracker.java | 51 ++++++- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 - .../java/org/apache/beam/sdk/io/FileIOTest.java | 2 - .../org/apache/beam/sdk/io/TextIOReadTest.java | 2 - .../beam/sdk/transforms/SplittableDoFnTest.java | 22 ++- .../org/apache/beam/sdk/transforms/WatchTest.java | 9 -- .../sdk/transforms/reflect/DoFnInvokersTest.java | 15 +- .../reflect/DoFnSignaturesSplittableDoFnTest.java | 46 +++--- .../sdk/fn/splittabledofn/RestrictionTrackers.java | 138 ------------------ .../beam/sdk/fn/splittabledofn/package-info.java | 28 ---- .../fn/splittabledofn/RestrictionTrackersTest.java | 156 --------------------- .../harness/SplittableProcessElementsRunner.java | 11 +- .../beam/sdk/io/hbase/HBaseReadSplittableDoFn.java | 4 +- 33 files changed, 239 insertions(+), 506 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index ca1c7ff..c54ad98 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; @@ -125,12 +126,13 @@ class ParDoTranslator<InputT, OutputT> } } - static class SplittableProcessElementsTranslator<InputT, OutputT, RestrictionT, PositionT> - implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, PositionT>> { + static class SplittableProcessElementsTranslator< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> + implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> { @Override public void translate( - ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform, + ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform, TranslationContext context) { Map<TupleTag<?>, PValue> outputs = context.getOutputs(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java index 3197c69..2239743 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java @@ -109,7 +109,8 @@ public class SplittableParDoNaiveBounded { } } - static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT> + static class NaiveProcessFn< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> extends DoFn<KV<InputT, RestrictionT>, OutputT> { private final DoFn<InputT, OutputT> fn; @@ -141,7 +142,7 @@ public class SplittableParDoNaiveBounded { InputT element = c.element().getKey(); RestrictionT restriction = c.element().getValue(); while (true) { - RestrictionTracker<RestrictionT, PositionT> tracker = invoker.invokeNewTracker(restriction); + TrackerT tracker = invoker.invokeNewTracker(restriction); ProcessContinuation continuation = invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker)); if (continuation.shouldResume()) { diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 7f4ebda..618a12e 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -163,8 +163,7 @@ public class PTransformMatchersTest implements Serializable { private DoFn<KV<String, Integer>, Integer> splittableDoFn = new DoFn<KV<String, Integer>, Integer>() { @ProcessElement - public void processElement( - ProcessContext context, RestrictionTracker<Void, Void> tracker) {} + public void processElement(ProcessContext context, SomeTracker tracker) {} @GetInitialRestriction public Void getInitialRestriction(KV<String, Integer> element) { diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index 959120c..68365c8 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java @@ -58,7 +58,7 @@ public class SplittableParDoTest { } @Override - public boolean tryClaim(Void position) { + protected boolean tryClaimImpl(Void position) { return false; } @@ -78,8 +78,7 @@ public class SplittableParDoTest { private static class BoundedFakeFn extends DoFn<Integer, String> { @ProcessElement - public void processElement( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -90,7 +89,7 @@ public class SplittableParDoTest { private static class UnboundedFakeFn extends DoFn<Integer, String> { @ProcessElement public ProcessContinuation processElement( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) { + ProcessContext context, SomeRestrictionTracker tracker) { return stop(); } diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle index 6483167..3468c01 100644 --- a/runners/core-java/build.gradle +++ b/runners/core-java/build.gradle @@ -35,7 +35,6 @@ dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow project(path: ":beam-model-fn-execution", configuration: "shadow") shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow") - shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow") shadow library.java.vendored_guava_20_0 shadow library.java.joda_time shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest") diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index e8c57b5..d1ccd69 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -25,7 +25,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.TimeDomain; @@ -56,8 +55,12 @@ import org.joda.time.Instant; * outputs), or runs for the given duration. */ public class OutputAndTimeBoundedSplittableProcessElementInvoker< - InputT, OutputT, RestrictionT, PositionT> - extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> { + InputT, + OutputT, + RestrictionT, + PositionT, + TrackerT extends RestrictionTracker<RestrictionT, PositionT>> + extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> { private final DoFn<InputT, OutputT> fn; private final PipelineOptions pipelineOptions; private final OutputWindowedValue<OutputT> output; @@ -103,9 +106,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< public Result invokeProcessElement( DoFnInvoker<InputT, OutputT> invoker, final WindowedValue<InputT> element, - final RestrictionTracker<RestrictionT, PositionT> tracker) { + final TrackerT tracker) { final ProcessContext processContext = new ProcessContext(element, tracker); - + tracker.setClaimObserver(processContext); DoFn.ProcessContinuation cont = invoker.invokeProcessElement( new DoFnInvoker.ArgumentProvider<InputT, OutputT>() { @@ -153,7 +156,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< @Override public RestrictionTracker<?, ?> restrictionTracker() { - return processContext.tracker; + return tracker; } // Unsupported methods below. @@ -223,7 +226,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< // restriction that describes exactly the work that wasn't done in the current call. if (processContext.numClaimedBlocks > 0) { residual = checkNotNull(processContext.takeCheckpointNow()); - processContext.tracker.checkDone(); + tracker.checkDone(); } else { // The call returned resume() without trying to claim any blocks, i.e. it is unaware // of any work to be done at the moment, but more might emerge later. This is a valid @@ -251,14 +254,14 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< // ProcessElement call. // In other words, if we took a checkpoint *after* ProcessElement completed (like in the // branch above), it would have been equivalent to this one. - processContext.tracker.checkDone(); + tracker.checkDone(); } } else { // The ProcessElement call returned stop() - that means the tracker's current restriction // has been fully processed by the call. A checkpoint may or may not have been taken in // "residual"; if it was, then we'll need to process it; if no, then we don't - nothing // special needs to be done. - processContext.tracker.checkDone(); + tracker.checkDone(); } if (residual == null) { // Can only be true if cont.shouldResume() is false and no checkpoint was taken. @@ -270,9 +273,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< } private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext - implements RestrictionTrackers.ClaimObserver<PositionT> { + implements RestrictionTracker.ClaimObserver<PositionT> { private final WindowedValue<InputT> element; - private final RestrictionTracker<RestrictionT, PositionT> tracker; + private final TrackerT tracker; private int numClaimedBlocks; private boolean hasClaimFailed; @@ -290,11 +293,10 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< private @Nullable Future<?> scheduledCheckpoint; private @Nullable Instant lastReportedWatermark; - public ProcessContext( - WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker) { + public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) { fn.super(); this.element = element; - this.tracker = RestrictionTrackers.observe(tracker, this); + this.tracker = tracker; } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 45c847e..3454e75 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -153,7 +153,8 @@ public class SplittableParDoViaKeyedWorkItems { } /** A primitive transform wrapping around {@link ProcessFn}. */ - public static class ProcessElements<InputT, OutputT, RestrictionT, PositionT> + public static class ProcessElements< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> extends PTransform< PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> { private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original; @@ -162,7 +163,7 @@ public class SplittableParDoViaKeyedWorkItems { this.original = original; } - public ProcessFn<InputT, OutputT, RestrictionT, PositionT> newProcessFn( + public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn( DoFn<InputT, OutputT> fn) { return new ProcessFn<>( fn, @@ -213,7 +214,8 @@ public class SplittableParDoViaKeyedWorkItems { * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983 */ @VisibleForTesting - public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT> + public static class ProcessFn< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> { /** * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is @@ -250,7 +252,7 @@ public class SplittableParDoViaKeyedWorkItems { private transient @Nullable StateInternalsFactory<byte[]> stateInternalsFactory; private transient @Nullable TimerInternalsFactory<byte[]> timerInternalsFactory; private transient @Nullable SplittableProcessElementInvoker< - InputT, OutputT, RestrictionT, PositionT> + InputT, OutputT, RestrictionT, TrackerT> processElementInvoker; private transient @Nullable DoFnInvoker<InputT, OutputT> invoker; @@ -281,7 +283,7 @@ public class SplittableParDoViaKeyedWorkItems { } public void setProcessElementInvoker( - SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> invoker) { + SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> invoker) { this.processElementInvoker = invoker; } @@ -366,9 +368,8 @@ public class SplittableParDoViaKeyedWorkItems { elementAndRestriction = KV.of(elementState.read(), restrictionState.read()); } - final RestrictionTracker<RestrictionT, PositionT> tracker = - invoker.invokeNewTracker(elementAndRestriction.getValue()); - SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>.Result result = + final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.getValue()); + SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result = processElementInvoker.invokeProcessElement( invoker, elementAndRestriction.getKey(), tracker); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index 07aa0ba..81f0bd4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -30,7 +30,8 @@ import org.joda.time.Instant; * A runner-specific hook for invoking a {@link DoFn.ProcessElement} method for a splittable {@link * DoFn}, in particular, allowing the runner to access the {@link RestrictionTracker}. */ -public abstract class SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> { +public abstract class SplittableProcessElementInvoker< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> { /** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */ public class Result { @Nullable private final RestrictionT residualRestriction; @@ -76,7 +77,5 @@ public abstract class SplittableProcessElementInvoker<InputT, OutputT, Restricti * DoFn.ProcessContinuation}, and a future output watermark. */ public abstract Result invokeProcessElement( - DoFnInvoker<InputT, OutputT> invoker, - WindowedValue<InputT> element, - RestrictionTracker<RestrictionT, PositionT> tracker); + DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index a05aa8d..c54080c 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -67,8 +66,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { } @ProcessElement - public ProcessContinuation process( - ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { + public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) { Uninterruptibles.sleepUninterruptibly( sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS); for (long i = tracker.currentRestriction().getFrom(), numIterations = 1; @@ -90,19 +88,20 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { } } - private SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result runTest( - int totalNumOutputs, - Duration sleepBeforeFirstClaim, - int numOutputsPerProcessCall, - Duration sleepBeforeEachOutput) { + private SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result + runTest( + int totalNumOutputs, + Duration sleepBeforeFirstClaim, + int numOutputsPerProcessCall, + Duration sleepBeforeEachOutput) { SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput); OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs); return runTest(fn, initialRestriction); } - private SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result runTest( - DoFn<Void, String> fn, OffsetRange initialRestriction) { - SplittableProcessElementInvoker<Void, String, OffsetRange, Long> invoker = + private SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result + runTest(DoFn<Void, String> fn, OffsetRange initialRestriction) { + SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker> invoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, PipelineOptionsFactory.create(), @@ -135,7 +134,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementOutputBounded() throws Exception { - SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res = + SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res = runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO); assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); @@ -146,7 +145,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementTimeBounded() throws Exception { - SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res = + SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res = runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100)); assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); @@ -159,7 +158,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Exception { - SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res = + SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res = runTest(10000, Duration.standardSeconds(3), Integer.MAX_VALUE, Duration.millis(100)); assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); @@ -171,7 +170,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementVoluntaryReturnStop() throws Exception { - SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res = + SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res = runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100)); assertFalse(res.getContinuation().shouldResume()); assertNull(res.getResidualRestriction()); @@ -179,7 +178,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { @Test public void testInvokeProcessElementVoluntaryReturnResume() throws Exception { - SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res = + SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res = runTest(10, Duration.ZERO, 5, Duration.millis(100)); assertTrue(res.getContinuation().shouldResume()); assertEquals(new OffsetRange(5, 10), res.getResidualRestriction()); @@ -190,7 +189,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { DoFn<Void, String> brokenFn = new DoFn<Void, String>() { @ProcessElement - public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { c.output("foo"); } @@ -208,7 +207,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { DoFn<Void, String> brokenFn = new DoFn<Void, String>() { @ProcessElement - public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { assertFalse(tracker.tryClaim(6L)); c.output("foo"); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index d312234..4fcd1df 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -95,7 +95,7 @@ public class SplittableParDoProcessFnTest { } @Override - public boolean tryClaim(Void position) { + protected boolean tryClaimImpl(Void position) { return true; } @@ -119,7 +119,12 @@ public class SplittableParDoProcessFnTest { * A helper for testing {@link ProcessFn} on 1 element (but possibly over multiple {@link * DoFn.ProcessElement} calls). */ - private static class ProcessFnTester<InputT, OutputT, RestrictionT, PositionT> + private static class ProcessFnTester< + InputT, + OutputT, + RestrictionT, + PositionT, + TrackerT extends RestrictionTracker<RestrictionT, PositionT>> implements AutoCloseable { private final DoFnTester<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> tester; private Instant currentProcessingTime; @@ -139,7 +144,7 @@ public class SplittableParDoProcessFnTest { // encode IntervalWindow's because that's what all tests here use. WindowingStrategy<InputT, BoundedWindow> windowingStrategy = (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1))); - final ProcessFn<InputT, OutputT, RestrictionT, PositionT> processFn = + final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = new ProcessFn<>(fn, inputCoder, restrictionCoder, windowingStrategy); this.tester = DoFnTester.of(processFn); this.timerInternals = new InMemoryTimerInternals(); @@ -267,7 +272,7 @@ public class SplittableParDoProcessFnTest { /** A simple splittable {@link DoFn} that's actually monolithic. */ private static class ToStringFn extends DoFn<Integer, String> { @ProcessElement - public void process(ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) { + public void process(ProcessContext c, SomeRestrictionTracker tracker) { checkState(tracker.tryClaim(null)); c.output(c.element().toString() + "a"); c.output(c.element().toString() + "b"); @@ -293,7 +298,7 @@ public class SplittableParDoProcessFnTest { new IntervalWindow( base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1))); - ProcessFnTester<Integer, String, SomeRestriction, Void> tester = + ProcessFnTester<Integer, String, SomeRestriction, Void, SomeRestrictionTracker> tester = new ProcessFnTester<>( base, fn, @@ -318,7 +323,7 @@ public class SplittableParDoProcessFnTest { private static class WatermarkUpdateFn extends DoFn<Instant, String> { @ProcessElement - public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { c.updateWatermark(c.element().plus(Duration.standardSeconds(i))); c.output(String.valueOf(i)); @@ -342,7 +347,7 @@ public class SplittableParDoProcessFnTest { Instant base = Instant.now(); dateTimeProvider.setDateTimeFixed(base.getMillis()); - ProcessFnTester<Instant, String, OffsetRange, Long> tester = + ProcessFnTester<Instant, String, OffsetRange, Long, OffsetRangeTracker> tester = new ProcessFnTester<>( base, fn, @@ -369,8 +374,7 @@ public class SplittableParDoProcessFnTest { /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */ private static class SelfInitiatedResumeFn extends DoFn<Integer, String> { @ProcessElement - public ProcessContinuation process( - ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) { + public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) { checkState(tracker.tryClaim(null)); c.output(c.element().toString()); return resume().withResumeDelay(Duration.standardSeconds(5)); @@ -387,7 +391,7 @@ public class SplittableParDoProcessFnTest { DoFn<Integer, String> fn = new SelfInitiatedResumeFn(); Instant base = Instant.now(); dateTimeProvider.setDateTimeFixed(base.getMillis()); - ProcessFnTester<Integer, String, SomeRestriction, Void> tester = + ProcessFnTester<Integer, String, SomeRestriction, Void, SomeRestrictionTracker> tester = new ProcessFnTester<>( base, fn, @@ -429,8 +433,7 @@ public class SplittableParDoProcessFnTest { } @ProcessElement - public ProcessContinuation process( - ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { for (long i = tracker.currentRestriction().getFrom(), numIterations = 0; tracker.tryClaim(i); ++i, ++numIterations) { @@ -453,7 +456,7 @@ public class SplittableParDoProcessFnTest { DoFn<Integer, String> fn = new CounterFn(1); Instant base = Instant.now(); dateTimeProvider.setDateTimeFixed(base.getMillis()); - ProcessFnTester<Integer, String, OffsetRange, Long> tester = + ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester = new ProcessFnTester<>( base, fn, @@ -487,7 +490,7 @@ public class SplittableParDoProcessFnTest { dateTimeProvider.setDateTimeFixed(base.getMillis()); int baseIndex = 42; - ProcessFnTester<Integer, String, OffsetRange, Long> tester = + ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester = new ProcessFnTester<>( base, fn, @@ -535,7 +538,7 @@ public class SplittableParDoProcessFnTest { Instant base = Instant.now(); int baseIndex = 42; - ProcessFnTester<Integer, String, OffsetRange, Long> tester = + ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester = new ProcessFnTester<>( base, fn, @@ -567,7 +570,7 @@ public class SplittableParDoProcessFnTest { private State state = State.BEFORE_SETUP; @ProcessElement - public void process(ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) { + public void process(ProcessContext c, SomeRestrictionTracker tracker) { assertEquals(State.INSIDE_BUNDLE, state); } @@ -604,7 +607,7 @@ public class SplittableParDoProcessFnTest { @Test public void testInvokesLifecycleMethods() throws Exception { DoFn<Integer, String> fn = new LifecycleVerifyingFn(); - try (ProcessFnTester<Integer, String, SomeRestriction, Void> tester = + try (ProcessFnTester<Integer, String, SomeRestriction, Void, SomeRestrictionTracker> tester = new ProcessFnTester<>( Instant.now(), fn, diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 737098f..bdafd95 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElem import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -45,7 +46,12 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Thre import org.joda.time.Duration; import org.joda.time.Instant; -class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, PositionT> +class SplittableProcessElementsEvaluatorFactory< + InputT, + OutputT, + RestrictionT, + PositionT, + TrackerT extends RestrictionTracker<RestrictionT, PositionT>> implements TransformEvaluatorFactory { private final ParDoEvaluatorFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> delegateFactory; @@ -68,8 +74,8 @@ class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, P checkArgument( ProcessElements.class.isInstance(application.getTransform()), "No know extraction of the fn from " + application); - final ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform = - (ProcessElements<InputT, OutputT, RestrictionT, PositionT>) + final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform = + (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>) application.getTransform(); return DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn())); } @@ -103,12 +109,13 @@ class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, P @SuppressWarnings({"unchecked", "rawtypes"}) private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> createEvaluator( AppliedPTransform< - PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, - ProcessElements<InputT, OutputT, RestrictionT, PositionT>> + PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, + PCollectionTuple, + ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> application, CommittedBundle<InputT> inputBundle) throws Exception { - final ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform = + final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform = application.getTransform(); final DoFnLifecycleManagerRemovingTransformEvaluator< @@ -124,8 +131,8 @@ class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, P application.getTransform().getAdditionalOutputTags().getAll()); final ParDoEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> pde = evaluator.getParDoEvaluator(); - final ProcessFn<InputT, OutputT, RestrictionT, PositionT> processFn = - (ProcessFn<InputT, OutputT, RestrictionT, PositionT>) + final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = + (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>) ProcessFnRunner.class.cast(pde.getFnRunner()).getFn(); final DirectExecutionContext.DirectStepContext stepContext = pde.getStepContext(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index c770cdf..1f87438 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -661,14 +662,14 @@ class FlinkStreamingTransformTranslators { } private static class SplittableProcessElementsStreamingTranslator< - InputT, OutputT, RestrictionT, PositionT> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< SplittableParDoViaKeyedWorkItems.ProcessElements< - InputT, OutputT, RestrictionT, PositionT>> { + InputT, OutputT, RestrictionT, TrackerT>> { @Override public void translateNode( - SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT> + SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform, FlinkStreamingTranslationContext context) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 413777f..7f276b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -57,7 +58,8 @@ import org.joda.time.Instant; * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing the * {@code @ProcessElement} method of a splittable {@link DoFn}. */ -public class SplittableDoFnOperator<InputT, OutputT, RestrictionT> +public class SplittableDoFnOperator< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> { private transient ScheduledExecutorService executorService; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 0bc1b2e..a087232 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -84,7 +84,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -975,7 +975,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { private static class TestSplittableFn extends DoFn<String, Integer> { @ProcessElement - public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { // noop } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 73aa429..fd79cb8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -540,7 +540,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <p>The signature of this method must satisfy the following constraints: * * <ul> - * <li>If one of its arguments is a {@link RestrictionTracker}, then it is a <a + * <li>If one of its arguments is a subtype of {@link RestrictionTracker}, then it is a <a * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} subject to the * separate requirements described below. Items below are assuming this is not a splittable * {@link DoFn}. @@ -573,8 +573,8 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <h2>Splittable DoFn's</h2> * * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter - * whose type is of {@link RestrictionTracker}. This is an advanced feature and an overwhelming - * majority of users will never need to write a splittable {@link DoFn}. + * whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature and an + * overwhelming majority of users will never need to write a splittable {@link DoFn}. * * <p>Not all runners support Splittable DoFn. See the <a * href="https://beam.apache.org/documentation/runners/capability-matrix/">capability matrix</a>. @@ -587,10 +587,12 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <ul> * <li>It <i>must</i> define a {@link GetInitialRestriction} method. * <li>It <i>may</i> define a {@link SplitRestriction} method. - * <li>It <i>may</i> define a {@link NewTracker} method returning a subtype of {@code - * RestrictionTracker<R>} where {@code R} is the restriction type returned by {@link - * GetInitialRestriction}. This method is optional in case the restriction type returned by - * {@link GetInitialRestriction} implements {@link HasDefaultTracker}. + * <li>It <i>may</i> define a {@link NewTracker} method returning the same type as the type of + * the {@link RestrictionTracker} argument of {@link ProcessElement}, which in turn must be + * a subtype of {@code RestrictionTracker<R>} where {@code R} is the restriction type + * returned by {@link GetInitialRestriction}. This method is optional in case the + * restriction type returned by {@link GetInitialRestriction} implements {@link + * HasDefaultTracker}. * <li>It <i>may</i> define a {@link GetRestrictionCoder} method. * <li>The type of restrictions used by all of these methods must be the same. * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index fd52a81..3dc24d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -990,7 +990,7 @@ public class Watch { } @Override - public synchronized boolean tryClaim(HashCode hash) { + protected synchronized boolean tryClaimImpl(HashCode hash) { if (shouldStop) { return false; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 45cf9f4..239f4d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -92,7 +92,7 @@ public interface DoFnInvoker<InputT, OutputT> { /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */ @SuppressWarnings("TypeParameterUnusedInFormals") - <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> invokeNewTracker( + <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> TrackerT invokeNewTracker( RestrictionT restriction); /** Get the bound {@link DoFn}. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 61089df..0900e27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -556,6 +556,9 @@ public class DoFnSignatures { ErrorReporter processElementErrors = errors.forMethod(DoFn.ProcessElement.class, processElement.targetMethod()); + final TypeDescriptor<?> trackerT; + final String originOfTrackerT; + List<String> missingRequiredMethods = new ArrayList<>(); if (getInitialRestriction == null) { missingRequiredMethods.add("@" + DoFn.GetInitialRestriction.class.getSimpleName()); @@ -565,11 +568,27 @@ public class DoFnSignatures { && getInitialRestriction .restrictionT() .isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class))) { - // no-op we are using the annotation @HasDefaultTracker + trackerT = + getInitialRestriction + .restrictionT() + .resolveType(HasDefaultTracker.class.getTypeParameters()[1]); + originOfTrackerT = + String.format( + "restriction type %s of @%s method %s", + formatType(getInitialRestriction.restrictionT()), + DoFn.GetInitialRestriction.class.getSimpleName(), + format(getInitialRestriction.targetMethod())); } else { missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName()); + trackerT = null; + originOfTrackerT = null; } } else { + trackerT = newTracker.trackerT(); + originOfTrackerT = + String.format( + "%s method %s", + DoFn.NewTracker.class.getSimpleName(), format(newTracker.targetMethod())); ErrorReporter getInitialRestrictionErrors = errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod()); TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT(); @@ -592,9 +611,11 @@ public class DoFnSignatures { errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod()); TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT(); processElementErrors.checkArgument( - processElement.trackerT().getRawType().equals(RestrictionTracker.class), - "Has tracker type %s, but the DoFn's tracker type must be of type RestrictionTracker.", - formatType(processElement.trackerT())); + processElement.trackerT().equals(trackerT), + "Has tracker type %s, but the DoFn's tracker type was inferred as %s from %s", + formatType(processElement.trackerT()), + trackerT, + originOfTrackerT); if (getRestrictionCoder != null) { getInitialRestrictionErrors.checkArgument( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java index 44f2f0b..6f72d84 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java @@ -56,12 +56,12 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe } @Override - public ByteKeyRange currentRestriction() { + public synchronized ByteKeyRange currentRestriction() { return range; } @Override - public ByteKeyRange checkpoint() { + public synchronized ByteKeyRange checkpoint() { // If we haven't done any work, we should return the original range we were processing // as the checkpoint. if (lastAttemptedKey == null) { @@ -99,7 +99,7 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe * current {@link ByteKeyRange} of this tracker. */ @Override - public boolean tryClaim(ByteKey key) { + protected synchronized boolean tryClaimImpl(ByteKey key) { // Handle claiming the end of range EMPTY key if (key.isEmpty()) { checkArgument( @@ -132,7 +132,7 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe } @Override - public void checkDone() throws IllegalStateException { + public synchronized void checkDone() throws IllegalStateException { // Handle checking the empty range which is implicitly done. // This case can occur if the range tracker is checkpointed before any keys have been claimed // or if the range tracker is checkpointed once the range is done. @@ -162,7 +162,7 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe } @Override - public String toString() { + public synchronized String toString() { return MoreObjects.toStringHelper(this) .add("range", range) .add("lastClaimedKey", lastClaimedKey) @@ -184,7 +184,7 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe private static final byte[] ZERO_BYTE_ARRAY = new byte[] {0}; @Override - public Backlog getBacklog() { + public synchronized Backlog getBacklog() { // Return 0 for the empty range which is implicitly done. // This case can occur if the range tracker is checkpointed before any keys have been claimed // or if the range tracker is checkpointed once the range is done. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 9d90c69..549aa9b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -41,12 +41,12 @@ public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long> } @Override - public OffsetRange currentRestriction() { + public synchronized OffsetRange currentRestriction() { return range; } @Override - public OffsetRange checkpoint() { + public synchronized OffsetRange checkpoint() { checkState( lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed"); OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo()); @@ -63,7 +63,7 @@ public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long> * current {@link OffsetRange} of this tracker (in that case this operation is a no-op). */ @Override - public boolean tryClaim(Long i) { + protected synchronized boolean tryClaimImpl(Long i) { checkArgument( lastAttemptedOffset == null || i > lastAttemptedOffset, "Trying to claim offset %s while last attempted was %s", @@ -81,7 +81,7 @@ public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long> } @Override - public void checkDone() throws IllegalStateException { + public synchronized void checkDone() throws IllegalStateException { checkState( lastAttemptedOffset >= range.getTo() - 1, "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted", @@ -101,7 +101,7 @@ public class OffsetRangeTracker extends RestrictionTracker<OffsetRange, Long> } @Override - public Backlog getBacklog() { + public synchronized Backlog getBacklog() { // If we have never attempted an offset, we return the length of the entire range. if (lastAttemptedOffset == null) { return Backlog.of(BigDecimal.valueOf(range.getTo() - range.getFrom())); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index 55e150f..7bf807c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -17,10 +17,15 @@ */ package org.apache.beam.sdk.transforms.splittabledofn; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.DoFn; /** - * Manages access to the restriction and keeps track of its claimed part for a <a + * Manages concurrent access to the restriction and keeps track of its claimed part for a <a * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}. * * <p>Restriction trackers which can provide an estimate for the known amount of outstanding work @@ -31,6 +36,29 @@ import org.apache.beam.sdk.transforms.DoFn; * & Splitting</a> for further details. */ public abstract class RestrictionTracker<RestrictionT, PositionT> { + /** Internal interface allowing a runner to observe the calls to {@link #tryClaim}. */ + @Internal + public interface ClaimObserver<PositionT> { + /** Called when {@link #tryClaim} returns true. */ + void onClaimed(PositionT position); + + /** Called when {@link #tryClaim} returns false. */ + void onClaimFailed(PositionT position); + } + + @Nullable private ClaimObserver<PositionT> claimObserver; + + /** + * Sets a {@link ClaimObserver} to be invoked on every call to {@link #tryClaim}. Internal: + * intended only for runner authors. + */ + @Internal + public void setClaimObserver(ClaimObserver<PositionT> claimObserver) { + checkNotNull(claimObserver, "claimObserver"); + checkState(this.claimObserver == null, "A claim observer has already been set"); + this.claimObserver = claimObserver; + } + /** * Attempts to claim the block of work in the current restriction identified by the given * position. @@ -44,8 +72,27 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> { * call to this method). * <li>{@link RestrictionTracker#checkDone} MUST succeed. * </ul> + * + * <p>Under the hood, calls {@link #tryClaimImpl} and notifies {@link ClaimObserver} of the + * result. */ - public abstract boolean tryClaim(PositionT position); + public final boolean tryClaim(PositionT position) { + if (tryClaimImpl(position)) { + if (claimObserver != null) { + claimObserver.onClaimed(position); + } + return true; + } else { + if (claimObserver != null) { + claimObserver.onClaimFailed(position); + } + return false; + } + } + + /** Tracker-specific implementation of {@link #tryClaim}. */ + @Internal + protected abstract boolean tryClaimImpl(PositionT position); /** * Returns a restriction accurately describing the full range of work the current {@link diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 3e59344..8d70686 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -99,7 +99,6 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -513,7 +512,6 @@ public class AvroIOTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable { SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index a282acf..223a9e2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets; import org.joda.time.Duration; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -192,7 +191,6 @@ public class FileIOTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testMatchWatchForNewFiles() throws IOException, InterruptedException { final Path basePath = tmpFolder.getRoot().toPath().resolve("watch"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 31fc273..0e6e992 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -86,7 +86,6 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.joda.time.Duration; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -825,7 +824,6 @@ public class TextIOReadTest { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testReadWatchForNewFiles() throws IOException, InterruptedException { final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index c617b18..6a102b7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -49,7 +49,7 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; import org.apache.beam.sdk.transforms.splittabledofn.Backlog; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Never; @@ -82,8 +82,7 @@ public class SplittableDoFnTest implements Serializable { static class PairStringWithIndexToLengthBase extends DoFn<String, KV<String, Integer>> { @ProcessElement - public ProcessContinuation process( - ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { for (long i = tracker.currentRestriction().getFrom(), numIterations = 0; tracker.tryClaim(i); ++i, ++numIterations) { @@ -245,8 +244,7 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public ProcessContinuation processElement( - ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); for (int i = trueStart, numIterations = 1; @@ -325,7 +323,7 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { checkState(tracker.tryClaim(tracker.currentRestriction().getFrom())); String side = c.sideInput(sideInput); c.output(side + ":" + c.element()); @@ -457,8 +455,7 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public ProcessContinuation processElement( - ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); for (int i = trueStart, numIterations = 1; @@ -580,7 +577,7 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { checkState(tracker.tryClaim(tracker.currentRestriction().getFrom())); c.output("main:" + c.element()); c.output(additionalOutput, "additional:" + c.element()); @@ -721,7 +718,7 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { + public void processElement(ProcessContext c, OffsetRangeTracker tracker) { assertEquals(State.INSIDE_BUNDLE, state); assertTrue(tracker.tryClaim(0L)); c.output(c.element()); @@ -783,8 +780,7 @@ public class SplittableDoFnTest implements Serializable { ParDo.of( new DoFn<String, String>() { @ProcessElement - public void process( - @Element String element, RestrictionTracker<OffsetRange, Long> tracker) { + public void process(@Element String element, OffsetRangeTracker tracker) { // Doesn't matter } @@ -802,7 +798,7 @@ public class SplittableDoFnTest implements Serializable { new DoFn<String, String>() { @ProcessElement public ProcessContinuation process( - @Element String element, RestrictionTracker<OffsetRange, Long> tracker) { + @Element String element, OffsetRangeTracker tracker) { return stop(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java index 8f1615d..b762161 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java @@ -63,7 +63,6 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.HashCode; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.ReadableDuration; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -76,7 +75,6 @@ public class WatchTest implements Serializable { @Rule public transient TestPipeline p = TestPipeline.create(); @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testSinglePollMultipleInputs() { PCollection<KV<String, String>> res = @@ -103,7 +101,6 @@ public class WatchTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testSinglePollMultipleInputsWithSideInput() { final PCollectionView<String> moo = @@ -134,14 +131,12 @@ public class WatchTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testMultiplePollsWithTerminationBecauseOutputIsFinal() { testMultiplePolls(false); } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testMultiplePollsWithTerminationDueToTerminationCondition() { testMultiplePolls(true); @@ -179,7 +174,6 @@ public class WatchTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testMultiplePollsWithKeyExtractor() { List<KV<Integer, String>> polls = @@ -229,7 +223,6 @@ public class WatchTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testMultiplePollsStopAfterTimeSinceNewOutput() { List<Integer> all = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); @@ -256,7 +249,6 @@ public class WatchTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testSinglePollWithManyResults() { // More than the default 100 elements per checkpoint for direct runner. @@ -303,7 +295,6 @@ public class WatchTest implements Serializable { } @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-6352") @Category(NeedsRunner.class) public void testMultiplePollsWithManyResults() { final long numResults = 3000; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 5e6a040..292ecf5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -323,8 +323,8 @@ public class DoFnInvokersTest { class MockFn extends DoFn<String, String> { @DoFn.ProcessElement - public ProcessContinuation processElement( - ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) throws Exception { + public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) + throws Exception { return null; } @@ -400,8 +400,7 @@ public class DoFnInvokersTest { /** Public so Mockito can do "delegatesTo()" in the test below. */ public static class MockFn extends DoFn<String, String> { @ProcessElement - public ProcessContinuation processElement( - ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) { + public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) { return null; } @@ -511,7 +510,7 @@ public class DoFnInvokersTest { private static class DefaultTracker extends RestrictionTracker<RestrictionWithDefaultTracker, Void> { @Override - public boolean tryClaim(Void position) { + protected boolean tryClaimImpl(Void position) { throw new UnsupportedOperationException(); } @@ -547,8 +546,7 @@ public class DoFnInvokersTest { public void testSplittableDoFnDefaultMethods() throws Exception { class MockFn extends DoFn<String, String> { @ProcessElement - public void processElement( - ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker) {} + public void processElement(ProcessContext c, DefaultTracker tracker) {} @GetInitialRestriction public RestrictionWithDefaultTracker getInitialRestriction(String element) { @@ -758,8 +756,7 @@ public class DoFnInvokersTest { new DoFn<Integer, Integer>() { @ProcessElement public ProcessContinuation processElement( - @SuppressWarnings("unused") ProcessContext c, - RestrictionTracker<SomeRestriction, Void> tracker) { + @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) { throw new IllegalArgumentException("bogus"); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index b1d00e6..af4281d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -114,8 +114,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testInfersBoundednessFromAnnotation() throws Exception { class BaseSplittableFn extends DoFn<Integer, String> { @ProcessElement - public void processElement( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -142,8 +141,7 @@ public class DoFnSignaturesSplittableDoFnTest { private static class BaseFnWithoutContinuation extends DoFn<Integer, String> { @ProcessElement - public void processElement( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -154,7 +152,7 @@ public class DoFnSignaturesSplittableDoFnTest { private static class BaseFnWithContinuation extends DoFn<Integer, String> { @ProcessElement public ProcessContinuation processElement( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) { + ProcessContext context, SomeRestrictionTracker tracker) { return null; } @@ -233,7 +231,7 @@ public class DoFnSignaturesSplittableDoFnTest { class GoodSplittableDoFn extends DoFn<Integer, String> { @ProcessElement public ProcessContinuation processElement( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) { + ProcessContext context, SomeRestrictionTracker tracker) { return null; } @@ -261,7 +259,7 @@ public class DoFnSignaturesSplittableDoFnTest { } DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class); - assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType()); + assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); assertTrue(signature.processElement().hasReturnValue()); assertEquals( @@ -310,15 +308,14 @@ public class DoFnSignaturesSplittableDoFnTest { DoFnSignature signature = DoFnSignatures.getSignature( new GoodGenericSplittableDoFn< - SomeRestriction, RestrictionTracker<SomeRestriction, ?>, - SomeRestrictionCoder>() {}.getClass()); - assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType()); + SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass()); + assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); - assertEquals(RestrictionTracker.class, signature.newTracker().trackerT().getRawType()); + assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType()); assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType()); assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType()); } @@ -327,8 +324,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testSplittableMissingRequiredMethods() throws Exception { class BadFn extends DoFn<Integer, String> { @ProcessElement - public void process( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} } thrown.expectMessage( @@ -347,8 +343,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testHasDefaultTracker() throws Exception { class Fn extends DoFn<Integer, String> { @ProcessElement - public void process( - ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker) {} + public void process(ProcessContext c, SomeDefaultTracker tracker) {} @GetInitialRestriction public RestrictionWithDefaultTracker getInitialRestriction(Integer element) { @@ -357,7 +352,7 @@ public class DoFnSignaturesSplittableDoFnTest { } DoFnSignature signature = DoFnSignatures.getSignature(Fn.class); - assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType()); + assertEquals(SomeDefaultTracker.class, signature.processElement().trackerT().getRawType()); } @Test @@ -373,8 +368,11 @@ public class DoFnSignaturesSplittableDoFnTest { } thrown.expectMessage( - "Has tracker type SomeRestrictionTracker, " - + "but the DoFn's tracker type must be of type RestrictionTracker."); + "Has tracker type SomeRestrictionTracker, but the DoFn's tracker type was inferred as "); + thrown.expectMessage("SomeDefaultTracker"); + thrown.expectMessage( + "from restriction type RestrictionWithDefaultTracker " + + "of @GetInitialRestriction method getInitialRestriction(Integer)"); DoFnSignatures.getSignature(Fn.class); } @@ -382,8 +380,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testNewTrackerReturnsWrongType() throws Exception { class BadFn extends DoFn<Integer, String> { @ProcessElement - public void process( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} @NewTracker public void newTracker(SomeRestriction restriction) {} @@ -403,8 +400,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testGetInitialRestrictionMismatchesNewTracker() throws Exception { class BadFn extends DoFn<Integer, String> { @ProcessElement - public void process( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} @NewTracker public SomeRestrictionTracker newTracker(SomeRestriction restriction) { @@ -427,8 +423,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testGetRestrictionCoderReturnsWrongType() throws Exception { class BadFn extends DoFn<Integer, String> { @ProcessElement - public void process( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} @NewTracker public SomeRestrictionTracker newTracker(SomeRestriction restriction) { @@ -536,8 +531,7 @@ public class DoFnSignaturesSplittableDoFnTest { class BadFn extends DoFn<Integer, String> { @ProcessElement - public void process( - ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {} + public void process(ProcessContext context, SomeRestrictionTracker tracker) {} @NewTracker public SomeRestrictionTracker newTracker(SomeRestriction restriction) { diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java deleted file mode 100644 index addeb68..0000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.fn.splittabledofn; - -import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.sdk.transforms.splittabledofn.Backlog; -import org.apache.beam.sdk.transforms.splittabledofn.Backlogs; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; - -/** Support utilities for interacting with {@link RestrictionTracker RestrictionTrackers}. */ -public class RestrictionTrackers { - - /** Interface allowing a runner to observe the calls to {@link RestrictionTracker#tryClaim}. */ - public interface ClaimObserver<PositionT> { - /** Called when {@link RestrictionTracker#tryClaim} returns true. */ - void onClaimed(PositionT position); - - /** Called when {@link RestrictionTracker#tryClaim} returns false. */ - void onClaimFailed(PositionT position); - } - - /** - * A {@link RestrictionTracker} which forwards all calls to the delegate {@link - * RestrictionTracker}. - */ - @ThreadSafe - private static class RestrictionTrackerObserver<RestrictionT, PositionT> - extends RestrictionTracker<RestrictionT, PositionT> { - protected final RestrictionTracker<RestrictionT, PositionT> delegate; - private final ClaimObserver<PositionT> claimObserver; - - protected RestrictionTrackerObserver( - RestrictionTracker<RestrictionT, PositionT> delegate, - ClaimObserver<PositionT> claimObserver) { - this.delegate = delegate; - this.claimObserver = claimObserver; - } - - @Override - public synchronized boolean tryClaim(PositionT position) { - if (delegate.tryClaim(position)) { - claimObserver.onClaimed(position); - return true; - } else { - claimObserver.onClaimFailed(position); - return false; - } - } - - @Override - public synchronized RestrictionT currentRestriction() { - return delegate.currentRestriction(); - } - - @Override - public synchronized RestrictionT checkpoint() { - return delegate.checkpoint(); - } - - @Override - public synchronized void checkDone() throws IllegalStateException { - delegate.checkDone(); - } - } - - /** - * A {@link RestrictionTracker} which forwards all calls to the delegate backlog reporting {@link - * RestrictionTracker}. - */ - @ThreadSafe - private static class RestrictionTrackerObserverWithBacklog<RestrictionT, PositionT> - extends RestrictionTrackerObserver<RestrictionT, PositionT> implements Backlogs.HasBacklog { - - protected RestrictionTrackerObserverWithBacklog( - RestrictionTracker<RestrictionT, PositionT> delegate, - ClaimObserver<PositionT> claimObserver) { - super(delegate, claimObserver); - } - - @Override - public synchronized Backlog getBacklog() { - return ((Backlogs.HasBacklog) delegate).getBacklog(); - } - } - - /** - * A {@link RestrictionTracker} which forwards all calls to the delegate partitioned backlog - * reporting {@link RestrictionTracker}. - */ - @ThreadSafe - private static class RestrictionTrackerObserverWithPartitionedBacklog<RestrictionT, PositionT> - extends RestrictionTrackerObserverWithBacklog<RestrictionT, PositionT> - implements Backlogs.HasPartitionedBacklog { - - protected RestrictionTrackerObserverWithPartitionedBacklog( - RestrictionTracker<RestrictionT, PositionT> delegate, - ClaimObserver<PositionT> claimObserver) { - super(delegate, claimObserver); - } - - @Override - public synchronized byte[] getBacklogPartition() { - return ((Backlogs.HasPartitionedBacklog) delegate).getBacklogPartition(); - } - } - - /** - * Returns a thread safe {@link RestrictionTracker} which reports all claim attempts to the - * specified {@link ClaimObserver}. - */ - public static <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> observe( - RestrictionTracker<RestrictionT, PositionT> restrictionTracker, - ClaimObserver<PositionT> claimObserver) { - if (restrictionTracker instanceof Backlogs.HasPartitionedBacklog) { - return new RestrictionTrackerObserverWithPartitionedBacklog<>( - restrictionTracker, claimObserver); - } else if (restrictionTracker instanceof Backlogs.HasBacklog) { - return new RestrictionTrackerObserverWithBacklog<>(restrictionTracker, claimObserver); - } else { - return new RestrictionTrackerObserver<>(restrictionTracker, claimObserver); - } - } -} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java deleted file mode 100644 index 0f2cbd9..0000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Defines utilities related to executing <a - * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link - * org.apache.beam.sdk.transforms.DoFn}. - */ -@DefaultAnnotation(NonNull.class) -package org.apache.beam.sdk.fn.splittabledofn; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java deleted file mode 100644 index c3bb289..0000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.fn.splittabledofn; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver; -import org.apache.beam.sdk.transforms.splittabledofn.Backlog; -import org.apache.beam.sdk.transforms.splittabledofn.Backlogs; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link RestrictionTrackers}. */ -@RunWith(JUnit4.class) -public class RestrictionTrackersTest { - @Test - public void testObservingClaims() { - RestrictionTracker<String, String> observedTracker = - new RestrictionTracker() { - - @Override - public boolean tryClaim(Object position) { - return "goodClaim".equals(position); - } - - @Override - public Object currentRestriction() { - throw new UnsupportedOperationException(); - } - - @Override - public Object checkpoint() { - throw new UnsupportedOperationException(); - } - - @Override - public void checkDone() throws IllegalStateException { - throw new UnsupportedOperationException(); - } - }; - - List<String> positionsObserved = new ArrayList<>(); - ClaimObserver<String> observer = - new ClaimObserver<String>() { - - @Override - public void onClaimed(String position) { - positionsObserved.add(position); - assertEquals("goodClaim", position); - } - - @Override - public void onClaimFailed(String position) { - positionsObserved.add(position); - } - }; - - RestrictionTracker<String, String> observingTracker = - RestrictionTrackers.observe(observedTracker, observer); - observingTracker.tryClaim("goodClaim"); - observingTracker.tryClaim("badClaim"); - - assertThat(positionsObserved, contains("goodClaim", "badClaim")); - } - - private static class RestrictionTrackerWithBacklog extends RestrictionTracker<Object, Object> - implements Backlogs.HasBacklog { - - @Override - public Backlog getBacklog() { - return null; - } - - @Override - public boolean tryClaim(Object position) { - return false; - } - - @Override - public Object currentRestriction() { - return null; - } - - @Override - public Object checkpoint() { - return null; - } - - @Override - public void checkDone() throws IllegalStateException {} - } - - private static class RestrictionTrackerWithBacklogPartitionedBacklog - extends RestrictionTracker<Object, Object> implements Backlogs.HasPartitionedBacklog { - - @Override - public Backlog getBacklog() { - return null; - } - - @Override - public boolean tryClaim(Object position) { - return false; - } - - @Override - public Object currentRestriction() { - return null; - } - - @Override - public Object checkpoint() { - return null; - } - - @Override - public void checkDone() throws IllegalStateException {} - - @Override - public byte[] getBacklogPartition() { - return null; - } - } - - @Test - public void testClaimObserversMaintainBacklogInterfaces() { - RestrictionTracker hasBacklog = - RestrictionTrackers.observe(new RestrictionTrackerWithBacklog(), null); - assertThat(hasBacklog, instanceOf(Backlogs.HasBacklog.class)); - RestrictionTracker hasPartitionedBacklog = - RestrictionTrackers.observe(new RestrictionTrackerWithBacklogPartitionedBacklog(), null); - assertThat(hasPartitionedBacklog, instanceOf(Backlogs.HasPartitionedBacklog.class)); - } -} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java index ef5c40e..ab47ca7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java @@ -154,7 +154,8 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT> processElementTyped(elem); } - private <PositionT> void processElementTyped(WindowedValue<KV<InputT, RestrictionT>> elem) { + private <PositionT, TrackerT extends RestrictionTracker<RestrictionT, PositionT>> + void processElementTyped(WindowedValue<KV<InputT, RestrictionT>> elem) { checkArgument( elem.getWindows().size() == 1, "SPLITTABLE_PROCESS_ELEMENTS expects its input to be in 1 window, but got %s windows", @@ -172,9 +173,9 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT> (Coder<BoundedWindow>) context.windowCoder, () -> elem, () -> window); - RestrictionTracker<RestrictionT, PositionT> tracker = - doFnInvoker.invokeNewTracker(elem.getValue().getValue()); - OutputAndTimeBoundedSplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> + TrackerT tracker = doFnInvoker.invokeNewTracker(elem.getValue().getValue()); + OutputAndTimeBoundedSplittableProcessElementInvoker< + InputT, OutputT, RestrictionT, PositionT, TrackerT> processElementInvoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( context.doFn, @@ -210,7 +211,7 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT> executor, 10000, Duration.standardSeconds(10)); - SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>.Result result = + SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result = processElementInvoker.invokeProcessElement(doFnInvoker, element, tracker); this.stateAccessor = null; diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java index 801a190..aace05e 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; import org.apache.beam.sdk.transforms.splittabledofn.Backlog; import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; @@ -59,8 +58,7 @@ class HBaseReadSplittableDoFn extends DoFn<HBaseQuery, Result> { } @ProcessElement - public void processElement(ProcessContext c, RestrictionTracker<ByteKeyRange, ByteKey> tracker) - throws Exception { + public void processElement(ProcessContext c, ByteKeyRangeTracker tracker) throws Exception { final HBaseQuery query = c.element(); TableName tableName = TableName.valueOf(query.getTableId()); Table table = connection.getTable(tableName);