Remove OnTimerInvokers.INSTANCE; deprecate DoFnInvokers.INSTANCE
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14a71e43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14a71e43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14a71e43 Branch: refs/heads/master Commit: 14a71e435acd9435ce02afe774df3adebd7355f0 Parents: efad9d4 Author: Kenneth Knowles <[email protected]> Authored: Mon Nov 7 23:03:46 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Thu Nov 10 14:18:07 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 2 +- .../beam/runners/core/SplittableParDo.java | 10 +- .../runners/direct/DoFnLifecycleManager.java | 4 +- .../beam/sdk/transforms/DoFnAdapters.java | 4 +- .../reflect/ByteBuddyDoFnInvokerFactory.java | 828 +++++++++++++++++++ .../reflect/ByteBuddyOnTimerInvokerFactory.java | 279 +++++++ .../transforms/reflect/DoFnInvokerFactory.java | 27 + .../sdk/transforms/reflect/DoFnInvokers.java | 711 +--------------- .../reflect/OnTimerInvokerFactory.java | 36 + .../sdk/transforms/reflect/OnTimerInvokers.java | 243 +----- .../transforms/reflect/DoFnInvokersTest.java | 24 +- .../transforms/reflect/OnTimerInvokersTest.java | 2 +- .../transforms/DoFnInvokersBenchmark.java | 2 +- 13 files changed, 1227 insertions(+), 945 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 2c5a850..3b784d1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java 
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -97,7 +97,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out this.fn = fn; this.observesWindow = DoFnSignatures.getSignature(fn.getClass()).processElement().observesWindow(); - this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + this.invoker = DoFnInvokers.invokerFor(fn); this.outputManager = outputManager; this.mainOutputTag = mainOutputTag; this.context = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 7143ffe..e05ba56 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -102,8 +102,8 @@ public class SplittableParDo< public PCollection<OutputT> apply(PCollection<InputT> input) { PCollection.IsBounded isFnBounded = signature.isBoundedPerElement(); Coder<RestrictionT> restrictionCoder = - DoFnInvokers.INSTANCE - .newByteBuddyInvoker(fn) + DoFnInvokers + .invokerFor(fn) .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry()); Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder = ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder); @@ -166,7 +166,7 @@ public class SplittableParDo< @Setup public void setup() { - invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + invoker = DoFnInvokers.invokerFor(fn); } @ProcessElement @@ -246,7 +246,7 @@ public class SplittableParDo< @Override public void setup() throws Exception { - invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + invoker = 
DoFnInvokers.invokerFor(fn); } @Override @@ -460,7 +460,7 @@ public class SplittableParDo< @Setup public void setup() { - invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn); + invoker = DoFnInvokers.invokerFor(splittableFn); } @ProcessElement http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java index 472b28b..67d957c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java @@ -99,7 +99,7 @@ class DoFnLifecycleManager { public DoFn<?, ?> load(Thread key) throws Exception { DoFn<?, ?> fn = (DoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original, "DoFn Copy in thread " + key.getName()); - DoFnInvokers.INSTANCE.invokerFor(fn).invokeSetup(); + DoFnInvokers.invokerFor(fn).invokeSetup(); return fn; } } @@ -108,7 +108,7 @@ class DoFnLifecycleManager { @Override public void onRemoval(RemovalNotification<Thread, DoFn<?, ?>> notification) { try { - DoFnInvokers.INSTANCE.newByteBuddyInvoker(notification.getValue()).invokeTeardown(); + DoFnInvokers.invokerFor(notification.getValue()).invokeTeardown(); } catch (Exception e) { thrownOnTeardown.put(notification.getKey(), e); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index a5e1c21..086b985 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -201,7 +201,7 @@ public class DoFnAdapters { SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) { super(fn.aggregators); this.fn = fn; - this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + this.invoker = DoFnInvokers.invokerFor(fn); } @Override @@ -254,7 +254,7 @@ public class DoFnAdapters { private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + this.invoker = DoFnInvokers.invokerFor(fn); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java new file mode 100644 index 0000000..825aa09 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -0,0 +1,828 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.NamingStrategy; +import net.bytebuddy.description.field.FieldDescription; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.modifier.FieldManifestation; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.description.type.TypeList; +import net.bytebuddy.dynamic.DynamicType; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.dynamic.scaffold.InstrumentedType; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.ExceptionMethod; +import net.bytebuddy.implementation.FixedValue; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.Implementation.Context; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder; +import net.bytebuddy.implementation.bytecode.ByteCodeAppender; +import net.bytebuddy.implementation.bytecode.StackManipulation; +import net.bytebuddy.implementation.bytecode.Throw; 
+import net.bytebuddy.implementation.bytecode.assign.Assigner; +import net.bytebuddy.implementation.bytecode.assign.TypeCasting; +import net.bytebuddy.implementation.bytecode.constant.TextConstant; +import net.bytebuddy.implementation.bytecode.member.FieldAccess; +import net.bytebuddy.implementation.bytecode.member.MethodInvocation; +import net.bytebuddy.implementation.bytecode.member.MethodReturn; +import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; +import net.bytebuddy.jar.asm.Label; +import net.bytebuddy.jar.asm.MethodVisitor; +import net.bytebuddy.jar.asm.Opcodes; +import net.bytebuddy.jar.asm.Type; +import net.bytebuddy.matcher.ElementMatchers; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFnAdapters; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** Dynamically generates a {@link 
DoFnInvoker} instances for invoking a {@link DoFn}. */ +public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { + + /** + * Returns a {@link ByteBuddyDoFnInvokerFactory} shared with all other invocations, so that its + * cache of generated classes is global. + */ + public static ByteBuddyDoFnInvokerFactory only() { + return INSTANCE; + } + + /** + * Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly + * invokes its methods with arguments extracted from the {@link ExtraContextFactory}. + */ + @Override + public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) { + return newByteBuddyInvoker(fn); + } + + private static final ByteBuddyDoFnInvokerFactory INSTANCE = new ByteBuddyDoFnInvokerFactory(); + + private static final String FN_DELEGATE_FIELD_NAME = "delegate"; + + /** + * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class. + * Needed because generating an invoker class is expensive, and to avoid generating an excessive + * number of classes consuming PermGen memory. 
+ */ + private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache = + new LinkedHashMap<>(); + + private ByteBuddyDoFnInvokerFactory() {} + + static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> { + + private final OldDoFn<InputT, OutputT> fn; + + public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) { + this.fn = fn; + } + + @Override + public DoFn.ProcessContinuation invokeProcessElement( + DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) { + OldDoFn<InputT, OutputT>.ProcessContext oldCtx = + DoFnAdapters.adaptProcessContext(fn, c, extra); + try { + fn.processElement(oldCtx); + return DoFn.ProcessContinuation.stop(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeStartBundle(DoFn.Context c) { + OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c); + try { + fn.startBundle(oldCtx); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeFinishBundle(DoFn.Context c) { + OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c); + try { + fn.finishBundle(oldCtx); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeSetup() { + try { + fn.setup(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeTeardown() { + try { + fn.teardown(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder( + CoderRegistry coderRegistry) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public <RestrictionT> void invokeSplitRestriction( 
+ InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + + @Override + public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + TrackerT invokeNewTracker(RestrictionT restriction) { + throw new UnsupportedOperationException("OldDoFn is not splittable"); + } + } + + /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( + DoFn<InputT, OutputT> fn) { + return newByteBuddyInvoker( + DoFnSignatures.getSignature((Class) fn.getClass()), fn); + } + + /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */ + public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( + DoFnSignature signature, DoFn<InputT, OutputT> fn) { + checkArgument( + signature.fnClass().equals(fn.getClass()), + "Signature is for class %s, but fn is of class %s", + signature.fnClass(), + fn.getClass()); + try { + @SuppressWarnings("unchecked") + DoFnInvoker<InputT, OutputT> invoker = + (DoFnInvoker<InputT, OutputT>) + getByteBuddyInvokerConstructor(signature).newInstance(fn); + return invoker; + } catch (InstantiationException + | IllegalAccessException + | IllegalArgumentException + | InvocationTargetException + | SecurityException e) { + throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e); + } + } + + /** + * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class. + * + * <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given + * {@link DoFn} class. + */ + private synchronized Constructor<?> getByteBuddyInvokerConstructor( + DoFnSignature signature) { + Class<? 
extends DoFn<?, ?>> fnClass = signature.fnClass(); + Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass); + if (constructor == null) { + Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature); + try { + constructor = invokerClass.getConstructor(fnClass); + } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { + throw new RuntimeException(e); + } + byteBuddyInvokerConstructorCache.put(fnClass, constructor); + } + return constructor; + } + + /** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */ + public static class DefaultSplitRestriction { + /** Doesn't split the restriction. */ + @SuppressWarnings("unused") + public static <InputT, RestrictionT> void invokeSplitRestriction( + InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) { + receiver.output(restriction); + } + } + + /** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */ + public static class DefaultRestrictionCoder { + private final TypeDescriptor<?> restrictionType; + + DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) { + this.restrictionType = restrictionType; + } + + /** Returns the coder that the given registry provides for the restriction type. */ + @SuppressWarnings({"unused", "unchecked"}) + public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry) + throws CannotProvideCoderException { + return (Coder) registry.getCoder(restrictionType); + } + } + + /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */ + private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) { + Class<? 
extends DoFn<?, ?>> fnClass = signature.fnClass(); + + final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); + + DynamicType.Builder<?> builder = + new ByteBuddy() + // Create subclasses inside the target class, to have access to + // private and package-private bits + .with( + new NamingStrategy.SuffixingRandom("auxiliary") { + @Override + public String subclass(TypeDescription.Generic superClass) { + return super.name(clazzDescription); + } + }) + // Create a subclass of DoFnInvoker + .subclass(DoFnInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) + .defineField( + FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL) + .defineConstructor(Visibility.PUBLIC) + .withParameter(fnClass) + .intercept(new InvokerConstructor()) + .method(ElementMatchers.named("invokeProcessElement")) + .intercept(new ProcessElementDelegation(signature.processElement())) + .method(ElementMatchers.named("invokeStartBundle")) + .intercept(delegateOrNoop(signature.startBundle())) + .method(ElementMatchers.named("invokeFinishBundle")) + .intercept(delegateOrNoop(signature.finishBundle())) + .method(ElementMatchers.named("invokeSetup")) + .intercept(delegateOrNoop(signature.setup())) + .method(ElementMatchers.named("invokeTeardown")) + .intercept(delegateOrNoop(signature.teardown())) + .method(ElementMatchers.named("invokeGetInitialRestriction")) + .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction())) + .method(ElementMatchers.named("invokeSplitRestriction")) + .intercept(splitRestrictionDelegation(signature)) + .method(ElementMatchers.named("invokeGetRestrictionCoder")) + .intercept(getRestrictionCoderDelegation(signature)) + .method(ElementMatchers.named("invokeNewTracker")) + .intercept(delegateWithDowncastOrThrow(signature.newTracker())); + + DynamicType.Unloaded<?> unloaded = builder.make(); + + @SuppressWarnings("unchecked") + Class<? extends DoFnInvoker<?, ?>> res = + (Class<? 
extends DoFnInvoker<?, ?>>) + unloaded + .load( + ByteBuddyDoFnInvokerFactory.class.getClassLoader(), + ClassLoadingStrategy.Default.INJECTION) + .getLoaded(); + return res; + } + + private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) { + if (signature.processElement().isSplittable()) { + if (signature.getRestrictionCoder() == null) { + return MethodDelegation.to( + new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT())); + } else { + return new DowncastingParametersMethodDelegation( + signature.getRestrictionCoder().targetMethod()); + } + } else { + return ExceptionMethod.throwing(UnsupportedOperationException.class); + } + } + + private static Implementation splitRestrictionDelegation(DoFnSignature signature) { + if (signature.splitRestriction() == null) { + return MethodDelegation.to(DefaultSplitRestriction.class); + } else { + return new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod()); + } + } + + /** Delegates to the given method if available, or does nothing. */ + private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) { + return (method == null) + ? FixedValue.originType() + : new DoFnMethodDelegation(method.targetMethod()); + } + + /** Delegates to the given method if available, or throws UnsupportedOperationException. */ + private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) { + return (method == null) + ? ExceptionMethod.throwing(UnsupportedOperationException.class) + : new DowncastingParametersMethodDelegation(method.targetMethod()); + } + + /** + * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a + * "target method" of the wrapped {@link DoFn}. + */ + static class DoFnMethodDelegation implements Implementation { + /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. 
*/ + protected final MethodDescription targetMethod; + /** Whether the target method returns non-void. */ + private final boolean targetHasReturn; + + private FieldDescription delegateField; + + public DoFnMethodDelegation(Method targetMethod) { + this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod); + targetHasReturn = !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure()); + } + + @Override + public InstrumentedType prepare(InstrumentedType instrumentedType) { + // Remember the field description of the instrumented type. + delegateField = + instrumentedType + .getDeclaredFields() + .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) + .getOnly(); + // Delegating the method call doesn't require any changes to the instrumented type. + return instrumentedType; + } + + @Override + public ByteCodeAppender appender(final Target implementationTarget) { + return new ByteCodeAppender() { + /** + * @param instrumentedMethod The {@link DoFnInvoker} method for which we're generating code. + */ + @Override + public Size apply( + MethodVisitor methodVisitor, + Context implementationContext, + MethodDescription instrumentedMethod) { + // Figure out how many locals we'll need. This corresponds to "this", the parameters + // of the instrumented method, and an argument to hold the return value if the target + // method has a return value. + int numLocals = 1 + instrumentedMethod.getParameters().size() + (targetHasReturn ? 1 : 0); + + Integer returnVarIndex = null; + if (targetHasReturn) { + // Local comes after formal parameters, so figure out where that is. 
+ returnVarIndex = 1; // "this" + for (Type param : Type.getArgumentTypes(instrumentedMethod.getDescriptor())) { + returnVarIndex += param.getSize(); + } + } + + StackManipulation manipulation = + new StackManipulation.Compound( + // Push "this" (DoFnInvoker on top of the stack) + MethodVariableAccess.REFERENCE.loadOffset(0), + // Access this.delegate (DoFn on top of the stack) + FieldAccess.forField(delegateField).getter(), + // Run the beforeDelegation manipulations. + // The arguments necessary to invoke the target are on top of the stack. + beforeDelegation(instrumentedMethod), + // Perform the method delegation. + // This will consume the arguments on top of the stack + // Either the stack is now empty (because the targetMethod returns void) or the + // stack contains the return value. + new UserCodeMethodInvocation(returnVarIndex, targetMethod, instrumentedMethod), + // Run the afterDelegation manipulations. + // Either the stack is now empty (because the instrumentedMethod returns void) + // or the stack contains the return value. + afterDelegation(instrumentedMethod)); + + StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext); + return new Size(size.getMaximalSize(), numLocals); + } + }; + } + + /** + * Return the code to prepare the operand stack for the method delegation. + * + * <p>Before this method is called, the delegate will be the only thing on the stack. + * + * <p>After this method is called, the stack contents should contain exactly the arguments + * necessary to invoke the target method. + */ + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { + return MethodVariableAccess.allArgumentsOf(targetMethod); + } + + /** + * Return the code to execute after the method delegation. + * + * <p>Before this method is called, the stack will either be empty (if the target method returns + * void) or contain the method return value. 
+ * + * <p>After this method is called, the stack should either be empty (if the instrumented method + * returns void) or contain the value for the instrumented method to return. + */ + protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { + return TargetMethodAnnotationDrivenBinder.TerminationHandler.Returning.INSTANCE.resolve( + Assigner.DEFAULT, instrumentedMethod, targetMethod); + } + } + + /** + * Passes parameters to the delegated method by downcasting each parameter of non-primitive type + * to its expected type. + */ + private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation { + DowncastingParametersMethodDelegation(Method method) { + super(method); + } + + @Override + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { + List<StackManipulation> pushParameters = new ArrayList<>(); + TypeList.Generic paramTypes = targetMethod.getParameters().asTypeList(); + for (int i = 0; i < paramTypes.size(); i++) { + TypeDescription.Generic paramT = paramTypes.get(i); + pushParameters.add(MethodVariableAccess.of(paramT).loadOffset(i + 1)); + if (!paramT.isPrimitive()) { + pushParameters.add(TypeCasting.to(paramT)); + } + } + return new StackManipulation.Compound(pushParameters); + } + } + + /** + * This wrapper exists to convert checked exceptions to unchecked exceptions, since if this fails + * the library itself is malformed. + */ + private static MethodDescription getExtraContextFactoryMethodDescription( + String methodName, Class<?>... 
parameterTypes) { + try { + return new MethodDescription.ForLoadedMethod( + ExtraContextFactory.class.getMethod(methodName, parameterTypes)); + } catch (Exception e) { + throw new IllegalStateException( + String.format( + "Failed to locate required method %s.%s", + ExtraContextFactory.class.getSimpleName(), methodName), + e); + } + } + + private static StackManipulation simpleExtraContextParameter( + String methodName, + StackManipulation pushExtraContextFactory) { + return new StackManipulation.Compound( + pushExtraContextFactory, + MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName))); + } + + static StackManipulation getExtraContextParameter( + DoFnSignature.Parameter parameter, + final StackManipulation pushExtraContextFactory) { + + return parameter.match( + new Cases<StackManipulation>() { + + @Override + public StackManipulation dispatch(WindowParameter p) { + return new StackManipulation.Compound( + simpleExtraContextParameter("window", pushExtraContextFactory), + TypeCasting.to(new TypeDescription.ForLoadedType(p.windowT().getRawType()))); + } + + @Override + public StackManipulation dispatch(InputProviderParameter p) { + return simpleExtraContextParameter("inputProvider", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(OutputReceiverParameter p) { + return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(RestrictionTrackerParameter p) { + // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker, + // but the @ProcessElement method expects a concrete subtype of it. + // Insert a downcast. 
+ return new StackManipulation.Compound( + simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory), + TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType()))); + } + + @Override + public StackManipulation dispatch(StateParameter p) { + return new StackManipulation.Compound( + // TOP = extraContextFactory.state(<id>) + pushExtraContextFactory, + new TextConstant(p.referent().id()), + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription("state", String.class)), + TypeCasting.to( + new TypeDescription.ForLoadedType(p.referent().stateType().getRawType()))); + } + + @Override + public StackManipulation dispatch(TimerParameter p) { + return new StackManipulation.Compound( + // TOP = extraContextFactory.timer(<id>) + pushExtraContextFactory, + new TextConstant(p.referent().id()), + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription("timer", String.class)), + TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class))); + } + }); + } + + /** + * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the + * {@link ProcessElement} method. + */ + private static final class ProcessElementDelegation extends DoFnMethodDelegation { + private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD; + + static { + try { + PROCESS_CONTINUATION_STOP_METHOD = + new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop")); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to locate ProcessContinuation.stop()"); + } + } + + private final DoFnSignature.ProcessElementMethod signature; + + /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. 
*/ + private ProcessElementDelegation(DoFnSignature.ProcessElementMethod signature) { + super(signature.targetMethod()); + this.signature = signature; + } + + @Override + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { + // Parameters of the wrapper invoker method: + // DoFn.ProcessContext, ExtraContextFactory. + // Parameters of the wrapped DoFn method: + // DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] in any order + ArrayList<StackManipulation> pushParameters = new ArrayList<>(); + // Push the ProcessContext argument. + pushParameters.add(MethodVariableAccess.REFERENCE.loadOffset(1)); + // Push the extra arguments in their actual order. + StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(2); + for (DoFnSignature.Parameter param : signature.extraParameters()) { + pushParameters.add(getExtraContextParameter(param, pushExtraContextFactory)); + } + return new StackManipulation.Compound(pushParameters); + } + + @Override + protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { + if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) { + return new StackManipulation.Compound( + MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE); + } else { + return MethodReturn.returning(targetMethod.getReturnType().asErasure()); + } + } + } + + private static class UserCodeMethodInvocation implements StackManipulation { + + @Nullable private final Integer returnVarIndex; + private final MethodDescription targetMethod; + private final MethodDescription instrumentedMethod; + private final TypeDescription returnType; + + private final Label wrapStart = new Label(); + private final Label wrapEnd = new Label(); + private final Label tryBlockStart = new Label(); + private final Label tryBlockEnd = new Label(); + private final Label catchBlockStart = new Label(); + private final Label catchBlockEnd = new Label(); + + 
private final MethodDescription createUserCodeException; + + UserCodeMethodInvocation( + @Nullable Integer returnVarIndex, + MethodDescription targetMethod, + MethodDescription instrumentedMethod) { + this.returnVarIndex = returnVarIndex; + this.targetMethod = targetMethod; + this.instrumentedMethod = instrumentedMethod; + this.returnType = targetMethod.getReturnType().asErasure(); + + boolean targetMethodReturnsVoid = TypeDescription.VOID.equals(returnType); + checkArgument( + (returnVarIndex == null) == targetMethodReturnsVoid, + "returnVarIndex should be defined if and only if the target method has a return value"); + + try { + createUserCodeException = + new MethodDescription.ForLoadedMethod( + UserCodeException.class.getDeclaredMethod("wrap", Throwable.class)); + } catch (NoSuchMethodException | SecurityException e) { + throw new RuntimeException("Unable to find UserCodeException.wrap", e); + } + } + + @Override + public boolean isValid() { + return true; + } + + private Object describeType(Type type) { + switch (type.getSort()) { + case Type.OBJECT: + return type.getInternalName(); + case Type.INT: + case Type.BYTE: + case Type.BOOLEAN: + case Type.SHORT: + return Opcodes.INTEGER; + case Type.LONG: + return Opcodes.LONG; + case Type.DOUBLE: + return Opcodes.DOUBLE; + case Type.FLOAT: + return Opcodes.FLOAT; + default: + throw new IllegalArgumentException("Unhandled type as method argument: " + type); + } + } + + private void visitFrame( + MethodVisitor mv, boolean localsIncludeReturn, @Nullable String stackTop) { + boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn; + + Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor()); + Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 
1 : 0)]; + locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName(); + for (int i = 0; i < localTypes.length; i++) { + locals[i + 1] = describeType(localTypes[i]); + } + if (hasReturnLocal) { + locals[locals.length - 1] = returnType.getInternalName(); + } + + Object[] stack = stackTop == null ? new Object[] {} : new Object[] {stackTop}; + + mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack); + } + + @Override + public Size apply(MethodVisitor mv, Context context) { + Size size = new Size(0, 0); + + mv.visitLabel(wrapStart); + + String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName(); + mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName); + + // The try block attempts to perform the expected operations, then jumps to success + mv.visitLabel(tryBlockStart); + size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, context)); + + if (returnVarIndex != null) { + mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex); + size = size.aggregate(new Size(-1, 0)); // Reduces the size of the stack + } + mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd); + mv.visitLabel(tryBlockEnd); + + // The handler wraps the exception, and then throws. + mv.visitLabel(catchBlockStart); + // In catch block, should have same locals and {Throwable} on the stack. + visitFrame(mv, false, throwableName); + + // Create the user code exception and throw + size = + size.aggregate( + new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE) + .apply(mv, context)); + + mv.visitLabel(catchBlockEnd); + + // After the catch block we should have the return in scope, but nothing on the stack. + visitFrame(mv, true, null); + + // After catch block, should have same locals and will have the return on the stack. 
+ if (returnVarIndex != null) { + mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex); + size = size.aggregate(new Size(1, 0)); // Increases the size of the stack + } + mv.visitLabel(wrapEnd); + if (returnVarIndex != null) { + // Drop the return type from the locals + mv.visitLocalVariable( + "res", + returnType.getDescriptor(), + returnType.getGenericSignature(), + wrapStart, + wrapEnd, + returnVarIndex); + } + + return size; + } + } + + /** + * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code + * for a constructor that takes a single argument and assigns it to the delegate field. + */ + private static final class InvokerConstructor implements Implementation { + @Override + public InstrumentedType prepare(InstrumentedType instrumentedType) { + return instrumentedType; + } + + @Override + public ByteCodeAppender appender(final Target implementationTarget) { + return new ByteCodeAppender() { + @Override + public Size apply( + MethodVisitor methodVisitor, + Context implementationContext, + MethodDescription instrumentedMethod) { + StackManipulation.Size size = + new StackManipulation.Compound( + // Load the this reference + MethodVariableAccess.REFERENCE.loadOffset(0), + // Invoke the super constructor (default constructor of Object) + MethodInvocation.invoke( + new TypeDescription.ForLoadedType(Object.class) + .getDeclaredMethods() + .filter( + ElementMatchers.isConstructor() + .and(ElementMatchers.takesArguments(0))) + .getOnly()), + // Load the this reference + MethodVariableAccess.REFERENCE.loadOffset(0), + // Load the delegate argument + MethodVariableAccess.REFERENCE.loadOffset(1), + // Assign the delegate argument to the delegate field + FieldAccess.forField( + implementationTarget + .getInstrumentedType() + .getDeclaredFields() + .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) + .getOnly()) + .putter(), + // Return void. 
+ MethodReturn.VOID) + .apply(methodVisitor, implementationContext); + return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize()); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java new file mode 100644 index 0000000..5300a86 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.reflect; + +import com.google.common.base.CharMatcher; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.NamingStrategy; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.modifier.FieldManifestation; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.DynamicType; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.dynamic.scaffold.InstrumentedType; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.bytecode.ByteCodeAppender; +import net.bytebuddy.implementation.bytecode.StackManipulation; +import net.bytebuddy.implementation.bytecode.member.FieldAccess; +import net.bytebuddy.implementation.bytecode.member.MethodInvocation; +import net.bytebuddy.implementation.bytecode.member.MethodReturn; +import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; +import net.bytebuddy.jar.asm.MethodVisitor; +import net.bytebuddy.matcher.ElementMatchers; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimer; +import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory.DoFnMethodDelegation; + +/** + * Dynamically generates {@link OnTimerInvoker} instances for invoking a particular {@link TimerId} + * on a particular {@link DoFn}. 
+ */ +class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { + + @Override + public <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer( + DoFn<InputT, OutputT> fn, String timerId) { + + @SuppressWarnings("unchecked") + Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass(); + + try { + Constructor<?> constructor = constructorCache.get(fnClass).get(timerId); + @SuppressWarnings("unchecked") + OnTimerInvoker<InputT, OutputT> invoker = + (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn); + return invoker; + } catch (InstantiationException + | IllegalAccessException + | IllegalArgumentException + | InvocationTargetException + | SecurityException + | ExecutionException e) { + throw new RuntimeException( + String.format( + "Unable to construct @%s invoker for %s", + OnTimer.class.getSimpleName(), fn.getClass().getName()), + e); + } + } + + public static ByteBuddyOnTimerInvokerFactory only() { + return INSTANCE; + } + + private static final ByteBuddyOnTimerInvokerFactory INSTANCE = + new ByteBuddyOnTimerInvokerFactory(); + + private ByteBuddyOnTimerInvokerFactory() {} + + /** + * The field name for the delegate of the {@link DoFn} subclass that a bytebuddy invoker will call. + */ + private static final String FN_DELEGATE_FIELD_NAME = "delegate"; + + /** + * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn} + * class and then by {@link TimerId}. + * + * <p>Needed because generating an invoker class is expensive, and to avoid generating an + * excessive number of classes consuming PermGen memory in Java versions that still have PermGen. + */ + private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>> + constructorCache = + CacheBuilder.newBuilder() + .build( + new CacheLoader< + Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() { + @Override + public LoadingCache<String, Constructor<?>> load( + final Class<? 
extends DoFn<?, ?>> fnClass) throws Exception { + return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass)); + } + }); + + /** + * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the + * invokers for its {@link OnTimer @OnTimer} methods. + */ + private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> { + + private final DoFnSignature signature; + + public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) { + this.signature = DoFnSignatures.getSignature(clazz); + } + + @Override + public Constructor<?> load(String timerId) throws Exception { + Class<? extends OnTimerInvoker<?, ?>> invokerClass = + generateOnTimerInvokerClass(signature, timerId); + try { + return invokerClass.getConstructor(signature.fnClass()); + } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link + * TimerId}. + */ + private static Class<? extends OnTimerInvoker<?, ?>> generateOnTimerInvokerClass( + DoFnSignature signature, String timerId) { + Class<? 
extends DoFn<?, ?>> fnClass = signature.fnClass(); + + final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); + + final String className = + "auxiliary_OnTimer_" + CharMatcher.JAVA_LETTER_OR_DIGIT.retainFrom(timerId); + + DynamicType.Builder<?> builder = + new ByteBuddy() + // Create subclasses inside the target class, to have access to + // private and package-private bits + .with( + new NamingStrategy.SuffixingRandom(className) { + @Override + public String subclass(TypeDescription.Generic superClass) { + return super.name(clazzDescription); + } + }) + // class <invoker class> implements OnTimerInvoker { + .subclass(OnTimerInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) + + // private final <fn class> delegate; + .defineField( + FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL) + + // <invoker class>(<fn class> delegate) { this.delegate = delegate; } + .defineConstructor(Visibility.PUBLIC) + .withParameter(fnClass) + .intercept(new InvokerConstructor()) + + // public invokeOnTimer(ExtraContextFactory) { + // this.delegate.<@OnTimer method>(... pass the right args ...) + // } + .method(ElementMatchers.named("invokeOnTimer")) + .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId))); + + DynamicType.Unloaded<?> unloaded = builder.make(); + + @SuppressWarnings("unchecked") + Class<? extends OnTimerInvoker<?, ?>> res = + (Class<? extends OnTimerInvoker<?, ?>>) + unloaded + .load( + ByteBuddyOnTimerInvokerFactory.class.getClassLoader(), + ClassLoadingStrategy.Default.INJECTION) + .getLoaded(); + return res; + } + + /** + * An "invokeOnTimer" method implementation akin to @ProcessElement, but simpler because no + * splitting-related parameters need to be handled. 
+ */ + private static class InvokeOnTimerDelegation extends DoFnMethodDelegation { + + private final DoFnSignature.OnTimerMethod signature; + + public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) { + super(signature.targetMethod()); + this.signature = signature; + } + + @Override + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { + // Parameters of the wrapper invoker method: + // ExtraContextFactory. + // Parameters of the wrapped DoFn method: + // a dynamic set of allowed "extra" parameters in any order subject to + // validation prior to getting the DoFnSignature + ArrayList<StackManipulation> parameters = new ArrayList<>(); + // Push the extra arguments in their actual order. + StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(1); + for (DoFnSignature.Parameter param : signature.extraParameters()) { + parameters.add( + ByteBuddyDoFnInvokerFactory.getExtraContextParameter(param, pushExtraContextFactory)); + } + return new StackManipulation.Compound(parameters); + } + } + + /** + * A constructor {@link Implementation} for a {@link OnTimerInvoker class}. Produces the byte code + * for a constructor that takes a single argument and assigns it to the delegate field. 
+ */ + private static final class InvokerConstructor implements Implementation { + @Override + public InstrumentedType prepare(InstrumentedType instrumentedType) { + return instrumentedType; + } + + @Override + public ByteCodeAppender appender(final Target implementationTarget) { + return new ByteCodeAppender() { + @Override + public Size apply( + MethodVisitor methodVisitor, + Context implementationContext, + MethodDescription instrumentedMethod) { + StackManipulation.Size size = + new StackManipulation.Compound( + // Load the this reference + MethodVariableAccess.REFERENCE.loadOffset(0), + // Invoke the super constructor (default constructor of Object) + MethodInvocation.invoke( + new TypeDescription.ForLoadedType(Object.class) + .getDeclaredMethods() + .filter( + ElementMatchers.isConstructor() + .and(ElementMatchers.takesArguments(0))) + .getOnly()), + // Load the this reference + MethodVariableAccess.REFERENCE.loadOffset(0), + // Load the delegate argument + MethodVariableAccess.REFERENCE.loadOffset(1), + // Assign the delegate argument to the delegate field + FieldAccess.forField( + implementationTarget + .getInstrumentedType() + .getDeclaredFields() + .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) + .getOnly()) + .putter(), + // Return void. 
+ MethodReturn.VOID) + .apply(methodVisitor, implementationContext); + return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize()); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java new file mode 100644 index 0000000..a54f2bc --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokerFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import org.apache.beam.sdk.transforms.DoFn; + +/** A factory for providing a {@link DoFnInvoker} for invoking a {@link DoFn}. */ +interface DoFnInvokerFactory { + + /** Creates a {@link DoFnInvoker} for the given {@link DoFn}. */ + <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> doFn); +}
