More changes to DoFn{Signatures,Invokers} In preparation for Splittable DoFn: * More generic code generation in DoFnInvokers: supports methods with return values (thanks @bjchambers). * Uses AutoValue builder in DoFnSignature. * Contextual error reporting in DoFnSignatures parsing code. * Rewrote DoFnInvokers tests to use Mockito. * Changed DoFnSignatures tests to use local classes and an "AnonymousMethod" class for testing analysis of single methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75c8bb8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75c8bb8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75c8bb8d Branch: refs/heads/master Commit: 75c8bb8dd3127b4dce63e8359ae27185f246b28c Parents: 05c6c27 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 11 17:13:53 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Mon Sep 12 10:08:57 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/DoFnAdapters.java | 2 +- .../sdk/transforms/reflect/DoFnInvoker.java | 20 +- .../sdk/transforms/reflect/DoFnInvokers.java | 408 +++++++++-------- .../sdk/transforms/reflect/DoFnSignature.java | 71 ++- .../sdk/transforms/reflect/DoFnSignatures.java | 245 +++++----- .../beam/sdk/util/common/ReflectHelpers.java | 22 - .../transforms/reflect/DoFnInvokersTest.java | 455 ++++++------------- .../reflect/DoFnInvokersTestHelper.java | 116 ----- .../DoFnSignaturesProcessElementTest.java | 213 +++++++++ .../transforms/reflect/DoFnSignaturesTest.java | 269 +---------- .../reflect/DoFnSignaturesTestUtils.java | 64 +++ .../testhelper/DoFnInvokersTestHelper.java | 124 +++++ 12 files changed, 968 insertions(+), 1041 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 4803d77..77a71e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -86,7 +86,7 @@ public class DoFnAdapters { @Override public void startBundle(Context c) throws Exception { - this.fn.prepareForProcessing(); + fn.prepareForProcessing(); invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 5818a59..9de6759 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -26,28 +26,16 @@ import org.apache.beam.sdk.transforms.DoFn; * referred to as the bound {@link DoFn}. */ public interface DoFnInvoker<InputT, OutputT> { - /** - * Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}. - */ + /** Invoke the {@link DoFn.Setup} method on the bound {@link DoFn}. */ void invokeSetup(); - /** - * Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}. - * - * @param c The {@link DoFn.Context} to invoke the fn with. - */ + /** Invoke the {@link DoFn.StartBundle} method on the bound {@link DoFn}. */ void invokeStartBundle(DoFn<InputT, OutputT>.Context c); - /** - * Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}. - * - * @param c The {@link DoFn.Context} to invoke the fn with. - */ + /** Invoke the {@link DoFn.FinishBundle} method on the bound {@link DoFn}. */ void invokeFinishBundle(DoFn<InputT, OutputT>.Context c); - /** - * Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}. - */ + /** Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}. */ void invokeTeardown(); /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 68e2ca9..f622015 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -18,10 +18,10 @@ package org.apache.beam.sdk.transforms.reflect; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.EnumMap; @@ -39,12 +39,15 @@ import net.bytebuddy.dynamic.DynamicType; import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.dynamic.scaffold.InstrumentedType; import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.FixedValue; import net.bytebuddy.implementation.Implementation; -import net.bytebuddy.implementation.MethodCall; -import net.bytebuddy.implementation.bind.MethodDelegationBinder; +import net.bytebuddy.implementation.Implementation.Context; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder; import net.bytebuddy.implementation.bytecode.ByteCodeAppender; import net.bytebuddy.implementation.bytecode.StackManipulation; import net.bytebuddy.implementation.bytecode.Throw; +import net.bytebuddy.implementation.bytecode.assign.Assigner; import net.bytebuddy.implementation.bytecode.member.FieldAccess; import net.bytebuddy.implementation.bytecode.member.MethodInvocation; import net.bytebuddy.implementation.bytecode.member.MethodReturn; @@ -52,13 +55,10 @@ import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; import net.bytebuddy.jar.asm.Label; import net.bytebuddy.jar.asm.MethodVisitor; import net.bytebuddy.jar.asm.Opcodes; +import net.bytebuddy.jar.asm.Type; import net.bytebuddy.matcher.ElementMatchers; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.FinishBundle; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.DoFn.Setup; -import org.apache.beam.sdk.transforms.DoFn.StartBundle; -import org.apache.beam.sdk.transforms.DoFn.Teardown; import org.apache.beam.sdk.util.UserCodeException; /** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */ @@ -153,25 +153,13 @@ public class DoFnInvokers { .method(ElementMatchers.named("invokeProcessElement")) .intercept(new ProcessElementDelegation(signature.processElement())) .method(ElementMatchers.named("invokeStartBundle")) - .intercept( - signature.startBundle() == null - ? new NoopMethodImplementation() - : new BundleMethodDelegation(signature.startBundle())) + .intercept(delegateOrNoop(signature.startBundle())) .method(ElementMatchers.named("invokeFinishBundle")) - .intercept( - signature.finishBundle() == null - ? new NoopMethodImplementation() - : new BundleMethodDelegation(signature.finishBundle())) + .intercept(delegateOrNoop(signature.finishBundle())) .method(ElementMatchers.named("invokeSetup")) - .intercept( - signature.setup() == null - ? new NoopMethodImplementation() - : new LifecycleMethodDelegation(signature.setup())) + .intercept(delegateOrNoop(signature.setup())) .method(ElementMatchers.named("invokeTeardown")) - .intercept( - signature.teardown() == null - ? new NoopMethodImplementation() - : new LifecycleMethodDelegation(signature.teardown())); + .intercept(delegateOrNoop(signature.teardown())); DynamicType.Unloaded<?> unloaded = builder.make(); @@ -184,35 +172,29 @@ public class DoFnInvokers { return res; } - /** Implements an invoker method by doing nothing and immediately returning void. */ - private static class NoopMethodImplementation implements Implementation { - @Override - public InstrumentedType prepare(InstrumentedType instrumentedType) { - return instrumentedType; - } - - @Override - public ByteCodeAppender appender(final Target implementationTarget) { - return new ByteCodeAppender() { - @Override - public Size apply( - MethodVisitor methodVisitor, - Context implementationContext, - MethodDescription instrumentedMethod) { - StackManipulation manipulation = MethodReturn.VOID; - StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext); - return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize()); - } - }; - } + /** Delegates to the given method if available, or does nothing. */ + private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) { + return (method == null) + ? FixedValue.originType() + : new DoFnMethodDelegation(method.targetMethod()); } /** - * Base class for implementing an invoker method by delegating to a method of the target {@link - * DoFn}. + * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a + * "target method" of the wrapped {@link DoFn}. */ - private abstract static class MethodDelegation implements Implementation { - FieldDescription delegateField; + private static class DoFnMethodDelegation implements Implementation { + /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */ + protected final MethodDescription targetMethod; + /** Whether the target method returns non-void. */ + private final boolean targetHasReturn; + + private FieldDescription delegateField; + + public DoFnMethodDelegation(Method targetMethod) { + this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod); + targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure()); + } @Override public InstrumentedType prepare(InstrumentedType instrumentedType) { @@ -222,7 +204,6 @@ public class DoFnInvokers { .getDeclaredFields() .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) .getOnly(); - // Delegating the method call doesn't require any changes to the instrumented type. return instrumentedType; } @@ -230,54 +211,102 @@ public class DoFnInvokers { @Override public ByteCodeAppender appender(final Target implementationTarget) { return new ByteCodeAppender() { + /** + * @param instrumentedMethod The {@link DoFnInvoker} method for which we're generating code. + */ @Override public Size apply( MethodVisitor methodVisitor, Context implementationContext, MethodDescription instrumentedMethod) { + // Figure out how many locals we'll need. This corresponds to "this", the parameters + // of the instrumented method, and an argument to hold the return value if the target + // method has a return value. + int numLocals = 1 + instrumentedMethod.getParameters().size() + (targetHasReturn ? 1 : 0); + + Integer returnVarIndex = null; + if (targetHasReturn) { + // Local comes after formal parameters, so figure out where that is. + returnVarIndex = 1; // "this" + for (Type param : Type.getArgumentTypes(instrumentedMethod.getDescriptor())) { + returnVarIndex += param.getSize(); + } + } + StackManipulation manipulation = new StackManipulation.Compound( - // Push "this" reference to the stack + // Push "this" (DoFnInvoker on top of the stack) MethodVariableAccess.REFERENCE.loadOffset(0), - // Access the delegate field of the the invoker + // Access this.delegate (DoFn on top of the stack) FieldAccess.forField(delegateField).getter(), - invokeTargetMethod(instrumentedMethod)); + // Run the beforeDelegation manipulations. + // The arguments necessary to invoke the target are on top of the stack. + beforeDelegation(instrumentedMethod), + // Perform the method delegation. + // This will consume the arguments on top of the stack + // Either the stack is now empty (because the targetMethod returns void) or the + // stack contains the return value. + new UserCodeMethodInvocation(returnVarIndex, targetMethod, instrumentedMethod), + // Run the afterDelegation manipulations. + // Either the stack is now empty (because the instrumentedMethod returns void) + // or the stack contains the return value. + afterDelegation(instrumentedMethod)); + StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext); - return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize()); + return new Size(size.getMaximalSize(), numLocals); } }; } /** - * Generates code to invoke the target method. When this is called the delegate field will be on - * top of the stack. This should add any necessary arguments to the stack and then perform the - * method invocation. + * Return the code to the prepare the operand stack for the method delegation. + * + * <p>Before this method is called, the stack delegate will be the only thing on the stack. + * + * <p>After this method is called, the stack contents should contain exactly the arguments + * necessary to invoke the target method. */ - protected abstract StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod); + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { + return MethodVariableAccess.allArgumentsOf(targetMethod); + } + + /** + * Return the code to execute after the method delegation. + * + * <p>Before this method is called, the stack will either be empty (if the target method returns + * void) or contain the method return value. + * + * <p>After this method is called, the stack should either be empty (if the instrumented method + * returns void) or contain the value for the instrumented method to return). + */ + protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { + return TargetMethodAnnotationDrivenBinder.TerminationHandler.Returning.INSTANCE.resolve( + Assigner.DEFAULT, instrumentedMethod, targetMethod); + } } /** * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the * {@link DoFn.ProcessElement} method. */ - private static final class ProcessElementDelegation extends MethodDelegation { - private static final Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> + private static final class ProcessElementDelegation extends DoFnMethodDelegation { + private static final Map<DoFnSignature.Parameter, MethodDescription> EXTRA_CONTEXT_FACTORY_METHODS; static { try { - Map<DoFnSignature.ProcessElementMethod.Parameter, MethodDescription> methods = - new EnumMap<>(DoFnSignature.ProcessElementMethod.Parameter.class); + Map<DoFnSignature.Parameter, MethodDescription> methods = + new EnumMap<>(DoFnSignature.Parameter.class); methods.put( - DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW, + DoFnSignature.Parameter.BOUNDED_WINDOW, new MethodDescription.ForLoadedMethod( DoFn.ExtraContextFactory.class.getMethod("window"))); methods.put( - DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER, + DoFnSignature.Parameter.INPUT_PROVIDER, new MethodDescription.ForLoadedMethod( DoFn.ExtraContextFactory.class.getMethod("inputProvider"))); methods.put( - DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER, + DoFnSignature.Parameter.OUTPUT_RECEIVER, new MethodDescription.ForLoadedMethod( DoFn.ExtraContextFactory.class.getMethod("outputReceiver"))); EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods); @@ -291,16 +320,12 @@ public class DoFnInvokers { /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */ private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) { + super(signature.targetMethod()); this.signature = signature; } @Override - protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) { - MethodDescription targetMethod = - new MethodCall.MethodLocator.ForExplicitMethod( - new MethodDescription.ForLoadedMethod(signature.targetMethod())) - .resolve(instrumentedMethod); - + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { // Parameters of the wrapper invoker method: // DoFn.ProcessContext, ExtraContextFactory. // Parameters of the wrapped DoFn method: @@ -310,144 +335,159 @@ public class DoFnInvokers { parameters.add(MethodVariableAccess.REFERENCE.loadOffset(1)); // Push the extra arguments in their actual order. StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2); - for (DoFnSignature.ProcessElementMethod.Parameter param : signature.extraParameters()) { + for (DoFnSignature.Parameter param : signature.extraParameters()) { parameters.add( new StackManipulation.Compound( pushExtraContextFactory, MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param)))); } + return new StackManipulation.Compound(parameters); + } - return new StackManipulation.Compound( - // Push the parameters - new StackManipulation.Compound(parameters), - // Invoke the target method - wrapWithUserCodeException( - MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)), - // Return from the instrumented method - MethodReturn.VOID); + @Override + protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { + return MethodReturn.VOID; } } - /** - * Implements {@link DoFnInvoker#invokeStartBundle} or {@link DoFnInvoker#invokeFinishBundle} by - * delegating respectively to the {@link StartBundle} and {@link FinishBundle} methods. - */ - private static final class BundleMethodDelegation extends MethodDelegation { - private final DoFnSignature.BundleMethod signature; + private static class UserCodeMethodInvocation implements StackManipulation { - private BundleMethodDelegation(@Nullable DoFnSignature.BundleMethod signature) { - this.signature = signature; + @Nullable private final Integer returnVarIndex; + private final MethodDescription targetMethod; + private final MethodDescription instrumentedMethod; + private final TypeDescription returnType; + + private final Label wrapStart = new Label(); + private final Label wrapEnd = new Label(); + private final Label tryBlockStart = new Label(); + private final Label tryBlockEnd = new Label(); + private final Label catchBlockStart = new Label(); + private final Label catchBlockEnd = new Label(); + + private final MethodDescription createUserCodeException; + + UserCodeMethodInvocation( + @Nullable Integer returnVarIndex, + MethodDescription targetMethod, + MethodDescription instrumentedMethod) { + this.returnVarIndex = returnVarIndex; + this.targetMethod = targetMethod; + this.instrumentedMethod = instrumentedMethod; + this.returnType = targetMethod.getReturnType().asErasure(); + + boolean targetMethodReturnsVoid = TypeDescription.VOID.equals(returnType); + checkArgument( + (returnVarIndex == null) == targetMethodReturnsVoid, + "returnVarIndex should be defined if and only if the target method has a return value"); + + try { + createUserCodeException = + new MethodDescription.ForLoadedMethod( + UserCodeException.class.getDeclaredMethod("wrap", Throwable.class)); + } catch (NoSuchMethodException | SecurityException e) { + throw new RuntimeException("Unable to find UserCodeException.wrap", e); + } } @Override - protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) { - MethodDescription targetMethod = - new MethodCall.MethodLocator.ForExplicitMethod( - new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod())) - .resolve(instrumentedMethod); - return new StackManipulation.Compound( - // Push the parameters - MethodVariableAccess.REFERENCE.loadOffset(1), - // Invoke the target method - wrapWithUserCodeException( - MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)), - MethodReturn.VOID); + public boolean isValid() { + return true; } - } - /** - * Implements {@link DoFnInvoker#invokeSetup} or {@link DoFnInvoker#invokeTeardown} by delegating - * respectively to the {@link Setup} and {@link Teardown} methods. - */ - private static final class LifecycleMethodDelegation extends MethodDelegation { - private final DoFnSignature.LifecycleMethod signature; + private Object describeType(Type type) { + switch (type.getSort()) { + case Type.OBJECT: + return type.getDescriptor(); + case Type.INT: + case Type.BYTE: + case Type.BOOLEAN: + case Type.SHORT: + return Opcodes.INTEGER; + case Type.LONG: + return Opcodes.LONG; + case Type.DOUBLE: + return Opcodes.DOUBLE; + case Type.FLOAT: + return Opcodes.FLOAT; + default: + throw new IllegalArgumentException("Unhandled type as method argument: " + type); + } + } - private LifecycleMethodDelegation(@Nullable DoFnSignature.LifecycleMethod signature) { - this.signature = signature; + private void visitFrame( + MethodVisitor mv, boolean localsIncludeReturn, @Nullable String stackTop) { + boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn; + + Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor()); + Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 : 0)]; + locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName(); + for (int i = 0; i < localTypes.length; i++) { + locals[i + 1] = describeType(localTypes[i]); + } + if (hasReturnLocal) { + locals[locals.length - 1] = returnType.getInternalName(); + } + + Object[] stack = stackTop == null ? new Object[] {} : new Object[] {stackTop}; + + mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack); } @Override - protected StackManipulation invokeTargetMethod(MethodDescription instrumentedMethod) { - MethodDescription targetMethod = - new MethodCall.MethodLocator.ForExplicitMethod( - new MethodDescription.ForLoadedMethod(checkNotNull(signature).targetMethod())) - .resolve(instrumentedMethod); - return new StackManipulation.Compound( - wrapWithUserCodeException( - MethodDelegationBinder.MethodInvoker.Simple.INSTANCE.invoke(targetMethod)), - MethodReturn.VOID); - } - } + public Size apply(MethodVisitor mv, Context context) { + Size size = new Size(0, 0); - /** - * Wraps a given stack manipulation in a try catch block. Any exceptions thrown within the try are - * wrapped with a {@link UserCodeException}. - */ - private static StackManipulation wrapWithUserCodeException(final StackManipulation tryBody) { - final MethodDescription createUserCodeException; - try { - createUserCodeException = - new MethodDescription.ForLoadedMethod( - UserCodeException.class.getDeclaredMethod("wrap", Throwable.class)); - } catch (NoSuchMethodException | SecurityException e) { - throw new RuntimeException("Unable to find UserCodeException.wrap", e); - } + mv.visitLabel(wrapStart); + + String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName(); + mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName); - return new StackManipulation() { - @Override - public boolean isValid() { - return tryBody.isValid(); + // The try block attempts to perform the expected operations, then jumps to success + mv.visitLabel(tryBlockStart); + size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, context)); + + if (returnVarIndex != null) { + mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex); + size = size.aggregate(new Size(-1, 0)); // Reduces the size of the stack } + mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd); + mv.visitLabel(tryBlockEnd); + + // The handler wraps the exception, and then throws. + mv.visitLabel(catchBlockStart); + // In catch block, should have same locals and {Throwable} on the stack. + visitFrame(mv, false, throwableName); + + // Create the user code exception and throw + size = + size.aggregate( + new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE) + .apply(mv, context)); + + mv.visitLabel(catchBlockEnd); - @Override - public Size apply(MethodVisitor mv, Implementation.Context implementationContext) { - Label tryBlockStart = new Label(); - Label tryBlockEnd = new Label(); - Label catchBlockStart = new Label(); - Label catchBlockEnd = new Label(); - - String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName(); - mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName); - - // The try block attempts to perform the expected operations, then jumps to success - mv.visitLabel(tryBlockStart); - Size trySize = tryBody.apply(mv, implementationContext); - mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd); - mv.visitLabel(tryBlockEnd); - - // The handler wraps the exception, and then throws. - mv.visitLabel(catchBlockStart); - // Add the exception to the frame - mv.visitFrame( - Opcodes.F_SAME1, - // No local variables - 0, - new Object[] {}, - // 1 stack element (the throwable) - 1, - new Object[] {throwableName}); - - Size catchSize = - new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE) - .apply(mv, implementationContext); - - mv.visitLabel(catchBlockEnd); - // The frame contents after the try/catch block is the same - // as it was before. - mv.visitFrame( - Opcodes.F_SAME, - // No local variables - 0, - new Object[] {}, - // No new stack variables - 0, - new Object[] {}); - - return new Size( - trySize.getSizeImpact(), - Math.max(trySize.getMaximalSize(), catchSize.getMaximalSize())); + // After the catch block we should have the return in scope, but nothing on the stack. + visitFrame(mv, true, null); + + // After catch block, should have same locals and will have the return on the stack. + if (returnVarIndex != null) { + mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex); + size = size.aggregate(new Size(1, 0)); // Increases the size of the stack + } + mv.visitLabel(wrapEnd); + if (returnVarIndex != null) { + // Drop the return type from the locals + mv.visitLocalVariable( + "res", + returnType.getDescriptor(), + returnType.getGenericSignature(), + wrapStart, + wrapEnd, + returnVarIndex); } - }; + + return size; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 181c088..b6864da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -32,57 +32,74 @@ import org.apache.beam.sdk.transforms.DoFn; */ @AutoValue public abstract class DoFnSignature { + /** Class of the original {@link DoFn} from which this signature was produced. */ public abstract Class<? extends DoFn> fnClass(); + /** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */ public abstract ProcessElementMethod processElement(); + /** Details about this {@link DoFn}'s {@link DoFn.StartBundle} method. */ @Nullable public abstract BundleMethod startBundle(); + /** Details about this {@link DoFn}'s {@link DoFn.FinishBundle} method. */ @Nullable public abstract BundleMethod finishBundle(); + /** Details about this {@link DoFn}'s {@link DoFn.Setup} method. */ @Nullable public abstract LifecycleMethod setup(); + /** Details about this {@link DoFn}'s {@link DoFn.Teardown} method. */ @Nullable public abstract LifecycleMethod teardown(); - static DoFnSignature create( - Class<? extends DoFn> fnClass, - ProcessElementMethod processElement, - @Nullable BundleMethod startBundle, - @Nullable BundleMethod finishBundle, - @Nullable LifecycleMethod setup, - @Nullable LifecycleMethod teardown) { - return new AutoValue_DoFnSignature( - fnClass, - processElement, - startBundle, - finishBundle, - setup, - teardown); + static Builder builder() { + return new AutoValue_DoFnSignature.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFnClass(Class<? extends DoFn> fnClass); + abstract Builder setProcessElement(ProcessElementMethod processElement); + abstract Builder setStartBundle(BundleMethod startBundle); + abstract Builder setFinishBundle(BundleMethod finishBundle); + abstract Builder setSetup(LifecycleMethod setup); + abstract Builder setTeardown(LifecycleMethod teardown); + abstract DoFnSignature build(); + } + + /** A method delegated to a annotated method of an underlying {@link DoFn}. */ + public interface DoFnMethod { + /** The annotated method itself. */ + Method targetMethod(); + } + + /** A type of optional parameter of the {@link DoFn.ProcessElement} method. */ + public enum Parameter { + BOUNDED_WINDOW, + INPUT_PROVIDER, + OUTPUT_RECEIVER, } /** Describes a {@link DoFn.ProcessElement} method. */ @AutoValue - public abstract static class ProcessElementMethod { - enum Parameter { - BOUNDED_WINDOW, - INPUT_PROVIDER, - OUTPUT_RECEIVER - } - + public abstract static class ProcessElementMethod implements DoFnMethod { + /** The annotated method itself. */ + @Override public abstract Method targetMethod(); + /** Types of optional parameters of the annotated method, in the order they appear. */ public abstract List<Parameter> extraParameters(); - static ProcessElementMethod create(Method targetMethod, List<Parameter> extraParameters) { + static ProcessElementMethod create( + Method targetMethod, + List<Parameter> extraParameters) { return new AutoValue_DoFnSignature_ProcessElementMethod( targetMethod, Collections.unmodifiableList(extraParameters)); } - /** @return true if the reflected {@link DoFn} uses a Single Window. */ + /** Whether this {@link DoFn} uses a Single Window. */ public boolean usesSingleWindow() { return extraParameters().contains(Parameter.BOUNDED_WINDOW); } @@ -90,7 +107,9 @@ public abstract class DoFnSignature { /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */ @AutoValue - public abstract static class BundleMethod { + public abstract static class BundleMethod implements DoFnMethod { + /** The annotated method itself. */ + @Override public abstract Method targetMethod(); static BundleMethod create(Method targetMethod) { @@ -100,7 +119,9 @@ public abstract class DoFnSignature { /** Describes a {@link DoFn.Setup} or {@link DoFn.Teardown} method. */ @AutoValue - public abstract static class LifecycleMethod { + public abstract static class LifecycleMethod implements DoFnMethod { + /** The annotated method itself. */ + @Override public abstract Method targetMethod(); static LifecycleMethod create(Method targetMethod) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 7e482d5..8283788 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -17,9 +17,6 @@ */ package org.apache.beam.sdk.transforms.reflect; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.common.annotations.VisibleForTesting; import com.google.common.reflect.TypeParameter; import com.google.common.reflect.TypeToken; @@ -36,6 +33,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -62,15 +60,17 @@ public class DoFnSignatures { /** Analyzes a given {@link DoFn} class and extracts its {@link DoFnSignature}. */ private static DoFnSignature parseSignature(Class<? extends DoFn> fnClass) { - TypeToken<?> inputT = null; - TypeToken<?> outputT = null; + DoFnSignature.Builder builder = DoFnSignature.builder(); + + ErrorReporter errors = new ErrorReporter(null, fnClass.getName()); + errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn"); + builder.setFnClass(fnClass); - // Extract the input and output type. - checkArgument( - DoFn.class.isAssignableFrom(fnClass), - "%s must be subtype of DoFn", - fnClass.getSimpleName()); TypeToken<? extends DoFn> fnToken = TypeToken.of(fnClass); + + // Extract the input and output type, and whether the fn is bounded. + TypeToken<?> inputT = null; + TypeToken<?> outputT = null; for (TypeToken<?> supertype : fnToken.getTypes()) { if (!supertype.getRawType().equals(DoFn.class)) { continue; @@ -79,25 +79,48 @@ public class DoFnSignatures { inputT = TypeToken.of(args[0]); outputT = TypeToken.of(args[1]); } - checkNotNull(inputT, "Unable to determine input type from %s", fnClass); - - Method processElementMethod = findAnnotatedMethod(DoFn.ProcessElement.class, fnClass, true); - Method startBundleMethod = findAnnotatedMethod(DoFn.StartBundle.class, fnClass, false); - Method finishBundleMethod = findAnnotatedMethod(DoFn.FinishBundle.class, fnClass, false); - Method setupMethod = findAnnotatedMethod(DoFn.Setup.class, fnClass, false); - Method teardownMethod = findAnnotatedMethod(DoFn.Teardown.class, fnClass, false); - - return DoFnSignature.create( - fnClass, - analyzeProcessElementMethod(fnToken, processElementMethod, inputT, outputT), - (startBundleMethod == null) - ? null - : analyzeBundleMethod(fnToken, startBundleMethod, inputT, outputT), - (finishBundleMethod == null) - ? null - : analyzeBundleMethod(fnToken, finishBundleMethod, inputT, outputT), - (setupMethod == null) ? null : analyzeLifecycleMethod(setupMethod), - (teardownMethod == null) ? null : analyzeLifecycleMethod(teardownMethod)); + errors.checkNotNull(inputT, "Unable to determine input type"); + + Method processElementMethod = + findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true); + Method startBundleMethod = findAnnotatedMethod(errors, DoFn.StartBundle.class, fnClass, false); + Method finishBundleMethod = + findAnnotatedMethod(errors, DoFn.FinishBundle.class, fnClass, false); + Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false); + Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false); + + ErrorReporter processElementErrors = + errors.forMethod(DoFn.ProcessElement.class, processElementMethod); + DoFnSignature.ProcessElementMethod processElement = + analyzeProcessElementMethod( + processElementErrors, fnToken, processElementMethod, inputT, outputT); + builder.setProcessElement(processElement); + + if (startBundleMethod != null) { + ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod); + builder.setStartBundle( + analyzeBundleMethod(startBundleErrors, fnToken, startBundleMethod, inputT, outputT)); + } + + if (finishBundleMethod != null) { + ErrorReporter finishBundleErrors = + errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod); + builder.setFinishBundle( + analyzeBundleMethod(finishBundleErrors, fnToken, finishBundleMethod, inputT, outputT)); + } + + if (setupMethod != null) { + builder.setSetup( + analyzeLifecycleMethod(errors.forMethod(DoFn.Setup.class, setupMethod), setupMethod)); + } + + if (teardownMethod != null) { + builder.setTeardown( + analyzeLifecycleMethod( + errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod)); + } + + return builder.build(); } /** @@ -139,10 +162,12 @@ public class DoFnSignatures { @VisibleForTesting static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod( - TypeToken<? extends DoFn> fnClass, Method m, TypeToken<?> inputT, TypeToken<?> outputT) { - checkArgument( - void.class.equals(m.getReturnType()), "%s must have a void return type", format(m)); - checkArgument(!m.isVarArgs(), "%s must not have var args", format(m)); + ErrorReporter errors, + TypeToken<? extends DoFn> fnClass, + Method m, + TypeToken<?> inputT, + TypeToken<?> outputT) { + errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void"); TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT); @@ -151,57 +176,49 @@ public class DoFnSignatures { if (params.length > 0) { contextToken = fnClass.resolveType(params[0]); } - checkArgument( + errors.checkArgument( contextToken != null && contextToken.equals(processContextToken), - "%s must take a %s as its first argument", - format(m), + "Must take %s as the first argument", formatType(processContextToken)); - List<DoFnSignature.ProcessElementMethod.Parameter> extraParameters = new ArrayList<>(); + List<DoFnSignature.Parameter> extraParameters = new ArrayList<>(); + TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT); TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); for (int i = 1; i < params.length; ++i) { - TypeToken<?> param = fnClass.resolveType(params[i]); - Class<?> rawType = param.getRawType(); + TypeToken<?> paramT = fnClass.resolveType(params[i]); + Class<?> rawType = paramT.getRawType(); if (rawType.equals(BoundedWindow.class)) { - checkArgument( - !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW), - "Multiple BoundedWindow parameters in %s", - format(m)); - extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.BOUNDED_WINDOW); + errors.checkArgument( + !extraParameters.contains(DoFnSignature.Parameter.BOUNDED_WINDOW), + "Multiple BoundedWindow parameters"); + extraParameters.add(DoFnSignature.Parameter.BOUNDED_WINDOW); } else if (rawType.equals(DoFn.InputProvider.class)) { - checkArgument( - !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER), - "Multiple InputProvider parameters in %s", - format(m)); - checkArgument( - param.equals(expectedInputProviderT), - "Wrong type of InputProvider parameter for method %s: %s, should be %s", - format(m), - formatType(param), + errors.checkArgument( + !extraParameters.contains(DoFnSignature.Parameter.INPUT_PROVIDER), + "Multiple InputProvider parameters"); + errors.checkArgument( + paramT.equals(expectedInputProviderT), + "Wrong type of InputProvider parameter: %s, should be %s", + formatType(paramT), formatType(expectedInputProviderT)); - extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.INPUT_PROVIDER); + extraParameters.add(DoFnSignature.Parameter.INPUT_PROVIDER); } else if (rawType.equals(DoFn.OutputReceiver.class)) { - checkArgument( - !extraParameters.contains(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER), - "Multiple OutputReceiver parameters in %s", - format(m)); - checkArgument( - param.equals(expectedOutputReceiverT), - "Wrong type of OutputReceiver parameter for method %s: %s, should be %s", - format(m), - formatType(param), + errors.checkArgument( + !extraParameters.contains(DoFnSignature.Parameter.OUTPUT_RECEIVER), + "Multiple OutputReceiver parameters"); + errors.checkArgument( + paramT.equals(expectedOutputReceiverT), + "Wrong type of OutputReceiver parameter: %s, should be %s", + formatType(paramT), formatType(expectedOutputReceiverT)); - extraParameters.add(DoFnSignature.ProcessElementMethod.Parameter.OUTPUT_RECEIVER); + extraParameters.add(DoFnSignature.Parameter.OUTPUT_RECEIVER); } else { List<String> allowedParamTypes = Arrays.asList(formatType(new TypeToken<BoundedWindow>() {})); - checkArgument( - false, - "%s is not a valid context parameter for method %s. Should be one of %s", - formatType(param), - format(m), - allowedParamTypes); + errors.throwIllegalArgument( + "%s is not a valid context parameter. Should be one of %s", + formatType(paramT), allowedParamTypes); } } @@ -210,35 +227,25 @@ public class DoFnSignatures { @VisibleForTesting static DoFnSignature.BundleMethod analyzeBundleMethod( - TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT, TypeToken<?> outputT) { - checkArgument( - void.class.equals(m.getReturnType()), "%s must have a void return type", format(m)); - checkArgument(!m.isVarArgs(), "%s must not have var args", format(m)); - + ErrorReporter errors, + TypeToken<? extends DoFn> fnToken, + Method m, + TypeToken<?> inputT, + TypeToken<?> outputT) { + errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void"); TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT); - Type[] params = m.getGenericParameterTypes(); - checkArgument( - params.length == 1, - "%s must have a single argument of type %s", - format(m), + errors.checkArgument( + params.length == 1 && fnToken.resolveType(params[0]).equals(expectedContextToken), + "Must take a single argument of type %s", formatType(expectedContextToken)); - TypeToken<?> contextToken = fnToken.resolveType(params[0]); - checkArgument( - contextToken.equals(expectedContextToken), - "Wrong type of context argument to %s: %s, must be %s", - format(m), - formatType(contextToken), - formatType(expectedContextToken)); - return DoFnSignature.BundleMethod.create(m); } - private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod(Method m) { - checkArgument( - void.class.equals(m.getReturnType()), "%s must have a void return type", format(m)); - checkArgument( - m.getGenericParameterTypes().length == 0, "%s must take zero arguments", format(m)); + private static DoFnSignature.LifecycleMethod analyzeLifecycleMethod( + ErrorReporter errors, Method m) { + errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void"); + errors.checkArgument(m.getGenericParameterTypes().length == 0, "Must take zero arguments"); return DoFnSignature.LifecycleMethod.create(m); } @@ -272,15 +279,11 @@ public class DoFnSignatures { } private static Method findAnnotatedMethod( - Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) { + ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) { Collection<Method> matches = declaredMethodsWithAnnotation(anno, fnClazz, DoFn.class); if (matches.size() == 0) { - checkArgument( - !required, - "No method annotated with @%s found in %s", - anno.getSimpleName(), - fnClazz.getName()); + errors.checkArgument(!required, "No method annotated with @%s found", anno.getSimpleName()); return null; } @@ -289,7 +292,7 @@ public class DoFnSignatures { // classes). Method first = matches.iterator().next(); for (Method other : matches) { - checkArgument( + errors.checkArgument( first.getName().equals(other.getName()) && Arrays.equals(first.getParameterTypes(), other.getParameterTypes()), "Found multiple methods annotated with @%s. [%s] and [%s]", @@ -298,22 +301,50 @@ public class DoFnSignatures { format(other)); } + ErrorReporter methodErrors = errors.forMethod(anno, first); // We need to be able to call it. We require it is public. - checkArgument( - (first.getModifiers() & Modifier.PUBLIC) != 0, "%s must be public", format(first)); - + methodErrors.checkArgument((first.getModifiers() & Modifier.PUBLIC) != 0, "Must be public"); // And make sure its not static. - checkArgument( - (first.getModifiers() & Modifier.STATIC) == 0, "%s must not be static", format(first)); + methodErrors.checkArgument((first.getModifiers() & Modifier.STATIC) == 0, "Must not be static"); return first; } - private static String format(Method m) { - return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m); + private static String format(Method method) { + return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method); } private static String formatType(TypeToken<?> t) { return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType()); } + + static class ErrorReporter { + private final String label; + + ErrorReporter(@Nullable ErrorReporter root, String label) { + this.label = (root == null) ? label : String.format("%s, %s", root.label, label); + } + + ErrorReporter forMethod(Class<? extends Annotation> annotation, Method method) { + return new ErrorReporter( + this, + String.format("@%s %s", annotation, (method == null) ? "(absent)" : format(method))); + } + + void throwIllegalArgument(String message, Object... args) { + throw new IllegalArgumentException(label + ": " + String.format(message, args)); + } + + public void checkArgument(boolean condition, String message, Object... args) { + if (!condition) { + throwIllegalArgument(message, args); + } + } + + public void checkNotNull(Object value, String message, Object... args) { + if (value == null) { + throwIllegalArgument(message, args); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java index 2034eba..2d92162 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java @@ -33,8 +33,6 @@ import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.lang.reflect.WildcardType; import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashSet; import java.util.Queue; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -153,26 +151,6 @@ public class ReflectHelpers { }; /** - * Returns all interfaces of the given clazz. - * @param clazz - * @return - */ - public static FluentIterable<Class<?>> getClosureOfInterfaces(Class<?> clazz) { - checkNotNull(clazz); - Queue<Class<?>> interfacesToProcess = Queues.newArrayDeque(); - Collections.addAll(interfacesToProcess, clazz.getInterfaces()); - - LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>(); - while (!interfacesToProcess.isEmpty()) { - Class<?> current = interfacesToProcess.remove(); - if (interfaces.add(current)) { - Collections.addAll(interfacesToProcess, current.getInterfaces()); - } - } - return FluentIterable.from(interfaces); - } - - /** * Returns all the methods visible from the provided interfaces. * * @param interfaces The interfaces to use when searching for all their methods. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 9317ea2..e59cce8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -17,11 +17,12 @@ */ package org.apache.beam.sdk.transforms.reflect; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; import org.junit.Before; @@ -36,20 +37,6 @@ import org.mockito.MockitoAnnotations; /** Tests for {@link DoFnInvokers}. */ @RunWith(JUnit4.class) public class DoFnInvokersTest { - /** A convenience struct holding flags that indicate whether a particular method was invoked. */ - public static class Invocations { - public boolean wasProcessElementInvoked = false; - public boolean wasStartBundleInvoked = false; - public boolean wasFinishBundleInvoked = false; - public boolean wasSetupInvoked = false; - public boolean wasTeardownInvoked = false; - private final String name; - - public Invocations(String name) { - this.name = name; - } - } - @Rule public ExpectedException thrown = ExpectedException.none(); @Mock private DoFn.ProcessContext mockContext; @@ -81,102 +68,10 @@ public class DoFnInvokersTest { }; } - private void checkInvokeProcessElementWorks(DoFn<String, String> fn, Invocations... invocations) - throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse( - "Should not yet have called processElement on " + invocation.name, - invocation.wasProcessElementInvoked); - } + private void invokeProcessElement(DoFn<String, String> fn) { DoFnInvokers.INSTANCE .newByteBuddyInvoker(fn) .invokeProcessElement(mockContext, extraContextFactory); - for (Invocations invocation : invocations) { - assertTrue( - "Should have called processElement on " + invocation.name, - invocation.wasProcessElementInvoked); - } - } - - private void checkInvokeStartBundleWorks(DoFn<String, String> fn, Invocations... invocations) - throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse( - "Should not yet have called startBundle on " + invocation.name, - invocation.wasStartBundleInvoked); - } - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(mockContext); - for (Invocations invocation : invocations) { - assertTrue( - "Should have called startBundle on " + invocation.name, invocation.wasStartBundleInvoked); - } - } - - private void checkInvokeFinishBundleWorks(DoFn<String, String> fn, Invocations... invocations) - throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse( - "Should not yet have called finishBundle on " + invocation.name, - invocation.wasFinishBundleInvoked); - } - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(mockContext); - for (Invocations invocation : invocations) { - assertTrue( - "Should have called finishBundle on " + invocation.name, - invocation.wasFinishBundleInvoked); - } - } - - private void checkInvokeSetupWorks(DoFn<String, String> fn, Invocations... invocations) - throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse( - "Should not yet have called setup on " + invocation.name, invocation.wasSetupInvoked); - } - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeSetup(); - for (Invocations invocation : invocations) { - assertTrue("Should have called setup on " + invocation.name, invocation.wasSetupInvoked); - } - } - - private void checkInvokeTeardownWorks(DoFn<String, String> fn, Invocations... invocations) - throws Exception { - assertTrue("Need at least one invocation to check", invocations.length >= 1); - for (Invocations invocation : invocations) { - assertFalse( - "Should not yet have called teardown on " + invocation.name, - invocation.wasTeardownInvoked); - } - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeTeardown(); - for (Invocations invocation : invocations) { - assertTrue( - "Should have called teardown on " + invocation.name, invocation.wasTeardownInvoked); - } - } - - @Test - public void testDoFnWithNoExtraContext() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFn<String, String> fn = - new DoFn<String, String>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - } - }; - - assertFalse( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - - checkInvokeProcessElementWorks(fn, invocations); } @Test @@ -190,6 +85,21 @@ public class DoFnInvokersTest { DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass()); } + // --------------------------------------------------------------------------------------- + // Tests for general invocations of DoFn methods. + // --------------------------------------------------------------------------------------- + + @Test + public void testDoFnWithNoExtraContext() throws Exception { + class MockFn extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception {} + } + MockFn fn = mock(MockFn.class); + invokeProcessElement(fn); + verify(fn).processElement(mockContext); + } + interface InterfaceWithProcessElement { @DoFn.ProcessElement void processElement(DoFn<String, String>.ProcessContext c); @@ -199,302 +109,221 @@ public class DoFnInvokersTest { private class IdentityUsingInterfaceWithProcessElement extends DoFn<String, String> implements LayersOfInterfaces { - - private Invocations invocations = new Invocations("Named Class"); - @Override - public void processElement(DoFn<String, String>.ProcessContext c) { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - } + public void processElement(DoFn<String, String>.ProcessContext c) {} } @Test public void testDoFnWithProcessElementInterface() throws Exception { - IdentityUsingInterfaceWithProcessElement fn = new IdentityUsingInterfaceWithProcessElement(); - assertFalse( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - checkInvokeProcessElementWorks(fn, fn.invocations); + IdentityUsingInterfaceWithProcessElement fn = + mock(IdentityUsingInterfaceWithProcessElement.class); + invokeProcessElement(fn); + verify(fn).processElement(mockContext); } private class IdentityParent extends DoFn<String, String> { - protected Invocations parentInvocations = new Invocations("IdentityParent"); - @ProcessElement - public void process(ProcessContext c) { - parentInvocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - } + public void process(ProcessContext c) {} } private class IdentityChildWithoutOverride extends IdentityParent {} private class IdentityChildWithOverride extends IdentityParent { - protected Invocations childInvocations = new Invocations("IdentityChildWithOverride"); - @Override public void process(DoFn<String, String>.ProcessContext c) { super.process(c); - childInvocations.wasProcessElementInvoked = true; } } @Test public void testDoFnWithMethodInSuperclass() throws Exception { - IdentityChildWithoutOverride fn = new IdentityChildWithoutOverride(); - assertFalse( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - checkInvokeProcessElementWorks(fn, fn.parentInvocations); + IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class); + invokeProcessElement(fn); + verify(fn).process(mockContext); } @Test public void testDoFnWithMethodInSubclass() throws Exception { - IdentityChildWithOverride fn = new IdentityChildWithOverride(); - assertFalse( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - checkInvokeProcessElementWorks(fn, fn.parentInvocations, fn.childInvocations); + IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class); + invokeProcessElement(fn); + verify(fn).process(mockContext); } @Test public void testDoFnWithWindow() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFn<String, String> fn = - new DoFn<String, String>() { - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow w) throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - assertSame(w, mockWindow); - } - }; - - assertTrue( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - - checkInvokeProcessElementWorks(fn, invocations); + class MockFn extends DoFn<String, String> { + @DoFn.ProcessElement + public void processElement(ProcessContext c, BoundedWindow w) throws Exception {} + } + MockFn fn = mock(MockFn.class); + invokeProcessElement(fn); + verify(fn).processElement(mockContext, mockWindow); } @Test public void testDoFnWithOutputReceiver() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFn<String, String> fn = - new DoFn<String, String>() { - @ProcessElement - public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - assertSame(o, mockOutputReceiver); - } - }; - - assertFalse( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - - checkInvokeProcessElementWorks(fn, invocations); + class MockFn extends DoFn<String, String> { + @DoFn.ProcessElement + public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {} + } + MockFn fn = mock(MockFn.class); + invokeProcessElement(fn); + verify(fn).processElement(mockContext, mockOutputReceiver); } @Test public void testDoFnWithInputProvider() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFn<String, String> fn = - new DoFn<String, String>() { - @ProcessElement - public void processElement(ProcessContext c, InputProvider<String> i) throws Exception { - invocations.wasProcessElementInvoked = true; - assertSame(c, mockContext); - assertSame(i, mockInputProvider); - } - }; - - assertFalse( - DoFnSignatures.INSTANCE - .getOrParseSignature(fn.getClass()) - .processElement() - .usesSingleWindow()); - - checkInvokeProcessElementWorks(fn, invocations); - } - - @Test - public void testDoFnWithStartBundle() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFn<String, String> fn = - new DoFn<String, String>() { - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) {} - - @StartBundle - public void startBundle(Context c) { - invocations.wasStartBundleInvoked = true; - assertSame(c, mockContext); - } - - @FinishBundle - public void finishBundle(Context c) { - invocations.wasFinishBundleInvoked = true; - assertSame(c, mockContext); - } - }; - - checkInvokeStartBundleWorks(fn, invocations); - checkInvokeFinishBundleWorks(fn, invocations); + class MockFn extends DoFn<String, String> { + @DoFn.ProcessElement + public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {} + } + MockFn fn = mock(MockFn.class); + invokeProcessElement(fn); + verify(fn).processElement(mockContext, mockInputProvider); } @Test - public void testDoFnWithSetupTeardown() throws Exception { - final Invocations invocations = new Invocations("AnonymousClass"); - DoFn<String, String> fn = - new DoFn<String, String>() { - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) {} - - @StartBundle - public void startBundle(Context c) { - invocations.wasStartBundleInvoked = true; - assertSame(c, mockContext); - } + public void testDoFnWithStartBundleSetupTeardown() throws Exception { + class MockFn extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c) {} - @FinishBundle - public void finishBundle(Context c) { - invocations.wasFinishBundleInvoked = true; - assertSame(c, mockContext); - } + @StartBundle + public void startBundle(Context c) {} - @Setup - public void before() { - invocations.wasSetupInvoked = true; - } + @FinishBundle + public void finishBundle(Context c) {} - @Teardown - public void after() { - invocations.wasTeardownInvoked = true; - } - }; + @Setup + public void before() {} - checkInvokeSetupWorks(fn, invocations); - checkInvokeTeardownWorks(fn, invocations); - } + @Teardown + public void after() {} + } + MockFn fn = mock(MockFn.class); + DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + invoker.invokeSetup(); + invoker.invokeStartBundle(mockContext); + invoker.invokeFinishBundle(mockContext); + invoker.invokeTeardown(); + verify(fn).before(); + verify(fn).startBundle(mockContext); + verify(fn).finishBundle(mockContext); + verify(fn).after(); + } + + // --------------------------------------------------------------------------------------- + // Tests for ability to invoke private, inner and anonymous classes. + // --------------------------------------------------------------------------------------- private static class PrivateDoFnClass extends DoFn<String, String> { - final Invocations invocations = new Invocations(getClass().getName()); - @ProcessElement - public void processThis(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } + public void processThis(ProcessContext c) {} } @Test public void testLocalPrivateDoFnClass() throws Exception { - PrivateDoFnClass fn = new PrivateDoFnClass(); - checkInvokeProcessElementWorks(fn, fn.invocations); + PrivateDoFnClass fn = mock(PrivateDoFnClass.class); + invokeProcessElement(fn); + verify(fn).processThis(mockContext); } @Test public void testStaticPackagePrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("StaticPackagePrivateDoFn"); - checkInvokeProcessElementWorks( - DoFnInvokersTestHelper.newStaticPackagePrivateDoFn(invocations), invocations); + DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass()); + invokeProcessElement(fn); + DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext); } @Test public void testInnerPackagePrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("InnerPackagePrivateDoFn"); - checkInvokeProcessElementWorks( - new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn(invocations), invocations); + DoFn<String, String> fn = + mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass()); + invokeProcessElement(fn); + DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext); } @Test public void testStaticPrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("StaticPrivateDoFn"); - checkInvokeProcessElementWorks( - DoFnInvokersTestHelper.newStaticPrivateDoFn(invocations), invocations); + DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass()); + invokeProcessElement(fn); + DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext); } @Test public void testInnerPrivateDoFnClass() throws Exception { - Invocations invocations = new Invocations("StaticInnerDoFn"); - checkInvokeProcessElementWorks( - new DoFnInvokersTestHelper().newInnerPrivateDoFn(invocations), invocations); + DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass()); + invokeProcessElement(fn); + DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext); } @Test - public void testAnonymousInnerDoFnInOtherPackage() throws Exception { - Invocations invocations = new Invocations("AnonymousInnerDoFnInOtherPackage"); - checkInvokeProcessElementWorks( - new DoFnInvokersTestHelper().newInnerAnonymousDoFn(invocations), invocations); + public void testAnonymousInnerDoFn() throws Exception { + DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass()); + invokeProcessElement(fn); + DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext); } @Test public void testStaticAnonymousDoFnInOtherPackage() throws Exception { - Invocations invocations = new Invocations("AnonymousStaticDoFnInOtherPackage"); - checkInvokeProcessElementWorks( - DoFnInvokersTestHelper.newStaticAnonymousDoFn(invocations), invocations); + // Can't use mockito for this one - the anonymous class is final and can't be mocked. + DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn(); + invokeProcessElement(fn); + DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext); } + // --------------------------------------------------------------------------------------- + // Tests for wrapping exceptions. + // --------------------------------------------------------------------------------------- + @Test public void testProcessElementException() throws Exception { - DoFn<Integer, Integer> fn = - new DoFn<Integer, Integer>() { - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) { - throw new IllegalArgumentException("bogus"); - } - }; - + DoFnInvoker<Integer, Integer> invoker = + DoFnInvokers.INSTANCE.newByteBuddyInvoker( + new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) { + throw new IllegalArgumentException("bogus"); + } + }); thrown.expect(UserCodeException.class); thrown.expectMessage("bogus"); - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeProcessElement(null, null); + invoker.invokeProcessElement(null, null); } @Test public void testStartBundleException() throws Exception { - DoFn<Integer, Integer> fn = - new DoFn<Integer, Integer>() { - @StartBundle - public void startBundle(@SuppressWarnings("unused") Context c) { - throw new IllegalArgumentException("bogus"); - } - - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) {} - }; - + DoFnInvoker<Integer, Integer> invoker = + DoFnInvokers.INSTANCE.newByteBuddyInvoker( + new DoFn<Integer, Integer>() { + @StartBundle + public void startBundle(@SuppressWarnings("unused") Context c) { + throw new IllegalArgumentException("bogus"); + } + + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) {} + }); thrown.expect(UserCodeException.class); thrown.expectMessage("bogus"); - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeStartBundle(null); + invoker.invokeStartBundle(null); } @Test public void testFinishBundleException() throws Exception { - DoFn<Integer, Integer> fn = - new DoFn<Integer, Integer>() { - @FinishBundle - public void finishBundle(@SuppressWarnings("unused") Context c) { - throw new IllegalArgumentException("bogus"); - } - - @ProcessElement - public void processElement(@SuppressWarnings("unused") ProcessContext c) {} - }; - + DoFnInvoker<Integer, Integer> invoker = + DoFnInvokers.INSTANCE.newByteBuddyInvoker( + new DoFn<Integer, Integer>() { + @FinishBundle + public void finishBundle(@SuppressWarnings("unused") Context c) { + throw new IllegalArgumentException("bogus"); + } + + @ProcessElement + public void processElement(@SuppressWarnings("unused") ProcessContext c) {} + }); thrown.expect(UserCodeException.class); thrown.expectMessage("bogus"); - DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn).invokeFinishBundle(null); + invoker.invokeFinishBundle(null); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java deleted file mode 100644 index 7bfdddc..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTestHelper.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.reflect; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokersTest.Invocations; - -/** - * Test helper for {@link DoFnInvokersTest}, which needs to test package-private access - * to DoFns in other packages. - */ -public class DoFnInvokersTestHelper { - - private static class StaticPrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public StaticPrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - private class InnerPrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public InnerPrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - static class StaticPackagePrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public StaticPackagePrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - class InnerPackagePrivateDoFn extends DoFn<String, String> { - final Invocations invocations; - - public InnerPackagePrivateDoFn(Invocations invocations) { - this.invocations = invocations; - } - - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - } - - public static DoFn<String, String> newStaticPackagePrivateDoFn( - Invocations invocations) { - return new StaticPackagePrivateDoFn(invocations); - } - - public DoFn<String, String> newInnerPackagePrivateDoFn(Invocations invocations) { - return new InnerPackagePrivateDoFn(invocations); - } - - public static DoFn<String, String> newStaticPrivateDoFn(Invocations invocations) { - return new StaticPrivateDoFn(invocations); - } - - public DoFn<String, String> newInnerPrivateDoFn(Invocations invocations) { - return new InnerPrivateDoFn(invocations); - } - - public DoFn<String, String> newInnerAnonymousDoFn(final Invocations invocations) { - return new DoFn<String, String>() { - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - }; - } - - public static DoFn<String, String> newStaticAnonymousDoFn( - final Invocations invocations) { - return new DoFn<String, String>() { - @ProcessElement - public void process(ProcessContext c) { - invocations.wasProcessElementInvoked = true; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75c8bb8d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java new file mode 100644 index 0000000..c269dbd --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod; + +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DoFnSignatures} verification of the {@link DoFn.ProcessElement} method. */ +@SuppressWarnings("unused") +@RunWith(JUnit4.class) +public class DoFnSignaturesProcessElementTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testMissingProcessContext() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Must take ProcessContext<> as the first argument"); + + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method() {} + }); + } + + @Test + public void testBadProcessContextType() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Must take ProcessContext<> as the first argument"); + + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method(String s) {} + }); + } + + @Test + public void testBadExtraProcessContextType() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Integer is not a valid context parameter. " + + "Should be one of [BoundedWindow]"); + + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method(DoFn<Integer, String>.ProcessContext c, Integer n) {} + }); + } + + @Test + public void testBadReturnType() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Must return void"); + + analyzeProcessElementMethod( + new AnonymousMethod() { + private int method(DoFn<Integer, String>.ProcessContext context) { + return 0; + } + }); + } + + @Test + public void testGoodConcreteTypes() throws Exception { + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<String> output) {} + }); + } + + @Test + public void testBadGenericsTwoArgs() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Wrong type of OutputReceiver parameter: " + + "OutputReceiver<Integer>, should be OutputReceiver<String>"); + + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<Integer> output) {} + }); + } + + @Test + public void testBadGenericWildCards() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Wrong type of OutputReceiver parameter: " + + "OutputReceiver<? super Integer>, should be OutputReceiver<String>"); + + analyzeProcessElementMethod( + new AnonymousMethod() { + private void method( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<? super Integer> output) {} + }); + } + + static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> { + @ProcessElement + @SuppressWarnings("unused") + public void badTypeVariables( + DoFn<InputT, OutputT>.ProcessContext c, + DoFn.InputProvider<InputT> input, + DoFn.OutputReceiver<InputT> output) {} + } + + @Test + public void testBadTypeVariables() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Wrong type of OutputReceiver parameter: " + + "OutputReceiver<InputT>, should be OutputReceiver<OutputT>"); + + DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class); + } + + @Test + public void testNoProcessElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No method annotated with @ProcessElement found"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() {}.getClass()); + } + + @Test + public void testMultipleProcessElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Found multiple methods annotated with @ProcessElement"); + thrown.expectMessage("foo()"); + thrown.expectMessage("bar()"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + public void foo() {} + + @ProcessElement + public void bar() {} + }.getClass()); + } + + @Test + public void testPrivateProcessElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("process()"); + thrown.expectMessage("Must be public"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + private void process() {} + }.getClass()); + } + + private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> { + @ProcessElement + @SuppressWarnings("unused") + public void goodTypeVariables( + DoFn<InputT, OutputT>.ProcessContext c, + DoFn.InputProvider<InputT> input, + DoFn.OutputReceiver<OutputT> output) {} + } + + @Test + public void testGoodTypeVariables() throws Exception { + DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class); + } + + private static class IdentityFn<T> extends DoFn<T, T> { + @ProcessElement + @SuppressWarnings("unused") + public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) { + c.output(c.element()); + } + } + + private static class IdentityListFn<T> extends IdentityFn<List<T>> {} + + @Test + public void testIdentityFnApplied() throws Exception { + DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() {}.getClass()); + } +}