Remove WindowingInternals support from DoFnReflector The test themselves are replaced by mostly-hidden placeholders, to ensure that our code for handling generic parameters remains in place until new context parameters that use generics are added back.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20208d68 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20208d68 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20208d68 Branch: refs/heads/gearpump-runner Commit: 20208d68142e756800507048d9b8339041f2db70 Parents: 063ff2f Author: Kenneth Knowles <[email protected]> Authored: Tue Aug 9 20:42:04 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Wed Aug 10 10:00:40 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/DoFn.java | 44 +++++- .../beam/sdk/transforms/DoFnReflector.java | 92 +++++++---- .../beam/sdk/transforms/DoFnReflectorTest.java | 157 ++++++++++++++----- .../transforms/DoFnReflectorBenchmark.java | 13 +- 4 files changed, 214 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 6f9a6b6..a06467e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; @@ -302,11 +301,43 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD BoundedWindow window(); /** - * Construct the {@link WindowingInternals} to use within a {@link DoFn} that - * needs it. This is called if the {@link ProcessElement} method has a parameter of type - * {@link WindowingInternals}. + * A placeholder for testing purposes. The return type itself is package-private and not + * implemented. */ - WindowingInternals<InputT, OutputT> windowingInternals(); + InputProvider<InputT> inputProvider(); + + /** + * A placeholder for testing purposes. The return type itself is package-private and not + * implemented. + */ + OutputReceiver<OutputT> outputReceiver(); + } + + static interface OutputReceiver<T> { + void output(T output); + } + + static interface InputProvider<T> { + T get(); + } + + /** For testing only, this {@link ExtraContextFactory} returns {@code null} for all parameters. */ + public static class FakeExtraContextFactory<InputT, OutputT> + implements ExtraContextFactory<InputT, OutputT> { + @Override + public BoundedWindow window() { + return null; + } + + @Override + public InputProvider<InputT> inputProvider() { + return null; + } + + @Override + public OutputReceiver<OutputT> outputReceiver() { + return null; + } } ///////////////////////////////////////////////////////////////////////////// @@ -331,8 +362,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <ul> * <li>It must have at least one argument. * <li>Its first argument must be a {@link DoFn.ProcessContext}. - * <li>Its remaining arguments must be {@link BoundedWindow}, or - * {@link WindowingInternals WindowingInternals<InputT, OutputT>}. + * <li>Its remaining argument, if any, must be {@link BoundedWindow}. * </ul> */ @Documented http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index c6168b3..3dfda55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -35,6 +34,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -91,6 +91,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -121,20 +122,35 @@ public abstract class DoFnReflector { /** Any {@link BoundedWindow} parameter is populated by the window of the current element. */ WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") { @Override - public <InputT, OutputT> TypeToken<?> - tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) { + public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) { return TypeToken.of(BoundedWindow.class); } }, - WINDOWING_INTERNALS(Availability.PROCESS_ELEMENT_ONLY, - WindowingInternals.class, "windowingInternals") { + INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") { @Override - public <InputT, OutputT> TypeToken<?> tokenFor( - TypeToken<InputT> in, TypeToken<OutputT> out) { - return new TypeToken<WindowingInternals<InputT, OutputT>>() {} - .where(new TypeParameter<InputT>() {}, in) - .where(new TypeParameter<OutputT>() {}, out); + public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) { + return new TypeToken<DoFn.InputProvider<InputT>>() {}.where( + new TypeParameter<InputT>() {}, in); + } + + @Override + public boolean isHidden() { + return true; + } + }, + + OUTPUT_RECEIVER( + Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") { + @Override + public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) { + return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where( + new TypeParameter<OutputT>() {}, out); + } + + @Override + public boolean isHidden() { + return true; } }; @@ -146,6 +162,14 @@ public abstract class DoFnReflector { abstract <InputT, OutputT> TypeToken<?> tokenFor( TypeToken<InputT> in, TypeToken<OutputT> out); + /** + * Indicates whether this enum is for testing only, hence should not appear in error messages, + * etc. Defaults to {@code false}. + */ + boolean isHidden() { + return false; + } + private final Class<?> rawType; private final Availability availability; private final transient MethodDescription method; @@ -241,16 +265,17 @@ public abstract class DoFnReflector { final TypeToken<?> in, final TypeToken<?> out) { return FluentIterable .from(extraProcessContexts.values()) + .filter(new Predicate<AdditionalParameter>() { + @Override + public boolean apply(@Nonnull AdditionalParameter additionalParameter) { + return !additionalParameter.isHidden(); + } + }) .transform(new Function<AdditionalParameter, String>() { - @Override - @Nullable - public String apply(@Nullable AdditionalParameter input) { - if (input == null) { - return null; - } else { - return formatType(input.tokenFor(in, out)); - } + @Nonnull + public String apply(@Nonnull AdditionalParameter input) { + return formatType(input.tokenFor(in, out)); } }) .toSortedSet(String.CASE_INSENSITIVE_ORDER); @@ -285,10 +310,9 @@ public abstract class DoFnReflector { * <li>The method has at least one argument. * <li>The first argument is of type firstContextArg. * <li>The remaining arguments have raw types that appear in {@code contexts} - * <li>Any generics on the extra context arguments match what is expected. Eg., - * {@code WindowingInternals<InputT, OutputT>} either matches the - * {@code InputT} and {@code OutputT} parameters of the - * {@code OldDoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc. + * <li>Any generics on the extra context arguments match what is expected. Currently, this + * is exercised only by placeholders. For example, {@code InputReceiver<InputT> must either match + * the {@code InputT} {@code OldDoFn<InputT, OutputT>.ProcessContext} or use a wildcard, etc. * </ol> * * @param m the method to verify @@ -298,7 +322,8 @@ public abstract class DoFnReflector { * @param iParam TypeParameter representing the input type * @param oParam TypeParameter representing the output type */ - @VisibleForTesting static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments( + @VisibleForTesting + static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments( Method m, Map<Class<?>, AdditionalParameter> contexts, TypeToken<?> firstContextArg, @@ -607,11 +632,13 @@ public abstract class DoFnReflector { } @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - // The DoFn doesn't allow us to ask for these outside ProcessElements, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the windowingInternals in ProcessElements"); + public DoFn.InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); } } @@ -679,8 +706,13 @@ public abstract class DoFnReflector { } @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return context.windowingInternals(); + public DoFn.InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java index df9e441..c47e0cf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowingInternals; import org.junit.Before; import org.junit.Rule; @@ -71,7 +70,9 @@ public class DoFnReflectorTest { @Mock private BoundedWindow mockWindow; @Mock - private WindowingInternals<String, String> mockWindowingInternals; + private DoFn.InputProvider<String> mockInputProvider; + @Mock + private DoFn.OutputReceiver<String> mockOutputReceiver; private ExtraContextFactory<String, String> extraContextFactory; @@ -85,8 +86,13 @@ public class DoFnReflectorTest { } @Override - public WindowingInternals<String, String> windowingInternals() { - return mockWindowingInternals; + public DoFn.InputProvider<String> inputProvider() { + return mockInputProvider; + } + + @Override + public DoFn.OutputReceiver<String> outputReceiver() { + return mockOutputReceiver; } }; } @@ -257,16 +263,35 @@ public class DoFnReflectorTest { } @Test - public void testDoFnWithWindowingInternals() throws Exception { + public void testDoFnWithOutputReceiver() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFnReflector reflector = underTest(new DoFn<String, String>() { + + @ProcessElement + public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o) + throws Exception { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + assertSame(o, mockOutputReceiver); + } + }); + + assertFalse(reflector.usesSingleWindow()); + + checkInvokeProcessElementWorks(reflector, invocations); + } + + @Test + public void testDoFnWithInputProvider() throws Exception { final Invocations invocations = new Invocations("AnonymousClass"); DoFnReflector reflector = underTest(new DoFn<String, String>() { @ProcessElement - public void processElement(ProcessContext c, WindowingInternals<String, String> w) + public void processElement(ProcessContext c, DoFn.InputProvider<String> i) throws Exception { invocations.wasProcessElementInvoked = true; assertSame(c, mockContext); - assertSame(w, mockWindowingInternals); + assertSame(i, mockInputProvider); } }); @@ -513,7 +538,7 @@ public class DoFnReflectorTest { thrown.expectMessage( "Integer is not a valid context parameter for method " + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)" - + ". Should be one of [BoundedWindow, WindowingInternals<Integer, String>]"); + + ". Should be one of [BoundedWindow]"); DoFnReflector.verifyProcessMethodArguments( getClass().getDeclaredMethod("badExtraProcessContext", @@ -534,102 +559,148 @@ public class DoFnReflectorTest { } @SuppressWarnings("unused") - private void goodGenerics(DoFn<Integer, String>.ProcessContext c, - WindowingInternals<Integer, String> i1) {} + private void goodGenerics( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<String> output) {} @Test public void testValidGenerics() throws Exception { - Method method = getClass().getDeclaredMethod("goodGenerics", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodGenerics", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void goodWildcards(DoFn<Integer, String>.ProcessContext c, - WindowingInternals<?, ?> i1) {} + private void goodWildcards( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<?> input, + DoFn.OutputReceiver<?> output) {} @Test public void testGoodWildcards() throws Exception { - Method method = getClass().getDeclaredMethod("goodWildcards", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodWildcards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void goodBoundedWildcards(DoFn<Integer, String>.ProcessContext c, - WindowingInternals<? super Integer, ? super String> i1) {} + private void goodBoundedWildcards( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<? super Integer> input, + DoFn.OutputReceiver<? super String> output) {} @Test public void testGoodBoundedWildcards() throws Exception { - Method method = getClass().getDeclaredMethod("goodBoundedWildcards", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodBoundedWildcards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") private <InputT, OutputT> void goodTypeVariables( DoFn<InputT, OutputT>.ProcessContext c, - WindowingInternals<InputT, OutputT> i1) {} + DoFn.InputProvider<InputT> input, + DoFn.OutputReceiver<OutputT> output) {} @Test public void testGoodTypeVariables() throws Exception { - Method method = getClass().getDeclaredMethod("goodTypeVariables", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "goodTypeVariables", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void badGenericTwoArgs(DoFn<Integer, String>.ProcessContext c, - WindowingInternals<Integer, Integer> i1) {} + private void badGenericTwoArgs( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<Integer> output) {} @Test public void testBadGenericsTwoArgs() throws Exception { - Method method = getClass().getDeclaredMethod("badGenericTwoArgs", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "badGenericTwoArgs", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " - + "WindowingInternals<Integer, Integer> " + + "OutputReceiver<Integer> " + "for method " + getClass().getName() - + "#badGenericTwoArgs(ProcessContext, WindowingInternals). Should be " - + "WindowingInternals<Integer, String>"); + + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be " + + "OutputReceiver<String>"); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void badGenericWildCards(DoFn<Integer, String>.ProcessContext c, - WindowingInternals<Integer, ? super Integer> i1) {} + private void badGenericWildCards( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<? super Integer> output) {} @Test public void testBadGenericWildCards() throws Exception { - Method method = getClass().getDeclaredMethod("badGenericWildCards", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "badGenericWildCards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " - + "WindowingInternals<Integer, ? super Integer> for method " + + "OutputReceiver<? super Integer> for method " + getClass().getName() - + "#badGenericWildCards(ProcessContext, WindowingInternals). Should be " - + "WindowingInternals<Integer, String>"); + + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be " + + "OutputReceiver<String>"); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c, - WindowingInternals<InputT, InputT> i1) {} + DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {} @Test public void testBadTypeVariables() throws Exception { - Method method = getClass().getDeclaredMethod("badTypeVariables", - DoFn.ProcessContext.class, WindowingInternals.class); + Method method = + getClass() + .getDeclaredMethod( + "badTypeVariables", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " - + "WindowingInternals<InputT, InputT> for method " + getClass().getName() - + "#badTypeVariables(ProcessContext, WindowingInternals). Should be " - + "WindowingInternals<InputT, OutputT>"); + + "OutputReceiver<InputT> for method " + getClass().getName() + + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be " + + "OutputReceiver<OutputT>"); DoFnReflector.verifyProcessMethodArguments(method); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java index 233b8be..91ecd16 100644 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java @@ -58,18 +58,7 @@ public class DoFnReflectorBenchmark { private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); private ExtraContextFactory<String, String> extraContextFactory = - new ExtraContextFactory<String, String>() { - - @Override - public BoundedWindow window() { - return null; - } - - @Override - public WindowingInternals<String, String> windowingInternals() { - return null; - } - }; + new DoFn.FakeExtraContextFactory<>(); private DoFnReflector doFnReflector; private OldDoFn<String, String> adaptedDoFnWithContext;
