Repository: incubator-beam Updated Branches: refs/heads/master ac252a7e1 -> 9de9ce69f
Allow BoundedWindow subclasses in DoFn parameter list Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c3e59fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c3e59fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c3e59fa Branch: refs/heads/master Commit: 9c3e59fab86e93477f14e0709ae8ecc37b84f3ef Parents: 85b908b Author: Kenneth Knowles <k...@google.com> Authored: Thu Nov 3 21:30:25 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Nov 7 15:25:03 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/ParDo.java | 43 +++++++++++++- .../sdk/transforms/reflect/DoFnInvokers.java | 8 ++- .../sdk/transforms/reflect/DoFnSignature.java | 40 +++++++++---- .../sdk/transforms/reflect/DoFnSignatures.java | 41 +++++++++++-- .../beam/sdk/transforms/windowing/WindowFn.java | 12 ++++ .../apache/beam/sdk/transforms/ParDoTest.java | 61 ++++++++++++++++++++ .../transforms/reflect/DoFnInvokersTest.java | 6 +- 7 files changed, 190 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 0684a5c..26799c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -32,7 +32,10 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; @@ -41,6 +44,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypedPValue; /** @@ -548,6 +552,42 @@ public class ParDo { } /** + * Perform common validations of the {@link DoFn} against the input {@link PCollection}, for + * example ensuring that the window type expected by the {@link DoFn} matches the window type of + * the {@link PCollection}. + */ + private static <InputT, OutputT> void validateWindowType( + PCollection<? extends InputT> input, Serializable fn) { + // No validation for OldDoFn + if (!(fn instanceof DoFn)) { + return; + } + + DoFnSignature signature = DoFnSignatures.INSTANCE.getSignature((Class) fn.getClass()); + + TypeDescriptor<? extends BoundedWindow> actualWindowT = + input.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor(); + + validateWindowTypeForMethod(actualWindowT, signature.processElement()); + for (OnTimerMethod method : signature.onTimerMethods().values()) { + validateWindowTypeForMethod(actualWindowT, method); + } + } + + private static void validateWindowTypeForMethod( + TypeDescriptor<? 
extends BoundedWindow> actualWindowT, + MethodWithExtraParameters methodSignature) { + if (methodSignature.windowT() != null) { + checkArgument( + methodSignature.windowT().isSupertypeOf(actualWindowT), + "%s expects window type %s, which is not a supertype of actual window type %s", + methodSignature.targetMethod(), + methodSignature.windowT(), + actualWindowT); + } + } + + /** * Perform common validations of the {@link DoFn}, for example ensuring that state is used * correctly and that its features can be supported. */ @@ -768,6 +808,7 @@ public class ParDo { public PCollection<OutputT> apply(PCollection<? extends InputT> input) { checkArgument( !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner"); + validateWindowType(input, fn); return PCollection.<OutputT>createPrimitiveOutputInternal( input.getPipeline(), input.getWindowingStrategy(), @@ -1024,7 +1065,7 @@ public class ParDo { public PCollectionTuple apply(PCollection<? extends InputT> input) { checkArgument( !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner"); - + validateWindowType(input, fn); PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index ba95f98..b975711 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -510,8 +510,8 @@ public class DoFnInvokers { private static MethodDescription 
getExtraContextFactoryMethodDescription( String methodName, Class<?>... parameterTypes) { try { - return new MethodDescription.ForLoadedMethod( - DoFn.ExtraContextFactory.class.getMethod(methodName, parameterTypes)); + return new MethodDescription.ForLoadedMethod( + DoFn.ExtraContextFactory.class.getMethod(methodName, parameterTypes)); } catch (Exception e) { throw new IllegalStateException( String.format( @@ -538,7 +538,9 @@ public class DoFnInvokers { @Override public StackManipulation dispatch(WindowParameter p) { - return simpleExtraContextParameter("window", pushExtraContextFactory); + return new StackManipulation.Compound( + simpleExtraContextParameter("window", pushExtraContextFactory), + TypeCasting.to(new TypeDescription.ForLoadedType(p.windowT().getRawType()))); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index befc10b..4cbe219 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -144,6 +145,10 @@ public abstract class DoFnSignature { * <p>Validation that these are allowed is external to this class. */ List<Parameter> extraParameters(); + + /** The type of window expected by this method, if any. */ + @Nullable + TypeDescriptor<? extends BoundedWindow> windowT(); } /** A descriptor for an optional parameter of the {@link DoFn.ProcessElement} method. */ @@ -229,18 +234,14 @@ public abstract class DoFnSignature { } // These parameter descriptors are constant - private static final WindowParameter BOUNDED_WINDOW_PARAMETER = - new AutoValue_DoFnSignature_Parameter_WindowParameter(); private static final InputProviderParameter INPUT_PROVIDER_PARAMETER = new AutoValue_DoFnSignature_Parameter_InputProviderParameter(); private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER = new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter(); - /** - * Returns a {@link WindowParameter}. - */ - public static WindowParameter boundedWindow() { - return BOUNDED_WINDOW_PARAMETER; + /** Returns a {@link WindowParameter}. */ + public static WindowParameter boundedWindow(TypeDescriptor<? extends BoundedWindow> windowT) { + return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT); } /** @@ -283,6 +284,7 @@ public abstract class DoFnSignature { @AutoValue public abstract static class WindowParameter extends Parameter { WindowParameter() {} + public abstract TypeDescriptor<? extends BoundedWindow> windowT(); } /** @@ -357,6 +359,10 @@ public abstract class DoFnSignature { @Nullable public abstract TypeDescriptor<?> trackerT(); + /** The window type used by this method, if any. */ + @Nullable + public abstract TypeDescriptor<? extends BoundedWindow> windowT(); + /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. 
*/ public abstract boolean hasReturnValue(); @@ -364,9 +370,14 @@ public abstract class DoFnSignature { Method targetMethod, List<Parameter> extraParameters, TypeDescriptor<?> trackerT, + @Nullable TypeDescriptor<? extends BoundedWindow> windowT, boolean hasReturnValue) { return new AutoValue_DoFnSignature_ProcessElementMethod( - targetMethod, Collections.unmodifiableList(extraParameters), trackerT, hasReturnValue); + targetMethod, + Collections.unmodifiableList(extraParameters), + trackerT, + windowT, + hasReturnValue); } /** @@ -381,6 +392,7 @@ public abstract class DoFnSignature { extraParameters(), Predicates.or( Predicates.instanceOf(WindowParameter.class), + Predicates.instanceOf(TimerParameter.class), Predicates.instanceOf(StateParameter.class))); } @@ -404,13 +416,21 @@ public abstract class DoFnSignature { @Override public abstract Method targetMethod(); + /** The window type used by this method, if any. */ + @Nullable + public abstract TypeDescriptor<? extends BoundedWindow> windowT(); + /** Types of optional parameters of the annotated method, in the order they appear. */ @Override public abstract List<Parameter> extraParameters(); - static OnTimerMethod create(Method targetMethod, String id, List<Parameter> extraParameters) { + static OnTimerMethod create( + Method targetMethod, + String id, + TypeDescriptor<? 
extends BoundedWindow> windowT, + List<Parameter> extraParameters) { return new AutoValue_DoFnSignature_OnTimerMethod( - id, targetMethod, Collections.unmodifiableList(extraParameters)); + id, targetMethod, windowT, Collections.unmodifiableList(extraParameters)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 09c5f3d..e918182 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -149,6 +150,9 @@ public class DoFnSignatures { private final Map<String, TimerParameter> timerParameters = new HashMap<>(); private final List<Parameter> extraParameters = new ArrayList<>(); + @Nullable + private TypeDescriptor<? extends BoundedWindow> windowT; + private MethodAnalysisContext() {} /** Indicates whether a {@link RestrictionTrackerParameter} is known in this context. 
*/ @@ -157,6 +161,18 @@ public class DoFnSignatures { extraParameters, Predicates.instanceOf(RestrictionTrackerParameter.class)); } + /** Indicates whether a {@link WindowParameter} is known in this context. */ + public boolean hasWindowParameter() { + return Iterables.any( + extraParameters, Predicates.instanceOf(WindowParameter.class)); + } + + /** The window type, if any, used by this method. */ + @Nullable + public TypeDescriptor<? extends BoundedWindow> getWindowType() { + return windowT; + } + /** State parameters declared in this context, keyed by {@link StateId}. */ public Map<String, StateParameter> getStateParameters() { return Collections.unmodifiableMap(stateParameters); @@ -599,6 +615,8 @@ public class DoFnSignatures { MethodAnalysisContext methodContext = MethodAnalysisContext.create(); + @Nullable TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnClass, m); + List<DoFnSignature.Parameter> extraParameters = new ArrayList<>(); TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); ErrorReporter onTimerErrors = errors.forMethod(DoFn.OnTimer.class, m); @@ -618,7 +636,7 @@ public class DoFnSignatures { expectedOutputReceiverT)); } - return DoFnSignature.OnTimerMethod.create(m, timerId, extraParameters); + return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, extraParameters); } @VisibleForTesting @@ -650,6 +668,7 @@ public class DoFnSignatures { formatType(processContextT)); TypeDescriptor<?> trackerT = getTrackerType(fnClass, m); + TypeDescriptor<? 
extends BoundedWindow> windowT = getWindowType(fnClass, m); TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT); TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); for (int i = 1; i < params.length; ++i) { @@ -684,6 +703,7 @@ public class DoFnSignatures { m, methodContext.getExtraParameters(), trackerT, + windowT, DoFn.ProcessContinuation.class.equals(m.getReturnType())); } @@ -700,12 +720,12 @@ public class DoFnSignatures { ErrorReporter paramErrors = methodErrors.forParameter(param); - if (rawType.equals(BoundedWindow.class)) { + if (BoundedWindow.class.isAssignableFrom(rawType)) { methodErrors.checkArgument( - !methodContext.getExtraParameters().contains(Parameter.boundedWindow()), + !methodContext.hasWindowParameter(), "Multiple %s parameters", BoundedWindow.class.getSimpleName()); - return Parameter.boundedWindow(); + return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT); } else if (rawType.equals(DoFn.InputProvider.class)) { methodErrors.checkArgument( !methodContext.getExtraParameters().contains(Parameter.inputProvider()), @@ -856,6 +876,19 @@ public class DoFnSignatures { return null; } + @Nullable + private static TypeDescriptor<? extends BoundedWindow> getWindowType( + TypeDescriptor<?> fnClass, Method method) { + Type[] params = method.getGenericParameterTypes(); + for (int i = 0; i < params.length; i++) { + TypeDescriptor<?> paramT = fnClass.resolveType(params[i]); + if (BoundedWindow.class.isAssignableFrom(paramT.getRawType())) { + return (TypeDescriptor<? 
extends BoundedWindow>) paramT; + } + } + return null; + } + @VisibleForTesting static DoFnSignature.BundleMethod analyzeBundleMethod( ErrorReporter errors, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 127fb4f..ea0bb79 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Instant; /** @@ -159,6 +160,17 @@ public abstract class WindowFn<T, W extends BoundedWindow> } /** + * Returns a {@link TypeDescriptor} capturing what is known statically about the window type of + * this {@link WindowFn} instance's most-derived class. + * + * <p>In the normal case of a concrete {@link WindowFn} subclass with no generic type parameters + * of its own (including anonymous inner classes), this will be a complete non-generic type. + */ + public TypeDescriptor<W> getWindowTypeDescriptor() { + return new TypeDescriptor<W>(this) {}; + } + + /** * {@inheritDoc} * * <p>By default, does not register any display data. 
Implementors may override this method http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index d3ea9fb..26f5570 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -51,6 +51,8 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn.OnTimer; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -58,7 +60,12 @@ import org.apache.beam.sdk.transforms.display.DisplayDataMatchers; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; @@ -72,6 +79,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import 
org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -1513,6 +1521,59 @@ public class ParDoTest implements Serializable { } @Test + public void testRejectsWrongWindowType() { + Pipeline p = TestPipeline.create(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(GlobalWindow.class.getSimpleName()); + thrown.expectMessage(IntervalWindow.class.getSimpleName()); + thrown.expectMessage("window type"); + thrown.expectMessage("not a supertype"); + + p.apply(Create.of(1, 2, 3)) + .apply( + ParDo.of( + new DoFn<Integer, Integer>() { + @ProcessElement + public void process(ProcessContext c, IntervalWindow w) {} + })); + } + + /** + * Tests that it is OK to use different window types in the parameter lists to different + * {@link DoFn} functions, as long as they are all supertypes of the actual window type + * of the input. + * + * <p>Today, the only method other than {@link ProcessElement @ProcessElement} that can accept + * extended parameters is {@link OnTimer @OnTimer}, which is rejected before it reaches window + * type validation. Rather than delay validation, this test is temporarily disabled. + */ + @Ignore("ParDo rejects this on account of it using timers") + @Test + public void testMultipleWindowSubtypesOK() { + final String timerId = "gobbledegook"; + + Pipeline p = TestPipeline.create(); + + p.apply(Create.of(1, 2, 3)) + .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(10)))) + .apply( + ParDo.of( + new DoFn<Integer, Integer>() { + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(ProcessContext c, IntervalWindow w) {} + + @OnTimer(timerId) + public void onTimer(BoundedWindow w) {} + })); + + // If it doesn't crash, we made it! 
+ } + + @Test public void testRejectsSplittableDoFnByDefault() { // ParDo with a splittable DoFn must be overridden by the runner. // Without an override, applying it directly must fail. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 60f82a8..7bdc007 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -44,7 +44,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; @@ -71,7 +71,7 @@ public class DoFnInvokersTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Mock private DoFn<String, String>.ProcessContext mockContext; - @Mock private BoundedWindow mockWindow; + @Mock private IntervalWindow mockWindow; @Mock private DoFn.InputProvider<String> mockInputProvider; @Mock private DoFn.OutputReceiver<String> mockOutputReceiver; @Mock private WindowingInternals<String, String> mockWindowingInternals; @@ -173,7 +173,7 @@ public class DoFnInvokersTest { public void testDoFnWithWindow() throws Exception { class MockFn extends DoFn<String, String> { @DoFn.ProcessElement 
- public void processElement(ProcessContext c, BoundedWindow w) throws Exception {} + public void processElement(ProcessContext c, IntervalWindow w) throws Exception {} } MockFn fn = mock(MockFn.class); assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));