[
https://issues.apache.org/jira/browse/BEAM-5446?focusedWorklogId=165172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165172
]
ASF GitHub Bot logged work on BEAM-5446:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Nov/18 22:07
Start Date: 12/Nov/18 22:07
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6467: [BEAM-5446]
SplittableDoFn: Remove "internal" methods for public API surface
URL: https://github.com/apache/beam/pull/6467
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index 7f4c95182d5..85dce13e133 100644
---
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -36,7 +36,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
@@ -126,13 +125,12 @@ public void translate(ParDo.MultiOutput<InputT, OutputT>
transform, TranslationC
}
}
- static class SplittableProcessElementsTranslator<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
- implements TransformTranslator<ProcessElements<InputT, OutputT,
RestrictionT, TrackerT>> {
+ static class SplittableProcessElementsTranslator<InputT, OutputT,
RestrictionT, PositionT>
+ implements TransformTranslator<ProcessElements<InputT, OutputT,
RestrictionT, PositionT>> {
@Override
public void translate(
- ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+ ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform,
TranslationContext context) {
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 3f361405c5e..261e86f23cc 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -109,8 +109,7 @@ public PCollectionTuple expand(PCollection<KV<byte[],
KV<InputT, RestrictionT>>>
}
}
- static class NaiveProcessFn<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
+ static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT>
extends DoFn<KV<InputT, RestrictionT>, OutputT> {
private final DoFn<InputT, OutputT> fn;
@@ -142,7 +141,7 @@ public void process(ProcessContext c, BoundedWindow w) {
InputT element = c.element().getKey();
RestrictionT restriction = c.element().getValue();
while (true) {
- TrackerT tracker = invoker.invokeNewTracker(restriction);
+ RestrictionTracker<RestrictionT, PositionT> tracker =
invoker.invokeNewTracker(restriction);
ProcessContinuation continuation =
invoker.invokeProcessElement(new NestedProcessContext<>(fn, c,
element, w, tracker));
if (continuation.shouldResume()) {
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 86dbfbfdb84..7155efb1555 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -163,7 +163,8 @@ public void simpleProcess(ProcessContext ctxt) {
private DoFn<KV<String, Integer>, Integer> splittableDoFn =
new DoFn<KV<String, Integer>, Integer>() {
@ProcessElement
- public void processElement(ProcessContext context, SomeTracker
tracker) {}
+ public void processElement(
+ ProcessContext context, RestrictionTracker<Void, Void> tracker) {}
@GetInitialRestriction
public Void getInitialRestriction(KV<String, Integer> element) {
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 68365c85bc9..959120c7d54 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -58,7 +58,7 @@ public SomeRestrictionTracker(SomeRestriction
someRestriction) {
}
@Override
- protected boolean tryClaimImpl(Void position) {
+ public boolean tryClaim(Void position) {
return false;
}
@@ -78,7 +78,8 @@ public void checkDone() {}
private static class BoundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
- public void processElement(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void processElement(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -89,7 +90,7 @@ public SomeRestriction getInitialRestriction(Integer element)
{
private static class UnboundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {
return stop();
}
diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index b43c410a6d1..4d7082975de 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -36,6 +36,7 @@ dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-model-fn-execution", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration:
"shadow")
+ shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
shadow library.java.joda_time
shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
shadowTest library.java.junit
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 7a0f66fbb22..c6f83844ddb 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
@@ -55,12 +56,8 @@
* outputs), or runs for the given duration.
*/
public class OutputAndTimeBoundedSplittableProcessElementInvoker<
- InputT,
- OutputT,
- RestrictionT,
- PositionT,
- TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
- extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
TrackerT> {
+ InputT, OutputT, RestrictionT, PositionT>
+ extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
PositionT> {
private final DoFn<InputT, OutputT> fn;
private final PipelineOptions pipelineOptions;
private final OutputWindowedValue<OutputT> output;
@@ -106,9 +103,9 @@ public OutputAndTimeBoundedSplittableProcessElementInvoker(
public Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker,
final WindowedValue<InputT> element,
- final TrackerT tracker) {
+ final RestrictionTracker<RestrictionT, PositionT> tracker) {
final ProcessContext processContext = new ProcessContext(element, tracker);
- tracker.setClaimObserver(processContext);
+
DoFn.ProcessContinuation cont =
invoker.invokeProcessElement(
new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
@@ -156,7 +153,7 @@ public MultiOutputReceiver
taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
@Override
public RestrictionTracker<?, ?> restrictionTracker() {
- return tracker;
+ return processContext.tracker;
}
// Unsupported methods below.
@@ -226,7 +223,7 @@ public Timer timer(String timerId) {
// restriction that describes exactly the work that wasn't done in the
current call.
if (processContext.numClaimedBlocks > 0) {
residual = checkNotNull(processContext.takeCheckpointNow());
- tracker.checkDone();
+ processContext.tracker.checkDone();
} else {
// The call returned resume() without trying to claim any blocks,
i.e. it is unaware
// of any work to be done at the moment, but more might emerge
later. This is a valid
@@ -254,14 +251,14 @@ public Timer timer(String timerId) {
// ProcessElement call.
// In other words, if we took a checkpoint *after* ProcessElement
completed (like in the
// branch above), it would have been equivalent to this one.
- tracker.checkDone();
+ processContext.tracker.checkDone();
}
} else {
// The ProcessElement call returned stop() - that means the tracker's
current restriction
// has been fully processed by the call. A checkpoint may or may not
have been taken in
// "residual"; if it was, then we'll need to process it; if no, then we
don't - nothing
// special needs to be done.
- tracker.checkDone();
+ processContext.tracker.checkDone();
}
if (residual == null) {
// Can only be true if cont.shouldResume() is false and no checkpoint
was taken.
@@ -273,9 +270,9 @@ public Timer timer(String timerId) {
}
private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
- implements RestrictionTracker.ClaimObserver<PositionT> {
+ implements RestrictionTrackers.ClaimObserver<PositionT> {
private final WindowedValue<InputT> element;
- private final TrackerT tracker;
+ private final RestrictionTracker<RestrictionT, PositionT> tracker;
private int numClaimedBlocks;
private boolean hasClaimFailed;
@@ -293,10 +290,11 @@ public Timer timer(String timerId) {
private @Nullable Future<?> scheduledCheckpoint;
private @Nullable Instant lastReportedWatermark;
- public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
+ public ProcessContext(
+ WindowedValue<InputT> element, RestrictionTracker<RestrictionT,
PositionT> tracker) {
fn.super();
this.element = element;
- this.tracker = tracker;
+ this.tracker = RestrictionTrackers.observe(tracker, this);
}
@Override
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 6c37b057ce6..a727ce910fb 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -153,8 +153,7 @@ public PCollectionTuple expand(PCollection<KV<byte[],
KV<InputT, RestrictionT>>>
}
/** A primitive transform wrapping around {@link ProcessFn}. */
- public static class ProcessElements<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
+ public static class ProcessElements<InputT, OutputT, RestrictionT, PositionT>
extends PTransform<
PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple> {
private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
@@ -163,7 +162,7 @@ public ProcessElements(ProcessKeyedElements<InputT,
OutputT, RestrictionT> origi
this.original = original;
}
- public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
+ public ProcessFn<InputT, OutputT, RestrictionT, PositionT> newProcessFn(
DoFn<InputT, OutputT> fn) {
return new ProcessFn<>(
fn,
@@ -214,8 +213,7 @@ public PCollectionTuple expand(
* <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
*/
@VisibleForTesting
- public static class ProcessFn<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
+ public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT>
extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
/**
* The state cell containing a watermark hold for the output of this
{@link DoFn}. The hold is
@@ -252,7 +250,7 @@ public PCollectionTuple expand(
private transient @Nullable StateInternalsFactory<byte[]>
stateInternalsFactory;
private transient @Nullable TimerInternalsFactory<byte[]>
timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
- InputT, OutputT, RestrictionT, TrackerT>
+ InputT, OutputT, RestrictionT, PositionT>
processElementInvoker;
private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
@@ -283,7 +281,7 @@ public void
setTimerInternalsFactory(TimerInternalsFactory<byte[]> timerInternal
}
public void setProcessElementInvoker(
- SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
TrackerT> invoker) {
+ SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
PositionT> invoker) {
this.processElementInvoker = invoker;
}
@@ -368,8 +366,9 @@ public void processElement(final ProcessContext c) {
elementAndRestriction = KV.of(elementState.read(),
restrictionState.read());
}
- final TrackerT tracker =
invoker.invokeNewTracker(elementAndRestriction.getValue());
- SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
TrackerT>.Result result =
+ final RestrictionTracker<RestrictionT, PositionT> tracker =
+ invoker.invokeNewTracker(elementAndRestriction.getValue());
+ SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
PositionT>.Result result =
processElementInvoker.invokeProcessElement(
invoker, elementAndRestriction.getKey(), tracker);
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index fba2e91fa6e..49a4860f3a3 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -30,8 +30,7 @@
* A runner-specific hook for invoking a {@link DoFn.ProcessElement} method
for a splittable {@link
* DoFn}, in particular, allowing the runner to access the {@link
RestrictionTracker}.
*/
-public abstract class SplittableProcessElementInvoker<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>> {
+public abstract class SplittableProcessElementInvoker<InputT, OutputT,
RestrictionT, PositionT> {
/** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
public class Result {
@Nullable private final RestrictionT residualRestriction;
@@ -77,5 +76,7 @@ public RestrictionT getResidualRestriction() {
* DoFn.ProcessContinuation}, and a future output watermark.
*/
public abstract Result invokeProcessElement(
- DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element,
TrackerT tracker);
+ DoFnInvoker<InputT, OutputT> invoker,
+ WindowedValue<InputT> element,
+ RestrictionTracker<RestrictionT, PositionT> tracker);
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index 08d8aca94b8..c64a9539dd0 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -66,7 +67,8 @@ private SomeFn(
}
@ProcessElement
- public ProcessContinuation process(ProcessContext context,
OffsetRangeTracker tracker) {
+ public ProcessContinuation process(
+ ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker)
{
Uninterruptibles.sleepUninterruptibly(
sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS);
for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
@@ -88,20 +90,19 @@ public OffsetRange getInitialRestriction(Void element) {
}
}
- private SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result
- runTest(
- int totalNumOutputs,
- Duration sleepBeforeFirstClaim,
- int numOutputsPerProcessCall,
- Duration sleepBeforeEachOutput) {
+ private SplittableProcessElementInvoker<Void, String, OffsetRange,
Long>.Result runTest(
+ int totalNumOutputs,
+ Duration sleepBeforeFirstClaim,
+ int numOutputsPerProcessCall,
+ Duration sleepBeforeEachOutput) {
SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall,
sleepBeforeEachOutput);
OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs);
return runTest(fn, initialRestriction);
}
- private SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result
- runTest(DoFn<Void, String> fn, OffsetRange initialRestriction) {
- SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker> invoker =
+ private SplittableProcessElementInvoker<Void, String, OffsetRange,
Long>.Result runTest(
+ DoFn<Void, String> fn, OffsetRange initialRestriction) {
+ SplittableProcessElementInvoker<Void, String, OffsetRange, Long> invoker =
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
fn,
PipelineOptionsFactory.create(),
@@ -134,7 +135,7 @@ public void outputWindowedValue(
@Test
public void testInvokeProcessElementOutputBounded() throws Exception {
- SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result res =
+ SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result
res =
runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO);
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
@@ -145,7 +146,7 @@ public void testInvokeProcessElementOutputBounded() throws
Exception {
@Test
public void testInvokeProcessElementTimeBounded() throws Exception {
- SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result res =
+ SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result
res =
runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
@@ -158,7 +159,7 @@ public void testInvokeProcessElementTimeBounded() throws
Exception {
@Test
public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws
Exception {
- SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result res =
+ SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result
res =
runTest(10000, Duration.standardSeconds(3), Integer.MAX_VALUE,
Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
@@ -170,7 +171,7 @@ public void
testInvokeProcessElementTimeBoundedWithStartupDelay() throws Excepti
@Test
public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
- SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result res =
+ SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result
res =
runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
assertNull(res.getResidualRestriction());
@@ -178,7 +179,7 @@ public void testInvokeProcessElementVoluntaryReturnStop()
throws Exception {
@Test
public void testInvokeProcessElementVoluntaryReturnResume() throws Exception
{
- SplittableProcessElementInvoker<Void, String, OffsetRange,
OffsetRangeTracker>.Result res =
+ SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result
res =
runTest(10, Duration.ZERO, 5, Duration.millis(100));
assertTrue(res.getContinuation().shouldResume());
assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
@@ -189,7 +190,7 @@ public void
testInvokeProcessElementOutputDisallowedBeforeTryClaim() throws Exce
DoFn<Void, String> brokenFn =
new DoFn<Void, String>() {
@ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ public void process(ProcessContext c,
RestrictionTracker<OffsetRange, Long> tracker) {
c.output("foo");
}
@@ -207,7 +208,7 @@ public void
testInvokeProcessElementOutputDisallowedAfterFailedTryClaim() throws
DoFn<Void, String> brokenFn =
new DoFn<Void, String>() {
@ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ public void process(ProcessContext c,
RestrictionTracker<OffsetRange, Long> tracker) {
assertFalse(tracker.tryClaim(6L));
c.output("foo");
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 443e9f91d60..07453e068a7 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -93,7 +93,7 @@ public SomeRestrictionTracker(SomeRestriction
someRestriction) {
}
@Override
- protected boolean tryClaimImpl(Void position) {
+ public boolean tryClaim(Void position) {
return true;
}
@@ -117,12 +117,7 @@ public void checkDone() {}
* A helper for testing {@link ProcessFn} on 1 element (but possibly over
multiple {@link
* DoFn.ProcessElement} calls).
*/
- private static class ProcessFnTester<
- InputT,
- OutputT,
- RestrictionT,
- PositionT,
- TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
+ private static class ProcessFnTester<InputT, OutputT, RestrictionT,
PositionT>
implements AutoCloseable {
private final DoFnTester<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT> tester;
private Instant currentProcessingTime;
@@ -142,7 +137,7 @@ public void checkDone() {}
// encode IntervalWindow's because that's what all tests here use.
WindowingStrategy<InputT, BoundedWindow> windowingStrategy =
(WindowingStrategy)
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1)));
- final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+ final ProcessFn<InputT, OutputT, RestrictionT, PositionT> processFn =
new ProcessFn<>(fn, inputCoder, restrictionCoder, windowingStrategy);
this.tester = DoFnTester.of(processFn);
this.timerInternals = new InMemoryTimerInternals();
@@ -270,7 +265,7 @@ public void outputWindowedValue(
/** A simple splittable {@link DoFn} that's actually monolithic. */
private static class ToStringFn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext c, SomeRestrictionTracker tracker) {
+ public void process(ProcessContext c, RestrictionTracker<SomeRestriction,
Void> tracker) {
checkState(tracker.tryClaim(null));
c.output(c.element().toString() + "a");
c.output(c.element().toString() + "b");
@@ -296,7 +291,7 @@ public void
testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exce
new IntervalWindow(
base.minus(Duration.standardMinutes(1)),
base.plus(Duration.standardMinutes(1)));
- ProcessFnTester<Integer, String, SomeRestriction, Void,
SomeRestrictionTracker> tester =
+ ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
new ProcessFnTester<>(
base,
fn,
@@ -321,7 +316,7 @@ public void
testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exce
private static class WatermarkUpdateFn extends DoFn<Instant, String> {
@ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ public void process(ProcessContext c, RestrictionTracker<OffsetRange,
Long> tracker) {
for (long i = tracker.currentRestriction().getFrom();
tracker.tryClaim(i); ++i) {
c.updateWatermark(c.element().plus(Duration.standardSeconds(i)));
c.output(String.valueOf(i));
@@ -344,7 +339,7 @@ public void testUpdatesWatermark() throws Exception {
DoFn<Instant, String> fn = new WatermarkUpdateFn();
Instant base = Instant.now();
- ProcessFnTester<Instant, String, OffsetRange, Long, OffsetRangeTracker>
tester =
+ ProcessFnTester<Instant, String, OffsetRange, Long> tester =
new ProcessFnTester<>(
base,
fn,
@@ -369,7 +364,8 @@ public void testUpdatesWatermark() throws Exception {
/** A simple splittable {@link DoFn} that outputs the given element every 5
seconds forever. */
private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation process(ProcessContext c,
SomeRestrictionTracker tracker) {
+ public ProcessContinuation process(
+ ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
checkState(tracker.tryClaim(null));
c.output(c.element().toString());
return resume().withResumeDelay(Duration.standardSeconds(5));
@@ -385,7 +381,7 @@ public SomeRestriction getInitialRestriction(Integer elem) {
public void testResumeSetsTimer() throws Exception {
DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
Instant base = Instant.now();
- ProcessFnTester<Integer, String, SomeRestriction, Void,
SomeRestrictionTracker> tester =
+ ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
new ProcessFnTester<>(
base,
fn,
@@ -423,7 +419,8 @@ public CounterFn(int numOutputsPerCall) {
}
@ProcessElement
- public ProcessContinuation process(ProcessContext c, OffsetRangeTracker
tracker) {
+ public ProcessContinuation process(
+ ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
tracker.tryClaim(i);
++i, ++numIterations) {
@@ -445,7 +442,7 @@ public OffsetRange getInitialRestriction(Integer elem) {
public void testResumeCarriesOverState() throws Exception {
DoFn<Integer, String> fn = new CounterFn(1);
Instant base = Instant.now();
- ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker>
tester =
+ ProcessFnTester<Integer, String, OffsetRange, Long> tester =
new ProcessFnTester<>(
base,
fn,
@@ -474,7 +471,7 @@ public void testCheckpointsAfterNumOutputs() throws
Exception {
Instant base = Instant.now();
int baseIndex = 42;
- ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker>
tester =
+ ProcessFnTester<Integer, String, OffsetRange, Long> tester =
new ProcessFnTester<>(
base,
fn,
@@ -520,7 +517,7 @@ public void testCheckpointsAfterDuration() throws Exception
{
Instant base = Instant.now();
int baseIndex = 42;
- ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker>
tester =
+ ProcessFnTester<Integer, String, OffsetRange, Long> tester =
new ProcessFnTester<>(
base,
fn,
@@ -552,7 +549,7 @@ public void testCheckpointsAfterDuration() throws Exception
{
private State state = State.BEFORE_SETUP;
@ProcessElement
- public void process(ProcessContext c, SomeRestrictionTracker tracker) {
+ public void process(ProcessContext c, RestrictionTracker<SomeRestriction,
Void> tracker) {
assertEquals(State.INSIDE_BUNDLE, state);
}
@@ -589,7 +586,7 @@ public void finishBundle() {
@Test
public void testInvokesLifecycleMethods() throws Exception {
DoFn<Integer, String> fn = new LifecycleVerifyingFn();
- try (ProcessFnTester<Integer, String, SomeRestriction, Void,
SomeRestrictionTracker> tester =
+ try (ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
new ProcessFnTester<>(
Instant.now(),
fn,
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 1f8e6ea9664..8f71eb020fc 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -35,7 +35,6 @@
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -46,12 +45,7 @@
import org.joda.time.Duration;
import org.joda.time.Instant;
-class SplittableProcessElementsEvaluatorFactory<
- InputT,
- OutputT,
- RestrictionT,
- PositionT,
- TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
+class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT,
PositionT>
implements TransformEvaluatorFactory {
private final ParDoEvaluatorFactory<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>, OutputT>
delegateFactory;
@@ -74,8 +68,8 @@ public DoFnLifecycleManager load(final AppliedPTransform<?,
?, ?> application) {
checkArgument(
ProcessElements.class.isInstance(application.getTransform()),
"No know extraction of the fn from " + application);
- final ProcessElements<InputT, OutputT, RestrictionT, TrackerT>
transform =
- (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>)
+ final ProcessElements<InputT, OutputT, RestrictionT,
PositionT> transform =
+ (ProcessElements<InputT, OutputT, RestrictionT, PositionT>)
application.getTransform();
return
DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn()));
}
@@ -110,11 +104,11 @@ public void cleanup() throws Exception {
private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>
createEvaluator(
AppliedPTransform<
PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
PCollectionTuple,
- ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
+ ProcessElements<InputT, OutputT, RestrictionT, PositionT>>
application,
CommittedBundle<InputT> inputBundle)
throws Exception {
- final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
+ final ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform =
application.getTransform();
final DoFnLifecycleManagerRemovingTransformEvaluator<
@@ -130,8 +124,8 @@ public void cleanup() throws Exception {
application.getTransform().getAdditionalOutputTags().getAll());
final ParDoEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> pde =
evaluator.getParDoEvaluator();
- final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
- (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+ final ProcessFn<InputT, OutputT, RestrictionT, PositionT> processFn =
+ (ProcessFn<InputT, OutputT, RestrictionT, PositionT>)
ProcessFnRunner.class.cast(pde.getFnRunner()).getFn();
final DirectExecutionContext.DirectStepContext stepContext =
pde.getStepContext();
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index b2ecd4cdf4d..bce117a1d1b 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -67,7 +67,6 @@
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -659,14 +658,14 @@ public void translateNode(
}
private static class SplittableProcessElementsStreamingTranslator<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
+ InputT, OutputT, RestrictionT, PositionT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
SplittableParDoViaKeyedWorkItems.ProcessElements<
- InputT, OutputT, RestrictionT, TrackerT>> {
+ InputT, OutputT, RestrictionT, PositionT>> {
@Override
public void translateNode(
- SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT,
RestrictionT, TrackerT>
+ SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT,
RestrictionT, PositionT>
transform,
FlinkStreamingTranslationContext context) {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 25a5ba1579d..f376c127021 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -40,7 +40,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -58,8 +57,7 @@
* Flink operator for executing splittable {@link DoFn DoFns}. Specifically,
for executing the
* {@code @ProcessElement} method of a splittable {@link DoFn}.
*/
-public class SplittableDoFnOperator<
- InputT, OutputT, RestrictionT, TrackerT extends
RestrictionTracker<RestrictionT, ?>>
+public class SplittableDoFnOperator<InputT, OutputT, RestrictionT>
extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT> {
private transient ScheduledExecutorService executorService;
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e2f2e6479e5..212d5bcbd6d 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -88,7 +88,7 @@
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -975,7 +975,7 @@ private static void assertAllStepOutputsHaveUniqueIds(Job
job) throws Exception
private static class TestSplittableFn extends DoFn<String, Integer> {
@ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ public void process(ProcessContext c, RestrictionTracker<OffsetRange,
Long> tracker) {
// noop
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 67c658e5ac9..88556eaf486 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -533,7 +533,7 @@ public Duration getAllowedTimestampSkew() {
* <p>The signature of this method must satisfy the following constraints:
*
* <ul>
- * <li>If one of its arguments is a subtype of {@link RestrictionTracker},
then it is a <a
+ * <li>If one of its arguments is a {@link RestrictionTracker}, then it is
a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link
DoFn} subject to the
* separate requirements described below. Items below are assuming
this is not a splittable
* {@link DoFn}.
@@ -561,8 +561,8 @@ public Duration getAllowedTimestampSkew() {
* <h2>Splittable DoFn's</h2>
*
* <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement}
method has a parameter
- * whose type is a subtype of {@link RestrictionTracker}. This is an
advanced feature and an
- * overwhelming majority of users will never need to write a splittable
{@link DoFn}.
+ * whose type is of {@link RestrictionTracker}. This is an advanced feature
and an overwhelming
+ * majority of users will never need to write a splittable {@link DoFn}.
*
* <p>Not all runners support Splittable DoFn. See the <a
*
href="https://beam.apache.org/documentation/runners/capability-matrix/">capability
matrix</a>.
@@ -575,12 +575,10 @@ public Duration getAllowedTimestampSkew() {
* <ul>
* <li>It <i>must</i> define a {@link GetInitialRestriction} method.
* <li>It <i>may</i> define a {@link SplitRestriction} method.
- * <li>It <i>may</i> define a {@link NewTracker} method returning the same
type as the type of
- * the {@link RestrictionTracker} argument of {@link ProcessElement},
which in turn must be
- * a subtype of {@code RestrictionTracker<R>} where {@code R} is the
restriction type
- * returned by {@link GetInitialRestriction}. This method is optional
in case the
- * restriction type returned by {@link GetInitialRestriction}
implements {@link
- * HasDefaultTracker}.
+ * <li>It <i>may</i> define a {@link NewTracker} method returning a
subtype of {@code
+ * RestrictionTracker<R>} where {@code R} is the restriction type
returned by {@link
+ * GetInitialRestriction}. This method is optional in case the
restriction type returned by
+ * {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
* <li>The type of restrictions used by all of these methods must be the
same.
* <li>Its {@link ProcessElement} method <i>may</i> return a {@link
ProcessContinuation} to
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index de100242d9b..34f79ccd4d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -990,7 +990,7 @@ private synchronized int getNumPending() {
}
@Override
- protected synchronized boolean tryClaimImpl(HashCode hash) {
+ public synchronized boolean tryClaim(HashCode hash) {
if (shouldStop) {
return false;
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index f96e28eb05f..21d1653fa11 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -90,7 +90,7 @@
/** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
@SuppressWarnings("TypeParameterUnusedInFormals")
- <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
TrackerT invokeNewTracker(
+ <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT>
invokeNewTracker(
RestrictionT restriction);
/** Get the bound {@link DoFn}. */
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 98b9016a607..4c9b61b98d5 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -555,9 +555,6 @@ private static void verifySplittableMethods(DoFnSignature
signature, ErrorReport
ErrorReporter processElementErrors =
errors.forMethod(DoFn.ProcessElement.class,
processElement.targetMethod());
- final TypeDescriptor<?> trackerT;
- final String originOfTrackerT;
-
List<String> missingRequiredMethods = new ArrayList<>();
if (getInitialRestriction == null) {
missingRequiredMethods.add("@" +
DoFn.GetInitialRestriction.class.getSimpleName());
@@ -567,27 +564,11 @@ private static void verifySplittableMethods(DoFnSignature
signature, ErrorReport
&& getInitialRestriction
.restrictionT()
.isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class))) {
- trackerT =
- getInitialRestriction
- .restrictionT()
- .resolveType(HasDefaultTracker.class.getTypeParameters()[1]);
- originOfTrackerT =
- String.format(
- "restriction type %s of @%s method %s",
- formatType(getInitialRestriction.restrictionT()),
- DoFn.GetInitialRestriction.class.getSimpleName(),
- format(getInitialRestriction.targetMethod()));
+ // no-op we are using the annotation @HasDefaultTracker
} else {
missingRequiredMethods.add("@" +
DoFn.NewTracker.class.getSimpleName());
- trackerT = null;
- originOfTrackerT = null;
}
} else {
- trackerT = newTracker.trackerT();
- originOfTrackerT =
- String.format(
- "%s method %s",
- DoFn.NewTracker.class.getSimpleName(),
format(newTracker.targetMethod()));
ErrorReporter getInitialRestrictionErrors =
errors.forMethod(DoFn.GetInitialRestriction.class,
getInitialRestriction.targetMethod());
TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
@@ -610,11 +591,9 @@ private static void verifySplittableMethods(DoFnSignature
signature, ErrorReport
errors.forMethod(DoFn.GetInitialRestriction.class,
getInitialRestriction.targetMethod());
TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
processElementErrors.checkArgument(
- processElement.trackerT().equals(trackerT),
- "Has tracker type %s, but the DoFn's tracker type was inferred as %s
from %s",
- formatType(processElement.trackerT()),
- trackerT,
- originOfTrackerT);
+
processElement.trackerT().getRawType().equals(RestrictionTracker.class),
+ "Has tracker type %s, but the DoFn's tracker type must be of type
RestrictionTracker.",
+ formatType(processElement.trackerT()));
if (getRestrictionCoder != null) {
getInitialRestrictionErrors.checkArgument(
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index 457c19e876c..db83af4c976 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -54,12 +54,12 @@ public static ByteKeyRangeTracker of(ByteKeyRange range) {
}
@Override
- public synchronized ByteKeyRange currentRestriction() {
+ public ByteKeyRange currentRestriction() {
return range;
}
@Override
- public synchronized ByteKeyRange checkpoint() {
+ public ByteKeyRange checkpoint() {
// If we haven't done any work, we should return the original range we
were processing
// as the checkpoint.
if (lastAttemptedKey == null) {
@@ -97,7 +97,7 @@ public synchronized ByteKeyRange checkpoint() {
* current {@link ByteKeyRange} of this tracker.
*/
@Override
- protected synchronized boolean tryClaimImpl(ByteKey key) {
+ public boolean tryClaim(ByteKey key) {
// Handle claiming the end of range EMPTY key
if (key.isEmpty()) {
checkArgument(
@@ -130,7 +130,7 @@ protected synchronized boolean tryClaimImpl(ByteKey key) {
}
@Override
- public synchronized void checkDone() throws IllegalStateException {
+ public void checkDone() throws IllegalStateException {
// Handle checking the empty range which is implicitly done.
// This case can occur if the range tracker is checkpointed before any
keys have been claimed
// or if the range tracker is checkpointed once the range is done.
@@ -160,7 +160,7 @@ public synchronized void checkDone() throws
IllegalStateException {
}
@Override
- public synchronized String toString() {
+ public String toString() {
return MoreObjects.toStringHelper(this)
.add("range", range)
.add("lastClaimedKey", lastClaimedKey)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index f2d9e5cfd81..2d5626b8d7c 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -40,12 +40,12 @@ public OffsetRangeTracker(OffsetRange range) {
}
@Override
- public synchronized OffsetRange currentRestriction() {
+ public OffsetRange currentRestriction() {
return range;
}
@Override
- public synchronized OffsetRange checkpoint() {
+ public OffsetRange checkpoint() {
checkState(
lastClaimedOffset != null, "Can't checkpoint before any offset was
successfully claimed");
OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
@@ -62,7 +62,7 @@ public synchronized OffsetRange checkpoint() {
* current {@link OffsetRange} of this tracker (in that case this
operation is a no-op).
*/
@Override
- protected synchronized boolean tryClaimImpl(Long i) {
+ public boolean tryClaim(Long i) {
checkArgument(
lastAttemptedOffset == null || i > lastAttemptedOffset,
"Trying to claim offset %s while last attempted was %s",
@@ -86,12 +86,12 @@ protected synchronized boolean tryClaimImpl(Long i) {
* call this if it hits EOF - even though the last attempted claim was
before the end of the
* range, there are no more offsets to claim.
*/
- public synchronized void markDone() {
+ public void markDone() {
lastAttemptedOffset = Long.MAX_VALUE;
}
@Override
- public synchronized void checkDone() throws IllegalStateException {
+ public void checkDone() throws IllegalStateException {
checkState(
lastAttemptedOffset >= range.getTo() - 1,
"Last attempted offset was %s in range %s, claiming work in [%s, %s)
was not attempted",
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 8b59f054b96..d596862b5ac 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -17,41 +17,13 @@
*/
package org.apache.beam.sdk.transforms.splittabledofn;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.DoFn;
/**
- * Manages concurrent access to the restriction and keeps track of its claimed
part for a <a
+ * Manages access to the restriction and keeps track of its claimed part for a
<a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*/
public abstract class RestrictionTracker<RestrictionT, PositionT> {
- /** Internal interface allowing a runner to observe the calls to {@link
#tryClaim}. */
- @Internal
- public interface ClaimObserver<PositionT> {
- /** Called when {@link #tryClaim} returns true. */
- void onClaimed(PositionT position);
-
- /** Called when {@link #tryClaim} returns false. */
- void onClaimFailed(PositionT position);
- }
-
- @Nullable private ClaimObserver<PositionT> claimObserver;
-
- /**
- * Sets a {@link ClaimObserver} to be invoked on every call to {@link
#tryClaim}. Internal:
- * intended only for runner authors.
- */
- @Internal
- public void setClaimObserver(ClaimObserver<PositionT> claimObserver) {
- checkNotNull(claimObserver, "claimObserver");
- checkState(this.claimObserver == null, "A claim observer has already been
set");
- this.claimObserver = claimObserver;
- }
-
/**
* Attempts to claim the block of work in the current restriction identified
by the given
* position.
@@ -65,27 +37,8 @@ public void setClaimObserver(ClaimObserver<PositionT>
claimObserver) {
* call to this method).
* <li>{@link RestrictionTracker#checkDone} MUST succeed.
* </ul>
- *
- * <p>Under the hood, calls {@link #tryClaimImpl} and notifies {@link
ClaimObserver} of the
- * result.
*/
- public final boolean tryClaim(PositionT position) {
- if (tryClaimImpl(position)) {
- if (claimObserver != null) {
- claimObserver.onClaimed(position);
- }
- return true;
- } else {
- if (claimObserver != null) {
- claimObserver.onClaimFailed(position);
- }
- return false;
- }
- }
-
- /** Tracker-specific implementation of {@link #tryClaim}. */
- @Internal
- protected abstract boolean tryClaimImpl(PositionT position);
+ public abstract boolean tryClaim(PositionT position);
/**
* Returns a restriction accurately describing the full range of work the
current {@link
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 54cae0dfae0..0d2be5a2d53 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -317,8 +317,8 @@ public void onWindowExpiration(BoundedWindow window) {}
public void testDoFnWithReturn() throws Exception {
class MockFn extends DoFn<String, String> {
@DoFn.ProcessElement
- public ProcessContinuation processElement(ProcessContext c,
SomeRestrictionTracker tracker)
- throws Exception {
+ public ProcessContinuation processElement(
+ ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker)
throws Exception {
return null;
}
@@ -394,7 +394,8 @@ public SomeRestriction decode(InputStream inStream) {
/** Public so Mockito can do "delegatesTo()" in the test below. */
public static class MockFn extends DoFn<String, String> {
@ProcessElement
- public ProcessContinuation processElement(ProcessContext c,
SomeRestrictionTracker tracker) {
+ public ProcessContinuation processElement(
+ ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
return null;
}
@@ -495,7 +496,7 @@ public DefaultTracker newTracker() {
private static class DefaultTracker
extends RestrictionTracker<RestrictionWithDefaultTracker, Void> {
@Override
- protected boolean tryClaimImpl(Void position) {
+ public boolean tryClaim(Void position) {
throw new UnsupportedOperationException();
}
@@ -531,7 +532,8 @@ public RestrictionWithDefaultTracker decode(InputStream
inStream) {
public void testSplittableDoFnDefaultMethods() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
- public void processElement(ProcessContext c, DefaultTracker tracker) {}
+ public void processElement(
+ ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker,
Void> tracker) {}
@GetInitialRestriction
public RestrictionWithDefaultTracker getInitialRestriction(String
element) {
@@ -740,7 +742,8 @@ public void testProcessElementExceptionWithReturn() throws
Exception {
new DoFn<Integer, Integer>() {
@ProcessElement
public ProcessContinuation processElement(
- @SuppressWarnings("unused") ProcessContext c,
SomeRestrictionTracker tracker) {
+ @SuppressWarnings("unused") ProcessContext c,
+ RestrictionTracker<SomeRestriction, Void> tracker) {
throw new IllegalArgumentException("bogus");
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 8c181ba96de..317b1453f5c 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -113,7 +113,8 @@ private void method(
public void testInfersBoundednessFromAnnotation() throws Exception {
class BaseSplittableFn extends DoFn<Integer, String> {
@ProcessElement
- public void processElement(ProcessContext context,
SomeRestrictionTracker tracker) {}
+ public void processElement(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -140,7 +141,8 @@ public SomeRestriction getInitialRestriction(Integer
element) {
private static class BaseFnWithoutContinuation extends DoFn<Integer, String>
{
@ProcessElement
- public void processElement(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void processElement(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -151,7 +153,7 @@ public SomeRestriction getInitialRestriction(Integer
element) {
private static class BaseFnWithContinuation extends DoFn<Integer, String> {
@ProcessElement
public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {
return null;
}
@@ -230,7 +232,7 @@ public void testSplittableWithAllFunctions() throws
Exception {
class GoodSplittableDoFn extends DoFn<Integer, String> {
@ProcessElement
public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {
return null;
}
@@ -255,7 +257,7 @@ public SomeRestrictionCoder getRestrictionCoder() {
}
DoFnSignature signature =
DoFnSignatures.getSignature(GoodSplittableDoFn.class);
- assertEquals(SomeRestrictionTracker.class,
signature.processElement().trackerT().getRawType());
+ assertEquals(RestrictionTracker.class,
signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
assertTrue(signature.processElement().hasReturnValue());
assertEquals(
@@ -301,14 +303,15 @@ public CoderT getRestrictionCoder() {
DoFnSignature signature =
DoFnSignatures.getSignature(
new GoodGenericSplittableDoFn<
- SomeRestriction, SomeRestrictionTracker,
SomeRestrictionCoder>() {}.getClass());
- assertEquals(SomeRestrictionTracker.class,
signature.processElement().trackerT().getRawType());
+ SomeRestriction, RestrictionTracker<SomeRestriction, ?>,
+ SomeRestrictionCoder>() {}.getClass());
+ assertEquals(RestrictionTracker.class,
signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class,
signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class,
signature.splitRestriction().restrictionT().getRawType());
- assertEquals(SomeRestrictionTracker.class,
signature.newTracker().trackerT().getRawType());
+ assertEquals(RestrictionTracker.class,
signature.newTracker().trackerT().getRawType());
assertEquals(SomeRestriction.class,
signature.newTracker().restrictionT().getRawType());
assertEquals(SomeRestrictionCoder.class,
signature.getRestrictionCoder().coderT().getRawType());
}
@@ -317,7 +320,8 @@ public CoderT getRestrictionCoder() {
public void testSplittableMissingRequiredMethods() throws Exception {
class BadFn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
}
thrown.expectMessage(
@@ -336,7 +340,8 @@ public void process(ProcessContext context,
SomeRestrictionTracker tracker) {}
public void testHasDefaultTracker() throws Exception {
class Fn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext c, SomeDefaultTracker tracker) {}
+ public void process(
+ ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker,
Void> tracker) {}
@GetInitialRestriction
public RestrictionWithDefaultTracker getInitialRestriction(Integer
element) {
@@ -345,7 +350,7 @@ public RestrictionWithDefaultTracker
getInitialRestriction(Integer element) {
}
DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
- assertEquals(SomeDefaultTracker.class,
signature.processElement().trackerT().getRawType());
+ assertEquals(RestrictionTracker.class,
signature.processElement().trackerT().getRawType());
}
@Test
@@ -361,11 +366,8 @@ public RestrictionWithDefaultTracker
getInitialRestriction(Integer element) {
}
thrown.expectMessage(
- "Has tracker type SomeRestrictionTracker, but the DoFn's tracker type
was inferred as ");
- thrown.expectMessage("SomeDefaultTracker");
- thrown.expectMessage(
- "from restriction type RestrictionWithDefaultTracker "
- + "of @GetInitialRestriction method
getInitialRestriction(Integer)");
+ "Has tracker type SomeRestrictionTracker, "
+ + "but the DoFn's tracker type must be of type
RestrictionTracker.");
DoFnSignatures.getSignature(Fn.class);
}
@@ -373,7 +375,8 @@ public RestrictionWithDefaultTracker
getInitialRestriction(Integer element) {
public void testNewTrackerReturnsWrongType() throws Exception {
class BadFn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@NewTracker
public void newTracker(SomeRestriction restriction) {}
@@ -393,7 +396,8 @@ public SomeRestriction getInitialRestriction(Integer
element) {
public void testGetInitialRestrictionMismatchesNewTracker() throws Exception
{
class BadFn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -416,7 +420,8 @@ public String getInitialRestriction(Integer element) {
public void testGetRestrictionCoderReturnsWrongType() throws Exception {
class BadFn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -497,7 +502,8 @@ public void testSplitRestrictionConsistentButWrongType()
throws Exception {
class BadFn extends DoFn<Integer, String> {
@ProcessElement
- public void process(ProcessContext context, SomeRestrictionTracker
tracker) {}
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
new file mode 100644
index 00000000000..53270f474c0
--- /dev/null
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.splittabledofn;
+
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+
+/** Support utilities for interacting with {@link RestrictionTracker
RestrictionTrackers}. */
+public class RestrictionTrackers {
+
+ /** Interface allowing a runner to observe the calls to {@link
RestrictionTracker#tryClaim}. */
+ public interface ClaimObserver<PositionT> {
+ /** Called when {@link RestrictionTracker#tryClaim} returns true. */
+ void onClaimed(PositionT position);
+
+ /** Called when {@link RestrictionTracker#tryClaim} returns false. */
+ void onClaimFailed(PositionT position);
+ }
+
+ /**
+ * A {@link RestrictionTracker} which forwards all calls to the delegate
{@link
+ * RestrictionTracker}.
+ */
+ private static class ForwardingRestrictionTracker<RestrictionT, PositionT>
+ extends RestrictionTracker<RestrictionT, PositionT> {
+ private final RestrictionTracker<RestrictionT, PositionT> delegate;
+
+ protected ForwardingRestrictionTracker(RestrictionTracker<RestrictionT,
PositionT> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean tryClaim(PositionT position) {
+ return delegate.tryClaim(position);
+ }
+
+ @Override
+ public RestrictionT currentRestriction() {
+ return delegate.currentRestriction();
+ }
+
+ @Override
+ public RestrictionT checkpoint() {
+ return delegate.checkpoint();
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {
+ delegate.checkDone();
+ }
+ }
+
+ /**
+ * A {@link RestrictionTracker} which notifies the {@link ClaimObserver} if
a claim succeeded or
+ * failed.
+ */
+ private static class RestrictionTrackerObserver<RestrictionT, PositionT>
+ extends ForwardingRestrictionTracker<RestrictionT, PositionT> {
+ private final ClaimObserver<PositionT> claimObserver;
+
+ private RestrictionTrackerObserver(
+ RestrictionTracker<RestrictionT, PositionT> delegate,
+ ClaimObserver<PositionT> claimObserver) {
+ super(delegate);
+ this.claimObserver = claimObserver;
+ }
+
+ @Override
+ public boolean tryClaim(PositionT position) {
+ if (super.tryClaim(position)) {
+ claimObserver.onClaimed(position);
+ return true;
+ } else {
+ claimObserver.onClaimFailed(position);
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Returns a {@link RestrictionTracker} which reports all claim attempts to
the specified {@link
+ * ClaimObserver}.
+ */
+ public static <RestrictionT, PositionT> RestrictionTracker<RestrictionT,
PositionT> observe(
+ RestrictionTracker<RestrictionT, PositionT> restrictionTracker,
+ ClaimObserver<PositionT> claimObserver) {
+ return new RestrictionTrackerObserver<RestrictionT, PositionT>(
+ restrictionTracker, claimObserver);
+ }
+}
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java
new file mode 100644
index 00000000000..0f2cbd9465a
--- /dev/null
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines utilities related to executing <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link
+ * org.apache.beam.sdk.transforms.DoFn}.
+ */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.fn.splittabledofn;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
new file mode 100644
index 00000000000..52d03ed11d1
--- /dev/null
+++
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.splittabledofn;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RestrictionTrackers}. */
+@RunWith(JUnit4.class)
+public class RestrictionTrackersTest {
+ @Test
+ public void testObservingClaims() {
+ RestrictionTracker<String, String> observedTracker =
+ new RestrictionTracker() {
+
+ @Override
+ public boolean tryClaim(Object position) {
+ return "goodClaim".equals(position);
+ }
+
+ @Override
+ public Object currentRestriction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object checkpoint() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ List<String> positionsObserved = new ArrayList<>();
+ ClaimObserver<String> observer =
+ new ClaimObserver<String>() {
+
+ @Override
+ public void onClaimed(String position) {
+ positionsObserved.add(position);
+ assertEquals("goodClaim", position);
+ }
+
+ @Override
+ public void onClaimFailed(String position) {
+ positionsObserved.add(position);
+ }
+ };
+
+ RestrictionTracker<String, String> observingTracker =
+ RestrictionTrackers.observe(observedTracker, observer);
+ observingTracker.tryClaim("goodClaim");
+ observingTracker.tryClaim("badClaim");
+
+ assertThat(positionsObserved, contains("goodClaim", "badClaim"));
+ }
+}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index 92a966fd6a1..d05fe4376a0 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -154,8 +154,7 @@ public void processElement(WindowedValue<KV<InputT,
RestrictionT>> elem) {
processElementTyped(elem);
}
- private <PositionT, TrackerT extends RestrictionTracker<RestrictionT,
PositionT>>
- void processElementTyped(WindowedValue<KV<InputT, RestrictionT>> elem) {
+ private <PositionT> void processElementTyped(WindowedValue<KV<InputT,
RestrictionT>> elem) {
checkArgument(
elem.getWindows().size() == 1,
"SPLITTABLE_PROCESS_ELEMENTS expects its input to be in 1 window, but
got %s windows",
@@ -173,9 +172,9 @@ void processElementTyped(WindowedValue<KV<InputT,
RestrictionT>> elem) {
(Coder<BoundedWindow>) context.windowCoder,
() -> elem,
() -> window);
- TrackerT tracker =
doFnInvoker.invokeNewTracker(elem.getValue().getValue());
- OutputAndTimeBoundedSplittableProcessElementInvoker<
- InputT, OutputT, RestrictionT, PositionT, TrackerT>
+ RestrictionTracker<RestrictionT, PositionT> tracker =
+ doFnInvoker.invokeNewTracker(elem.getValue().getValue());
+ OutputAndTimeBoundedSplittableProcessElementInvoker<InputT, OutputT,
RestrictionT, PositionT>
processElementInvoker =
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
context.doFn,
@@ -211,7 +210,7 @@ public void outputWindowedValue(
executor,
10000,
Duration.standardSeconds(10));
- SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
TrackerT>.Result result =
+ SplittableProcessElementInvoker<InputT, OutputT, RestrictionT,
PositionT>.Result result =
processElementInvoker.invokeProcessElement(doFnInvoker, element,
tracker);
this.stateAccessor = null;
diff --git
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
index a3c17e6ca1b..dc99b769ee4 100644
---
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
+++
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -57,7 +58,8 @@ private static Scan newScanInRange(Scan scan, ByteKeyRange
range) throws IOExcep
}
@ProcessElement
- public void processElement(ProcessContext c, ByteKeyRangeTracker tracker)
throws Exception {
+ public void processElement(ProcessContext c,
RestrictionTracker<ByteKeyRange, ByteKey> tracker)
+ throws Exception {
final HBaseQuery query = c.element();
TableName tableName = TableName.valueOf(query.getTableId());
Table table = connection.getTable(tableName);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 165172)
Time Spent: 40m (was: 0.5h)
> SplittableDoFn: Remove runner time execution information from public API
> surface
> --------------------------------------------------------------------------------
>
> Key: BEAM-5446
> URL: https://issues.apache.org/jira/browse/BEAM-5446
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Minor
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Move the setting of "claim observers" within RestrictionTracker to another
> location to clean up the RestrictionTracker interface.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)