http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java new file mode 100644 index 0000000..6730140 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.reflect;

import org.apache.beam.sdk.transforms.DoFn;

import com.google.auto.value.AutoValue;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.annotation.Nullable;

/**
 * Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
 * context it requires, types of the input and output elements, etc.
 *
 * <p>This is a value class implemented via {@link AutoValue}; instances are obtained through the
 * package-private {@code create} factories.
 *
 * <p>See <a href="https://s.apache.org/a-new-dofn">A new DoFn</a>.
 */
@AutoValue
public abstract class DoFnSignature {
  /** Returns the {@link DoFn} subclass that this signature describes. */
  public abstract Class<? extends DoFn> fnClass();

  /** Returns the description of the {@link DoFn.ProcessElement} method. */
  public abstract ProcessElementMethod processElement();

  /** Returns the description of the {@link DoFn.StartBundle} method; may be {@code null}. */
  @Nullable
  public abstract BundleMethod startBundle();

  /** Returns the description of the {@link DoFn.FinishBundle} method; may be {@code null}. */
  @Nullable
  public abstract BundleMethod finishBundle();

  /** Returns the description of the {@link DoFn.Setup} method; may be {@code null}. */
  @Nullable
  public abstract LifecycleMethod setup();

  /** Returns the description of the {@link DoFn.Teardown} method; may be {@code null}. */
  @Nullable
  public abstract LifecycleMethod teardown();

  /**
   * Creates a signature from its already-analyzed parts.
   *
   * @param fnClass the {@link DoFn} subclass being described
   * @param processElement description of the {@link DoFn.ProcessElement} method
   * @param startBundle description of the {@link DoFn.StartBundle} method, or {@code null}
   * @param finishBundle description of the {@link DoFn.FinishBundle} method, or {@code null}
   * @param setup description of the {@link DoFn.Setup} method, or {@code null}
   * @param teardown description of the {@link DoFn.Teardown} method, or {@code null}
   */
  static DoFnSignature create(
      Class<? extends DoFn> fnClass,
      ProcessElementMethod processElement,
      @Nullable BundleMethod startBundle,
      @Nullable BundleMethod finishBundle,
      @Nullable LifecycleMethod setup,
      @Nullable LifecycleMethod teardown) {
    return new AutoValue_DoFnSignature(
        fnClass,
        processElement,
        startBundle,
        finishBundle,
        setup,
        teardown);
  }

  /** Describes a {@link DoFn.ProcessElement} method. */
  @AutoValue
  public abstract static class ProcessElementMethod {
    /** Kinds of extra (non-context) parameters a {@link DoFn.ProcessElement} method may take. */
    enum Parameter {
      BOUNDED_WINDOW,
      INPUT_PROVIDER,
      OUTPUT_RECEIVER
    }

    /** Returns the reflected method that this description refers to. */
    public abstract Method targetMethod();

    /** Returns the extra parameters of the method, as an unmodifiable list. */
    public abstract List<Parameter> extraParameters();

    /**
     * Creates a description of a {@link DoFn.ProcessElement} method.
     *
     * @param targetMethod the reflected method
     * @param extraParameters the extra parameters the method takes; the list is copied, so later
     *     changes to it by the caller are not reflected in the returned object
     */
    static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) {
      // Copy before wrapping: Collections.unmodifiableList alone is only a read-only *view*, which
      // would still expose any later mutation of the caller's list through this value object.
      List<Parameter> snapshot =
          Collections.unmodifiableList(new ArrayList<>(extraParameters));
      return new AutoValue_DoFnSignature_ProcessElementMethod(targetMethod, snapshot);
    }

    /**
     * Returns whether the method takes a {@link Parameter#BOUNDED_WINDOW} extra parameter.
     *
     * @return true if the reflected {@link DoFn} uses a Single Window.
     */
    public boolean usesSingleWindow() {
      return extraParameters().contains(Parameter.BOUNDED_WINDOW);
    }
  }

  /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
  @AutoValue
  public abstract static class BundleMethod {
    /** Returns the reflected method that this description refers to. */
    public abstract Method targetMethod();

    /** Creates a description of the given bundle method. */
    static BundleMethod create(Method targetMethod) {
      return new AutoValue_DoFnSignature_BundleMethod(targetMethod);
    }
  }

  /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */
  @AutoValue
  public abstract static class LifecycleMethod {
    /** Returns the reflected method that this description refers to. */
    public abstract Method targetMethod();

    /** Creates a description of the given lifecycle method. */
    static LifecycleMethod create(Method targetMethod) {
      return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java new file mode 100644 index 0000000..80b3b4f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -0,0 +1,321 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.reflect;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.common.ReflectHelpers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeParameter;
import com.google.common.reflect.TypeToken;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

/**
 * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
 *
 * <p>Parsed signatures are cached per {@link DoFn} class; access to the cache goes through the
 * {@code synchronized} {@link #getOrParseSignature} method.
 */
public class DoFnSignatures {
  public static final DoFnSignatures INSTANCE = new DoFnSignatures();

  /** Signatures parsed so far, keyed by the {@link DoFn} class they were parsed from. */
  private final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();

  private DoFnSignatures() {}

  /** @return the {@link DoFnSignature} for the given {@link DoFn}. */
  public synchronized DoFnSignature getOrParseSignature(
      @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
    DoFnSignature signature = signatureCache.get(fn);
    if (signature == null) {
      signature = parseSignature(fn);
      signatureCache.put(fn, signature);
    }
    return signature;
  }

  /**
   * Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}.
   *
   * @throws IllegalArgumentException if the class is not a {@link DoFn} subtype, extends
   *     {@link DoFn} without type arguments, or has missing or malformed annotated methods
   */
  private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) {
    checkArgument(
        DoFn.class.isAssignableFrom(fnClass),
        "%s must be subtype of DoFn",
        fnClass.getSimpleName());

    // Extract the input and output type from the DoFn<InputT, OutputT> supertype.
    TypeToken<?> inputT = null;
    TypeToken<?> outputT = null;
    TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass);
    for (TypeToken<?> supertype : fnToken.getTypes()) {
      if (!supertype.getRawType().equals(DoFn.class)) {
        continue;
      }
      Type doFnType = supertype.getType();
      // A raw "extends DoFn" (or DoFn.class itself) is represented by a plain Class rather than a
      // ParameterizedType; report that as a validation error instead of a ClassCastException.
      checkArgument(
          doFnType instanceof ParameterizedType,
          "%s must specify the type arguments of DoFn; raw DoFn types are not supported",
          fnClass.getName());
      Type[] args = ((ParameterizedType) doFnType).getActualTypeArguments();
      inputT = TypeToken.of(args[0]);
      outputT = TypeToken.of(args[1]);
      break;
    }
    checkNotNull(inputT, "Unable to determine input type from %s", fnClass);

    // Only @ProcessElement is mandatory; the remaining annotated methods are optional.
    Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true);
    Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false);
    Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false);
    Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false);
    Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false);

    return DoFnSignature.create(
        fnClass,
        analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT),
        (startBundleMethod == null)
            ? null
            : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT),
        (finishBundleMethod == null)
            ? null
            : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT),
        (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod),
        (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod));
  }

  /**
   * Generates a type token for {@code DoFn<InputT, OutputT>.ProcessContext} given {@code InputT}
   * and {@code OutputT}.
   */
  private static <InputT, OutputT>
      TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
          TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
    return new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
            new TypeParameter<InputT>() {}, inputT)
        .where(new TypeParameter<OutputT>() {}, outputT);
  }

  /**
   * Generates a type token for {@code DoFn<InputT, OutputT>.Context} given {@code InputT} and
   * {@code OutputT}.
   */
  private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
      TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
    return new TypeToken<DoFn<InputT, OutputT>.Context>() {}.where(
            new TypeParameter<InputT>() {}, inputT)
        .where(new TypeParameter<OutputT>() {}, outputT);
  }

  /** Generates a type token for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */
  private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
      TypeToken<InputT> inputT) {
    return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
        new TypeParameter<InputT>() {}, inputT);
  }

  /** Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}. */
  private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
      TypeToken<OutputT> outputT) {
    return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
        new TypeParameter<OutputT>() {}, outputT);
  }

  /**
   * Validates a {@link DoFn.ProcessElement} method and describes it.
   *
   * <p>The method must return {@code void}, must not be varargs, and must take a
   * {@code DoFn<InputT, OutputT>.ProcessContext} as its first argument. Any further argument must
   * be a {@link BoundedWindow}, a {@code DoFn.InputProvider<InputT>} or a
   * {@code DoFn.OutputReceiver<OutputT>}, each at most once.
   *
   * @param fnClass type token of the {@link DoFn} subclass, used to resolve generic parameters
   * @param m the annotated method
   * @param inputT the input element type of the {@link DoFn}
   * @param outputT the output element type of the {@link DoFn}
   * @throws IllegalArgumentException if the method violates any of the rules above
   */
  @VisibleForTesting
  static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
      TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
    checkArgument(
        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));

    TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);

    Type[] params = m.getGenericParameterTypes();
    TypeToken<?> contextToken = null;
    if (params.length > 0) {
      contextToken = fnClass.resolveType(params[0]);
    }
    checkArgument(
        contextToken != null && contextToken.equals(processContextToken),
        "%s must take a %s as its first argument",
        format(m),
        formatType(processContextToken));

    List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>();
    TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
    TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
    // Index 0 is the ProcessContext checked above; everything after it is an "extra" parameter.
    for (int i = 1; i < params.length; ++i) {
      TypeToken<?> param = fnClass.resolveType(params[i]);
      Class<?> rawType = param.getRawType();
      if (rawType.equals(BoundedWindow.class)) {
        checkArgument(
            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW),
            "Multiple BoundedWindow parameters in %s",
            format(m));
        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW);
      } else if (rawType.equals(DoFn.InputProvider.class)) {
        checkArgument(
            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER),
            "Multiple InputProvider parameters in %s",
            format(m));
        checkArgument(
            param.equals(expectedInputProviderT),
            "Wrong type of InputProvider parameter for method %s: %s, should be %s",
            format(m),
            formatType(param),
            formatType(expectedInputProviderT));
        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER);
      } else if (rawType.equals(DoFn.OutputReceiver.class)) {
        checkArgument(
            !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER),
            "Multiple OutputReceiver parameters in %s",
            format(m));
        checkArgument(
            param.equals(expectedOutputReceiverT),
            "Wrong type of OutputReceiver parameter for method %s: %s, should be %s",
            format(m),
            formatType(param),
            formatType(expectedOutputReceiverT));
        extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER);
      } else {
        // NOTE(review): only BoundedWindow is listed as allowed here, although InputProvider and
        // OutputReceiver are accepted above. Left unchanged because the message text may be
        // relied upon elsewhere - confirm before extending the list.
        List<String> allowedParamTypes =
            Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
        checkArgument(
            false,
            "%s is not a valid context parameter for method %s. Should be one of %s",
            formatType(param),
            format(m),
            allowedParamTypes);
      }
    }

    return DoFnSignature.ProcessElementMethod.create(m, extraParameters);
  }

  /**
   * Validates a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method and describes it.
   *
   * <p>The method must return {@code void}, must not be varargs, and must take exactly one
   * argument of type {@code DoFn<InputT, OutputT>.Context}.
   *
   * @param fnToken type token of the {@link DoFn} subclass, used to resolve generic parameters
   * @param m the annotated method
   * @param inputT the input element type of the {@link DoFn}
   * @param outputT the output element type of the {@link DoFn}
   * @throws IllegalArgumentException if the method violates any of the rules above
   */
  @VisibleForTesting
  static DoFnSignature.BundleMethod analyzeBundleMethod(
      TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) {
    checkArgument(
        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
    checkArgument(!m.isVarArgs(), "%s must not have var args", format(m));

    TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);

    Type[] params = m.getGenericParameterTypes();
    checkArgument(
        params.length == 1,
        "%s must have a single argument of type %s",
        format(m),
        formatType(expectedContextToken));
    TypeToken<?> contextToken = fnToken.resolveType(params[0]);
    checkArgument(
        contextToken.equals(expectedContextToken),
        "Wrong type of context argument to %s: %s, must be %s",
        format(m),
        formatType(contextToken),
        formatType(expectedContextToken));

    return DoFnSignature.BundleMethod.create(m);
  }

  /**
   * Validates a {@link DoFn.Setup} or {@link DoFn.Teardown} method and describes it.
   *
   * @throws IllegalArgumentException if the method does not return {@code void} or takes any
   *     arguments
   */
  private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) {
    checkArgument(
        void.class.equals(m.getReturnType()), "%s must have a void return type", format(m));
    checkArgument(
        m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m));
    return DoFnSignature.LifecycleMethod.create(m);
  }

  /**
   * Collects every method carrying the given annotation that is declared on {@code startClass},
   * on its superclasses up to (but excluding) {@code stopClass}, or on any interface reachable
   * from the directly implemented interfaces of those classes.
   */
  private static Collection<Method> declaredMethodsWithAnnotation(
      Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
    Collection<Method> matches = new ArrayList<>();
    LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();

    // First, find all declared methods on the startClass and parents (up to stopClass).
    for (Class<?> clazz = startClass;
        clazz != null && !clazz.equals(stopClass);
        clazz = clazz.getSuperclass()) {
      for (Method method : clazz.getDeclaredMethods()) {
        if (method.isAnnotationPresent(anno)) {
          matches.add(method);
        }
      }
      Collections.addAll(interfaces, clazz.getInterfaces());
    }

    // Now, iterate over all the discovered interfaces.
    for (Method method : ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces)) {
      if (method.isAnnotationPresent(anno)) {
        matches.add(method);
      }
    }
    return matches;
  }

  /**
   * Finds the single method annotated with {@code anno} in the hierarchy of {@code fnClazz}.
   *
   * @param anno the annotation to look for
   * @param fnClazz the class whose hierarchy (below {@link DoFn}) is searched
   * @param required whether the absence of such a method is an error
   * @return the most specific annotated method, or {@code null} if there is none and it is not
   *     {@code required}
   * @throws IllegalArgumentException if a required method is missing, if several distinct
   *     methods carry the annotation, or if the method is not public or is static
   */
  private static Method findAnnotatedMethod(
      Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
    Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class);

    if (matches.isEmpty()) {
      checkArgument(
          !required,
          "No method annotated with @%s found in %s",
          anno.getSimpleName(),
          fnClazz.getName());
      return null;
    }

    // If we have at least one match, then either it should be the only match
    // or it should be an extension of the other matches (which came from parent
    // classes).
    Method first = matches.iterator().next();
    for (Method other : matches) {
      checkArgument(
          first.getName().equals(other.getName())
              && Arrays.equals(first.getParameterTypes(), other.getParameterTypes()),
          "Found multiple methods annotated with @%s. [%s] and [%s]",
          anno.getSimpleName(),
          format(first),
          format(other));
    }

    // We need to be able to call it. We require it is public.
    checkArgument(Modifier.isPublic(first.getModifiers()), "%s must be public", format(first));

    // And make sure it's not static.
    checkArgument(
        !Modifier.isStatic(first.getModifiers()), "%s must not be static", format(first));

    return first;
  }

  /** Formats a method as "class and method" text for use in error messages. */
  private static String format(Method m) {
    return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
  }

  /** Formats a type as a simple description for use in error messages. */
  private static String formatType(TypeToken<?> t) {
    return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java new file mode 100644 index 0000000..4df5209 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s + * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and + * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them. + */ +package org.apache.beam.sdk.transforms.reflect; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java deleted file mode 100644 index e05e5e2..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java +++ /dev/null @@ -1,822 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.DoFn.Context; -import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.DoFn.Setup; -import org.apache.beam.sdk.transforms.DoFn.Teardown; -import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.UserCodeException; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.lang.reflect.Method; - -/** - * Tests for {@link DoFnReflector}. - */ -@RunWith(JUnit4.class) -public class DoFnReflectorTest { - - /** - * A convenience struct holding flags that indicate whether a particular method was invoked. 
- */ - public static class Invocations { - public boolean wasProcessElementInvoked = false; - public boolean wasStartBundleInvoked = false; - public boolean wasFinishBundleInvoked = false; - public boolean wasSetupInvoked = false; - public boolean wasTeardownInvoked = false; - private final String name; - - public Invocations(String name) { - this.name = name; - } - } - - private DoFn<String, String> fn; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Mock - private DoFn<String, String>.ProcessContext mockContext; - @Mock - private BoundedWindow mockWindow; - @Mock - private DoFn.InputProvider<String> mockInputProvider; - @Mock - private DoFn.OutputReceiver<String> mockOutputReceiver; - - private ExtraContextFactory<String, String> extraContextFactory; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - this.extraContextFactory = new ExtraContextFactory<String, String>() { - @Override - public BoundedWindow window() { - return mockWindow; - } - - @Override - public DoFn.InputProvider<String> inputProvider() { - return mockInputProvider; - } - - @Override - public DoFn.OutputReceiver<String> outputReceiver() { - return mockOutputReceiver; - } - }; - } - - private DoFnReflector underTest(DoFn<String, String> fn) { - this.fn = fn; - return DoFnReflector.of(fn.getClass()); - } - - private void checkInvokeProcessElementWorks( - DoFnReflector r, Invocations... 
invocations) throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse("Should not yet have called processElement on " + invocation.name, - invocation.wasProcessElementInvoked); - } - r.bindInvoker(fn).invokeProcessElement(mockContext, extraContextFactory); - for (Invocations invocation : invocations) { - assertTrue("Should have called processElement on " + invocation.name, - invocation.wasProcessElementInvoked); - } - } - - private void checkInvokeStartBundleWorks( - DoFnReflector r, Invocations... invocations) throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse("Should not yet have called startBundle on " + invocation.name, - invocation.wasStartBundleInvoked); - } - r.bindInvoker(fn).invokeStartBundle(mockContext, extraContextFactory); - for (Invocations invocation : invocations) { - assertTrue("Should have called startBundle on " + invocation.name, - invocation.wasStartBundleInvoked); - } - } - - private void checkInvokeFinishBundleWorks( - DoFnReflector r, Invocations... invocations) throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse("Should not yet have called finishBundle on " + invocation.name, - invocation.wasFinishBundleInvoked); - } - r.bindInvoker(fn).invokeFinishBundle(mockContext, extraContextFactory); - for (Invocations invocation : invocations) { - assertTrue("Should have called finishBundle on " + invocation.name, - invocation.wasFinishBundleInvoked); - } - } - - private void checkInvokeSetupWorks(DoFnReflector r, Invocations... 
invocations) throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse("Should not yet have called setup on " + invocation.name, - invocation.wasSetupInvoked); - } - r.bindInvoker(fn).invokeSetup(); - for (Invocations invocation : invocations) { - assertTrue("Should have called setup on " + invocation.name, - invocation.wasSetupInvoked); - } - } - - private void checkInvokeTeardownWorks(DoFnReflector r, Invocations... invocations) - throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse("Should not yet have called teardown on " + invocation.name, - invocation.wasTeardownInvoked); - } - r.bindInvoker(fn).invokeTeardown(); - for (Invocations invocation : invocations) { - assertTrue("Should have called teardown on " + invocation.name, - invocation.wasTeardownInvoked); - } - } - - @Test - public void testDoFnWithNoExtraContext() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFn<String, String>() { - - @ProcessElement - public void processElement(ProcessContext c) - throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - } - }); - - assertFalse(reflector.usesSingleWindow()); - - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testDoFnInvokersReused() throws Exception { - // Ensures that we don't create a new Invoker class for every instance of the OldDoFn. 
- IdentityParent fn1 = new IdentityParent(); - IdentityParent fn2 = new IdentityParent(); - DoFnReflector reflector1 = underTest(fn1); - DoFnReflector reflector2 = underTest(fn2); - assertSame("DoFnReflector instances should be cached and reused for identical types", - reflector1, reflector2); - assertSame("Invoker classes should only be generated once for each type", - reflector1.bindInvoker(fn1).getClass(), - reflector2.bindInvoker(fn2).getClass()); - } - - interface InterfaceWithProcessElement { - @ProcessElement - void processElement(DoFn<String, String>.ProcessContext c); - } - - interface LayersOfInterfaces extends InterfaceWithProcessElement {} - - private class IdentityUsingInterfaceWithProcessElement - extends DoFn<String, String> - implements LayersOfInterfaces { - - private Invocations invocations = new Invocations("Named Class"); - - @Override - public void processElement(DoFn<String, String>.ProcessContext c) { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - } - } - - @Test - public void testDoFnWithProcessElementInterface() throws Exception { - IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement(); - DoFnReflector reflector = underTest(fn); - assertFalse(reflector.usesSingleWindow()); - checkInvokeProcessElementWorks(reflector, fn.invocations); - } - - private class IdentityParent extends DoFn<String, String> { - protected Invocations parentInvocations = new Invocations("IdentityParent"); - - @ProcessElement - public void process(ProcessContext c) { - parentInvocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - } - } - - private class IdentityChildWithoutOverride extends IdentityParent { - } - - private class IdentityChildWithOverride extends IdentityParent { - protected Invocations childInvocations = new Invocations("IdentityChildWithOverride"); - - @Override - public void process(DoFn<String, String>.ProcessContext c) { - super.process(c); - 
childInvocations.wasProcessElementInvoked = true; - } - } - - @Test - public void testDoFnWithMethodInSuperclass() throws Exception { - IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride(); - DoFnReflector reflector = underTest(fn); - assertFalse(reflector.usesSingleWindow()); - checkInvokeProcessElementWorks(reflector, fn.parentInvocations); - } - - @Test - public void testDoFnWithMethodInSubclass() throws Exception { - IdentityChildWithOverride fn = new IdentityChildWithOverride(); - DoFnReflector reflector = underTest(fn); - assertFalse(reflector.usesSingleWindow()); - checkInvokeProcessElementWorks(reflector, fn.parentInvocations, fn.childInvocations); - } - - @Test - public void testDoFnWithWindow() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFn<String, String>() { - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow w) - throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - assertSame(w, mockWindow); - } - }); - - assertTrue(reflector.usesSingleWindow()); - - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testDoFnWithOutputReceiver() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFn<String, String>() { - - @ProcessElement - public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o) - throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - assertSame(o, mockOutputReceiver); - } - }); - - assertFalse(reflector.usesSingleWindow()); - - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testDoFnWithInputProvider() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFn<String, String>() { - - 
@ProcessElement - public void processElement(ProcessContext c, DoFn.InputProvider<String> i) - throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - assertSame(i, mockInputProvider); - } - }); - - assertFalse(reflector.usesSingleWindow()); - - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testDoFnWithStartBundle() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFn<String, String>() { - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) {} - - @StartBundle - public void startBundle(Context c) { - invocations.wasStartBundleInvoked = true; - assertSame(c, mockContext); - } - - @FinishBundle - public void finishBundle(Context c) { - invocations.wasFinishBundleInvoked = true; - assertSame(c, mockContext); - } - }); - - checkInvokeStartBundleWorks(reflector, invocations); - checkInvokeFinishBundleWorks(reflector, invocations); - } - - @Test - public void testDoFnWithSetupTeardown() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFn<String, String>() { - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) {} - - @StartBundle - public void startBundle(Context c) { - invocations.wasStartBundleInvoked = true; - assertSame(c, mockContext); - } - - @FinishBundle - public void finishBundle(Context c) { - invocations.wasFinishBundleInvoked = true; - assertSame(c, mockContext); - } - - @Setup - public void before() { - invocations.wasSetupInvoked = true; - } - - @Teardown - public void after() { - invocations.wasTeardownInvoked = true; - } - }); - - checkInvokeSetupWorks(reflector, invocations); - checkInvokeTeardownWorks(reflector, invocations); - } - - @Test - public void testNoProcessElement() throws Exception { - 
thrown.expect(IllegalStateException.class); - thrown.expectMessage("No method annotated with @ProcessElement found"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() {}); - } - - @Test - public void testMultipleProcessElement() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Found multiple methods annotated with @ProcessElement"); - thrown.expectMessage("foo()"); - thrown.expectMessage("bar()"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() { - @ProcessElement - public void foo() {} - - @ProcessElement - public void bar() {} - }); - } - - @Test - public void testMultipleStartBundleElement() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Found multiple methods annotated with @StartBundle"); - thrown.expectMessage("bar()"); - thrown.expectMessage("baz()"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() { - @ProcessElement - public void foo() {} - - @StartBundle - public void bar() {} - - @StartBundle - public void baz() {} - }); - } - - @Test - public void testMultipleFinishBundleElement() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Found multiple methods annotated with @FinishBundle"); - thrown.expectMessage("bar()"); - thrown.expectMessage("baz()"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() { - @ProcessElement - public void foo() {} - - @FinishBundle - public void bar() {} - - @FinishBundle - public void baz() {} - }); - } - - private static class PrivateDoFnClass extends DoFn<String, String> { - final Invocations invocations = new Invocations(getClass().getName()); - - @ProcessElement - public void processThis(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - @Test - public void testLocalPrivateDoFnClass() throws Exception { - 
PrivateDoFnClass fn = new PrivateDoFnClass(); - DoFnReflector reflector = underTest(fn); - checkInvokeProcessElementWorks(reflector, fn.invocations); - } - - @Test - public void testStaticPackagePrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("StaticPackagePrivateDoFn"); - DoFnReflector reflector = - underTest(DoFnReflectorTestHelper.newStaticPackagePrivateDoFn(invocations)); - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testInnerPackagePrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("InnerPackagePrivateDoFn"); - DoFnReflector reflector = - underTest(new DoFnReflectorTestHelper().newInnerPackagePrivateDoFn(invocations)); - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testStaticPrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("StaticPrivateDoFn"); - DoFnReflector reflector = underTest(DoFnReflectorTestHelper.newStaticPrivateDoFn(invocations)); - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testInnerPrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("StaticInnerDoFn"); - DoFnReflector reflector = - underTest(new DoFnReflectorTestHelper().newInnerPrivateDoFn(invocations)); - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testAnonymousInnerDoFnInOtherPackage() throws Exception { - Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage"); - DoFnReflector reflector = - underTest(new DoFnReflectorTestHelper().newInnerAnonymousDoFn(invocations)); - checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testStaticAnonymousDoFnInOtherPackage() throws Exception { - Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage"); - DoFnReflector reflector = - underTest(DoFnReflectorTestHelper.newStaticAnonymousDoFn(invocations)); - 
checkInvokeProcessElementWorks(reflector, invocations); - } - - @Test - public void testPrivateProcessElement() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("process() must be public"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() { - @ProcessElement - private void process() {} - }); - } - - @Test - public void testPrivateStartBundle() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("startBundle() must be public"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() { - @ProcessElement - public void processElement() {} - - @StartBundle - void startBundle() {} - }); - } - - @Test - public void testPrivateFinishBundle() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("finishBundle() must be public"); - thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFn<String, String>() { - @ProcessElement - public void processElement() {} - - @FinishBundle - void finishBundle() {} - }); - } - - @SuppressWarnings({"unused"}) - private void missingProcessContext() {} - - @Test - public void testMissingProcessContext() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage(getClass().getName() - + "#missingProcessContext() must take a ProcessContext as its first argument"); - - DoFnReflector.verifyProcessMethodArguments( - getClass().getDeclaredMethod("missingProcessContext")); - } - - @SuppressWarnings({"unused"}) - private void badProcessContext(String s) {} - - @Test - public void testBadProcessContextType() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage(getClass().getName() - + "#badProcessContext(String) must take a ProcessContext as its first argument"); - - DoFnReflector.verifyProcessMethodArguments( - getClass().getDeclaredMethod("badProcessContext", String.class)); - } - - 
@SuppressWarnings({"unused"}) - private void badExtraContext(DoFn<Integer, String>.Context c, int n) {} - - @Test - public void testBadExtraContext() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "int is not a valid context parameter for method " - + getClass().getName() + "#badExtraContext(Context, int). Should be one of ["); - - DoFnReflector.verifyBundleMethodArguments( - getClass().getDeclaredMethod("badExtraContext", Context.class, int.class)); - } - - @SuppressWarnings({"unused"}) - private void badExtraProcessContext( - DoFn<Integer, String>.ProcessContext c, Integer n) {} - - @Test - public void testBadExtraProcessContextType() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "Integer is not a valid context parameter for method " - + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)" - + ". Should be one of [BoundedWindow]"); - - DoFnReflector.verifyProcessMethodArguments( - getClass().getDeclaredMethod("badExtraProcessContext", - ProcessContext.class, Integer.class)); - } - - @SuppressWarnings("unused") - private int badReturnType() { - return 0; - } - - @Test - public void testBadReturnType() throws Exception { - thrown.expect(IllegalStateException.class); - thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type"); - - DoFnReflector.verifyProcessMethodArguments(getClass().getDeclaredMethod("badReturnType")); - } - - @SuppressWarnings("unused") - private void goodGenerics( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<Integer> input, - DoFn.OutputReceiver<String> output) {} - - @Test - public void testValidGenerics() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "goodGenerics", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - DoFnReflector.verifyProcessMethodArguments(method); - } - - @SuppressWarnings("unused") - private void 
goodWildcards( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<?> input, - DoFn.OutputReceiver<?> output) {} - - @Test - public void testGoodWildcards() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "goodWildcards", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - DoFnReflector.verifyProcessMethodArguments(method); - } - - @SuppressWarnings("unused") - private void goodBoundedWildcards( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<? super Integer> input, - DoFn.OutputReceiver<? super String> output) {} - - @Test - public void testGoodBoundedWildcards() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "goodBoundedWildcards", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - DoFnReflector.verifyProcessMethodArguments(method); - } - - @SuppressWarnings("unused") - private <InputT, OutputT> void goodTypeVariables( - DoFn<InputT, OutputT>.ProcessContext c, - DoFn.InputProvider<InputT> input, - DoFn.OutputReceiver<OutputT> output) {} - - @Test - public void testGoodTypeVariables() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "goodTypeVariables", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - DoFnReflector.verifyProcessMethodArguments(method); - } - - @SuppressWarnings("unused") - private void badGenericTwoArgs( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<Integer> input, - DoFn.OutputReceiver<Integer> output) {} - - @Test - public void testBadGenericsTwoArgs() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "badGenericTwoArgs", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Incompatible generics in context parameter " - + "OutputReceiver<Integer> " - + "for method " + 
getClass().getName() - + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be " - + "OutputReceiver<String>"); - - DoFnReflector.verifyProcessMethodArguments(method); - } - - @SuppressWarnings("unused") - private void badGenericWildCards( - DoFn<Integer, String>.ProcessContext c, - DoFn.InputProvider<Integer> input, - DoFn.OutputReceiver<? super Integer> output) {} - - @Test - public void testBadGenericWildCards() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "badGenericWildCards", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Incompatible generics in context parameter " - + "OutputReceiver<? super Integer> for method " - + getClass().getName() - + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be " - + "OutputReceiver<String>"); - - DoFnReflector.verifyProcessMethodArguments(method); - } - - @SuppressWarnings("unused") - private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c, - DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {} - - @Test - public void testBadTypeVariables() throws Exception { - Method method = - getClass() - .getDeclaredMethod( - "badTypeVariables", - DoFn.ProcessContext.class, - DoFn.InputProvider.class, - DoFn.OutputReceiver.class); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Incompatible generics in context parameter " - + "OutputReceiver<InputT> for method " + getClass().getName() - + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). 
Should be " - + "OutputReceiver<OutputT>"); - - DoFnReflector.verifyProcessMethodArguments(method); - } - - @Test - public void testProcessElementException() throws Exception { - DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() { - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) { - throw new IllegalArgumentException("bogus"); - } - }; - - thrown.expect(UserCodeException.class); - thrown.expectMessage("bogus"); - DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeProcessElement(null, null); - } - - @Test - public void testStartBundleException() throws Exception { - DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() { - @StartBundle - public void startBundle(@SuppressWarnings("unused") Context c) { - throw new IllegalArgumentException("bogus"); - } - - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) { - } - }; - - thrown.expect(UserCodeException.class); - thrown.expectMessage("bogus"); - DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeStartBundle(null, null); - } - - @Test - public void testFinishBundleException() throws Exception { - DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() { - @FinishBundle - public void finishBundle(@SuppressWarnings("unused") Context c) { - throw new IllegalArgumentException("bogus"); - } - - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) { - } - }; - - thrown.expect(UserCodeException.class); - thrown.expectMessage("bogus"); - DoFnReflector.of(fn.getClass()).bindInvoker(fn).invokeFinishBundle(null, null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index 604536b..3469223 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -99,7 +99,7 @@ public class FlattenTest implements Serializable { PCollection<String> output = makePCollectionListOfStrings(p, inputs) .apply(Flatten.<String>pCollections()) - .apply(ParDo.of(new IdentityFn<String>(){})); + .apply(ParDo.of(new IdentityFn<String>())); PAssert.that(output).containsInAnyOrder(flattenLists(inputs)); p.run(); @@ -152,7 +152,7 @@ public class FlattenTest implements Serializable { PCollection<String> output = PCollectionList.<String>empty(p) .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of()) - .apply(ParDo.of(new IdentityFn<String>(){})); + .apply(ParDo.of(new IdentityFn<String>())); PAssert.that(output).empty(); p.run(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java deleted file mode 100644 index 90fba12..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/dofnreflector/DoFnReflectorTestHelper.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.dofnreflector; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnReflectorTest.Invocations; - -/** - * Test helper for DoFnReflectorTest, which needs to test package-private access - * to DoFns in other packages. - */ -public class DoFnReflectorTestHelper { - - private static class StaticPrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public StaticPrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - private class InnerPrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public InnerPrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - static class StaticPackagePrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public StaticPackagePrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - class InnerPackagePrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public InnerPackagePrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - 
invocations.wasProcessElementInvoked = true; - } - } - - public static DoFn<String, String> newStaticPackagePrivateDoFn( - Invocations invocations) { - return new StaticPackagePrivateDoFn(invocations); - } - - public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) { - return new InnerPackagePrivateDoFn(invocations); - } - - public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) { - return new StaticPrivateDoFn(invocations); - } - - public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) { - return new InnerPrivateDoFn(invocations); - } - - public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) { - return new DoFn<String, String>() { - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - }; - } - - public static DoFn<String, String> newStaticAnonymousDoFn( - final Invocations invocations) { - return new DoFn<String, String>() { - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java new file mode 100644 index 0000000..7e756e2 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.UserCodeException; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link DoFnInvokers}. */ +public class DoFnInvokersTest { + /** A convenience struct holding flags that indicate whether a particular method was invoked. 
*/ + public static class Invocations { + public boolean wasProcessElementInvoked = false; + public boolean wasStartBundleInvoked = false; + public boolean wasFinishBundleInvoked = false; + public boolean wasSetupInvoked = false; + public boolean wasTeardownInvoked = false; + private final String name; + + public Invocations(String name) { + this.name = name; + } + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock private DoFn.ProcessContext mockContext; + @Mock private BoundedWindow mockWindow; + @Mock private DoFn.InputProvider<String> mockInputProvider; + @Mock private DoFn.OutputReceiver<String> mockOutputReceiver; + + private DoFn.ExtraContextFactory<String, String> extraContextFactory; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + this.extraContextFactory = + new DoFn.ExtraContextFactory<String, String>() { + @Override + public BoundedWindow window() { + return mockWindow; + } + + @Override + public DoFn.InputProvider<String> inputProvider() { + return mockInputProvider; + } + + @Override + public DoFn.OutputReceiver<String> outputReceiver() { + return mockOutputReceiver; + } + }; + } + + private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations) + throws Exception { + assertTrue("Need at least one invocation to check", invocations.length >= 1); + for (Invocations invocation : invocations) { + assertFalse( + "Should not yet have called processElement on " + invocation.name, + invocation.wasProcessElementInvoked); + } + DoFnInvokers.INSTANCE + .newByteBuddyInvoker(fn) + .invokeProcessElement(mockContext, extraContextFactory); + for (Invocations invocation : invocations) { + assertTrue( + "Should have called processElement on " + invocation.name, + invocation.wasProcessElementInvoked); + } + } + + private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... 
invocations) + throws Exception { + assertTrue("Need at least one invocation to check", invocations.length >= 1); + for (Invocations invocation : invocations) { + assertFalse( + "Should not yet have called startBundle on " + invocation.name, + invocation.wasStartBundleInvoked); + } + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext); + for (Invocations invocation : invocations) { + assertTrue( + "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked); + } + } + + private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations) + throws Exception { + assertTrue("Need at least one invocation to check", invocations.length >= 1); + for (Invocations invocation : invocations) { + assertFalse( + "Should not yet have called finishBundle on " + invocation.name, + invocation.wasFinishBundleInvoked); + } + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext); + for (Invocations invocation : invocations) { + assertTrue( + "Should have called finishBundle on " + invocation.name, + invocation.wasFinishBundleInvoked); + } + } + + private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations) + throws Exception { + assertTrue("Need at least one invocation to check", invocations.length >= 1); + for (Invocations invocation : invocations) { + assertFalse( + "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked); + } + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup(); + for (Invocations invocation : invocations) { + assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked); + } + } + + private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... 
invocations) + throws Exception { + assertTrue("Need at least one invocation to check", invocations.length >= 1); + for (Invocations invocation : invocations) { + assertFalse( + "Should not yet have called teardown on " + invocation.name, + invocation.wasTeardownInvoked); + } + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown(); + for (Invocations invocation : invocations) { + assertTrue( + "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked); + } + } + + @Test + public void testDoFnWithNoExtraContext() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFn<String, String> fn = + new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + } + }; + + assertFalse( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + + checkInvokeProcessElementWorks(fn, invocations); + } + + @Test + public void testDoFnInvokersReused() throws Exception { + // Ensures that we don't create a new Invoker class for every instance of the DoFn. 
+ IdentityParent fn1 = new IdentityParent(); + IdentityParent fn2 = new IdentityParent(); + assertSame( + "Invoker classes should only be generated once for each type", + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn1).getClass(), + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass()); + } + + interface InterfaceWithProcessElement { + @DoFn.ProcessElement + void processElement(DoFn<String, String>.ProcessContext c); + } + + interface LayersOfInterfaces extends InterfaceWithProcessElement {} + + private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String> + implements LayersOfInterfaces { + + private Invocations invocations = new Invocations("Named Class"); + + @Override + public void processElement(DoFn<String, String>.ProcessContext c) { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + } + } + + @Test + public void testDoFnWithProcessElementInterface() throws Exception { + IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement(); + assertFalse( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + checkInvokeProcessElementWorks(fn, fn.invocations); + } + + private class IdentityParent extends DoFn<String, String> { + protected Invocations parentInvocations = new Invocations("IdentityParent"); + + @ProcessElement + public void process(ProcessContext c) { + parentInvocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + } + } + + private class IdentityChildWithoutOverride extends IdentityParent {} + + private class IdentityChildWithOverride extends IdentityParent { + protected Invocations childInvocations = new Invocations("IdentityChildWithOverride"); + + @Override + public void process(DoFn<String, String>.ProcessContext c) { + super.process(c); + childInvocations.wasProcessElementInvoked = true; + } + } + + @Test + public void testDoFnWithMethodInSuperclass() throws Exception { + 
IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride(); + assertFalse( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + checkInvokeProcessElementWorks(fn, fn.parentInvocations); + } + + @Test + public void testDoFnWithMethodInSubclass() throws Exception { + IdentityChildWithOverride fn = new IdentityChildWithOverride(); + assertFalse( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations); + } + + @Test + public void testDoFnWithWindow() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFn<String, String> fn = + new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow w) throws Exception { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + assertSame(w, mockWindow); + } + }; + + assertTrue( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + + checkInvokeProcessElementWorks(fn, invocations); + } + + @Test + public void testDoFnWithOutputReceiver() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFn<String, String> fn = + new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + assertSame(o, mockOutputReceiver); + } + }; + + assertFalse( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + + checkInvokeProcessElementWorks(fn, invocations); + } + + @Test + public void testDoFnWithInputProvider() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFn<String, String> fn = + new DoFn<String, 
String>() { + @ProcessElement + public void processElement(ProcessContext c, InputProvider<String> i) throws Exception { + invocations.wasProcessElementInvoked = true; + assertSame(c, mockContext); + assertSame(i, mockInputProvider); + } + }; + + assertFalse( + DoFnSignatures.INSTANCE + .getOrParseSignature(fn.getClass()) + .processElement() + .usesSingleWindow()); + + checkInvokeProcessElementWorks(fn, invocations); + } + + @Test + public void testDoFnWithStartBundle() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFn<String, String> fn = + new DoFn<String, String>() { + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) {} + + @StartBundle + public void startBundle(Context c) { + invocations.wasStartBundleInvoked = true; + assertSame(c, mockContext); + } + + @FinishBundle + public void finishBundle(Context c) { + invocations.wasFinishBundleInvoked = true; + assertSame(c, mockContext); + } + }; + + checkInvokeStartBundleWorks(fn, invocations); + checkInvokeFinishBundleWorks(fn, invocations); + } + + @Test + public void testDoFnWithSetupTeardown() throws Exception { + final Invocations invocations = new Invocations("AnonymousClass"); + DoFn<String, String> fn = + new DoFn<String, String>() { + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) {} + + @StartBundle + public void startBundle(Context c) { + invocations.wasStartBundleInvoked = true; + assertSame(c, mockContext); + } + + @FinishBundle + public void finishBundle(Context c) { + invocations.wasFinishBundleInvoked = true; + assertSame(c, mockContext); + } + + @Setup + public void before() { + invocations.wasSetupInvoked = true; + } + + @Teardown + public void after() { + invocations.wasTeardownInvoked = true; + } + }; + + checkInvokeSetupWorks(fn, invocations); + checkInvokeTeardownWorks(fn, invocations); + } + + private static class PrivateDoFnClass extends DoFn<String, String> { + 
final Invocations invocations = new Invocations(getClass().getName()); + + @ProcessElement + public void processThis(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + @Test + public void testLocalPrivateDoFnClass() throws Exception { + PrivateDoFnClass fn = new PrivateDoFnClass(); + checkInvokeProcessElementWorks(fn, fn.invocations); + } + + @Test + public void testStaticPackagePrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("StaticPackagePrivateDoFn"); + checkInvokeProcessElementWorks( + DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations); + } + + @Test + public void testInnerPackagePrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("InnerPackagePrivateDoFn"); + checkInvokeProcessElementWorks( + new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations); + } + + @Test + public void testStaticPrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("StaticPrivateDoFn"); + checkInvokeProcessElementWorks( + DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations); + } + + @Test + public void testInnerPrivateDoFnClass() throws Exception { + Invocations invocations = new Invocations("StaticInnerDoFn"); + checkInvokeProcessElementWorks( + new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations); + } + + @Test + public void testAnonymousInnerDoFnInOtherPackage() throws Exception { + Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage"); + checkInvokeProcessElementWorks( + new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations); + } + + @Test + public void testStaticAnonymousDoFnInOtherPackage() throws Exception { + Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage"); + checkInvokeProcessElementWorks( + DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations); + } + + @Test + public 
void testProcessElementException() throws Exception { + DoFn<Integer, Integer> fn = + new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) { + throw new IllegalArgumentException("bogus"); + } + }; + + thrown.expect(UserCodeException.class); + thrown.expectMessage("bogus"); + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null); + } + + @Test + public void testStartBundleException() throws Exception { + DoFn<Integer, Integer> fn = + new DoFn<Integer, Integer>() { + @StartBundle + public void startBundle(@SuppressWarnings("unused") Context c) { + throw new IllegalArgumentException("bogus"); + } + + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) {} + }; + + thrown.expect(UserCodeException.class); + thrown.expectMessage("bogus"); + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null); + } + + @Test + public void testFinishBundleException() throws Exception { + DoFn<Integer, Integer> fn = + new DoFn<Integer, Integer>() { + @FinishBundle + public void finishBundle(@SuppressWarnings("unused") Context c) { + throw new IllegalArgumentException("bogus"); + } + + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) {} + }; + + thrown.expect(UserCodeException.class); + thrown.expectMessage("bogus"); + DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java new file mode 100644 index 0000000..7bfdddc --- /dev/null +++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations; + +/** + * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access + * to DoFns in other packages. 
+ */ +public class DoFnInvokersTestHelper { + + private static class StaticPrivateDoFn extends DoFn<String, String> { + final Invocations invocations; + + public StaticPrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + private class InnerPrivateDoFn extends DoFn<String, String> { + final Invocations invocations; + + public InnerPrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + static class StaticPackagePrivateDoFn extends DoFn<String, String> { + final Invocations invocations; + + public StaticPackagePrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + class InnerPackagePrivateDoFn extends DoFn<String, String> { + final Invocations invocations; + + public InnerPackagePrivateDoFn(Invocations invocations) { + this.invocations = invocations; + } + + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + } + + public static DoFn<String, String> newStaticPackagePrivateDoFn( + Invocations invocations) { + return new StaticPackagePrivateDoFn(invocations); + } + + public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) { + return new InnerPackagePrivateDoFn(invocations); + } + + public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) { + return new StaticPrivateDoFn(invocations); + } + + public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) { + return new InnerPrivateDoFn(invocations); + } + + public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) { + return new DoFn<String, String>() { + 
@ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + }; + } + + public static DoFn<String, String> newStaticAnonymousDoFn( + final Invocations invocations) { + return new DoFn<String, String>() { + @ProcessElement + public void process(ProcessContext c) { + invocations.wasProcessElementInvoked = true; + } + }; + } +}
