Repository: incubator-beam
Updated Branches:
  refs/heads/master f7745dc29 -> 3f8db06bd


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 455e49b..64454e4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -17,74 +17,34 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
+import java.io.Serializable;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.NamingStrategy;
-import net.bytebuddy.description.field.FieldDescription;
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.modifier.FieldManifestation;
-import net.bytebuddy.description.modifier.Visibility;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.description.type.TypeList;
-import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.dynamic.scaffold.InstrumentedType;
-import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
-import net.bytebuddy.implementation.ExceptionMethod;
-import net.bytebuddy.implementation.FixedValue;
-import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.Implementation.Context;
-import net.bytebuddy.implementation.MethodDelegation;
-import 
net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder;
-import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
-import net.bytebuddy.implementation.bytecode.StackManipulation;
-import net.bytebuddy.implementation.bytecode.Throw;
-import net.bytebuddy.implementation.bytecode.assign.Assigner;
-import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
-import net.bytebuddy.implementation.bytecode.constant.TextConstant;
-import net.bytebuddy.implementation.bytecode.member.FieldAccess;
-import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
-import net.bytebuddy.implementation.bytecode.member.MethodReturn;
-import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
-import net.bytebuddy.jar.asm.Label;
-import net.bytebuddy.jar.asm.MethodVisitor;
-import net.bytebuddy.jar.asm.Opcodes;
-import net.bytebuddy.jar.asm.Type;
-import net.bytebuddy.matcher.ElementMatchers;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.values.TypeDescriptor;
 
-/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link 
DoFn}. */
+/** Static utilities for working with {@link DoFnInvoker}. */
 public class DoFnInvokers {
-  public static final DoFnInvokers INSTANCE = new DoFnInvokers();
 
-  private static final String FN_DELEGATE_FIELD_NAME = "delegate";
+  /**
+   * Returns an {@link DoFnInvoker} for the given {@link DoFn}, using a 
default choice of {@link
+   * DoFnInvokerFactory}.
+   *
+   * <p>The default is permitted to change at any time. Users of this method 
may not depend on any
+   * details {@link DoFnInvokerFactory}-specific details of the invoker. Today 
it is {@link
+   * ByteBuddyDoFnInvokerFactory}.
+   */
+  public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
+      DoFn<InputT, OutputT> fn) {
+    return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
+  }
 
   /**
    * A cache of constructors of generated {@link DoFnInvoker} classes, keyed 
by {@link DoFn} class.
@@ -97,7 +57,7 @@ public class DoFnInvokers {
   private DoFnInvokers() {}
 
   /**
-   * Creates a {@link DoFnInvoker} for the given {@link Object}, which should 
be either a {@link
+   * Returns a {@link DoFnInvoker} for the given {@link Object}, which should 
be either a {@link
    * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a 
user's function as an
    * {@link Object} and then pass it to this method, so there is no need to 
statically specify what
    * sort of object it is.
@@ -105,9 +65,9 @@ public class DoFnInvokers {
    * @deprecated this is to be used only as a migration path for decoupling 
upgrades
    */
   @Deprecated
-  public DoFnInvoker<?, ?> invokerFor(Object deserializedFn) {
+  public static DoFnInvoker<?, ?> invokerFor(Serializable deserializedFn) {
     if (deserializedFn instanceof DoFn) {
-      return newByteBuddyInvoker((DoFn<?, ?>) deserializedFn);
+      return invokerFor((DoFn<?, ?>) deserializedFn);
     } else if (deserializedFn instanceof OldDoFn) {
       return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
     } else {
@@ -121,6 +81,16 @@ public class DoFnInvokers {
     }
   }
 
+  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
+  @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers();
+
+  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
+  @Deprecated
+  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(Object 
deserializedFn) {
+    return (DoFnInvoker<InputT, OutputT>) 
DoFnInvokers.invokerFor((Serializable) deserializedFn);
+  }
+
+
   static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, 
OutputT> {
 
     private final OldDoFn<InputT, OutputT> fn;
@@ -203,631 +173,4 @@ public class DoFnInvokers {
       throw new UnsupportedOperationException("OldDoFn is not splittable");
     }
   }
-
-  /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
-      DoFn<InputT, OutputT> fn) {
-    return newByteBuddyInvoker(
-        DoFnSignatures.getSignature((Class) fn.getClass()), fn);
-  }
-
-  /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
-  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
-      DoFnSignature signature, DoFn<InputT, OutputT> fn) {
-    checkArgument(
-        signature.fnClass().equals(fn.getClass()),
-        "Signature is for class %s, but fn is of class %s",
-        signature.fnClass(),
-        fn.getClass());
-    try {
-      @SuppressWarnings("unchecked")
-      DoFnInvoker<InputT, OutputT> invoker =
-          (DoFnInvoker<InputT, OutputT>)
-              getByteBuddyInvokerConstructor(signature).newInstance(fn);
-      return invoker;
-    } catch (InstantiationException
-        | IllegalAccessException
-        | IllegalArgumentException
-        | InvocationTargetException
-        | SecurityException e) {
-      throw new RuntimeException("Unable to bind invoker for " + 
fn.getClass(), e);
-    }
-  }
-
-  /**
-   * Returns a generated constructor for a {@link DoFnInvoker} for the given 
{@link DoFn} class.
-   *
-   * <p>These are cached such that at most one {@link DoFnInvoker} class 
exists for a given
-   * {@link DoFn} class.
-   */
-  private synchronized Constructor<?> getByteBuddyInvokerConstructor(
-      DoFnSignature signature) {
-    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
-    Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
-    if (constructor == null) {
-      Class<? extends DoFnInvoker<?, ?>> invokerClass = 
generateInvokerClass(signature);
-      try {
-        constructor = invokerClass.getConstructor(fnClass);
-      } catch (IllegalArgumentException | NoSuchMethodException | 
SecurityException e) {
-        throw new RuntimeException(e);
-      }
-      byteBuddyInvokerConstructorCache.put(fnClass, constructor);
-    }
-    return constructor;
-  }
-
-  /** Default implementation of {@link DoFn.SplitRestriction}, for delegation 
by bytebuddy. */
-  public static class DefaultSplitRestriction {
-    /** Doesn't split the restriction. */
-    @SuppressWarnings("unused")
-    public static <InputT, RestrictionT> void invokeSplitRestriction(
-        InputT element, RestrictionT restriction, 
DoFn.OutputReceiver<RestrictionT> receiver) {
-      receiver.output(restriction);
-    }
-  }
-
-  /** Default implementation of {@link DoFn.GetRestrictionCoder}, for 
delegation by bytebuddy. */
-  public static class DefaultRestrictionCoder {
-    private final TypeDescriptor<?> restrictionType;
-
-    DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) {
-      this.restrictionType = restrictionType;
-    }
-
-    /** Doesn't split the restriction. */
-    @SuppressWarnings({"unused", "unchecked"})
-    public <RestrictionT> Coder<RestrictionT> 
invokeGetRestrictionCoder(CoderRegistry registry)
-        throws CannotProvideCoderException {
-      return (Coder) registry.getCoder(restrictionType);
-    }
-  }
-
-  /** Generates a {@link DoFnInvoker} class for the given {@link 
DoFnSignature}. */
-  private static Class<? extends DoFnInvoker<?, ?>> 
generateInvokerClass(DoFnSignature signature) {
-    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
-
-    final TypeDescription clazzDescription = new 
TypeDescription.ForLoadedType(fnClass);
-
-    DynamicType.Builder<?> builder =
-        new ByteBuddy()
-            // Create subclasses inside the target class, to have access to
-            // private and package-private bits
-            .with(
-                new NamingStrategy.SuffixingRandom("auxiliary") {
-                  @Override
-                  public String subclass(TypeDescription.Generic superClass) {
-                    return super.name(clazzDescription);
-                  }
-                })
-            // Create a subclass of DoFnInvoker
-            .subclass(DoFnInvoker.class, 
ConstructorStrategy.Default.NO_CONSTRUCTORS)
-            .defineField(
-                FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, 
FieldManifestation.FINAL)
-            .defineConstructor(Visibility.PUBLIC)
-            .withParameter(fnClass)
-            .intercept(new InvokerConstructor())
-            .method(ElementMatchers.named("invokeProcessElement"))
-            .intercept(new 
ProcessElementDelegation(signature.processElement()))
-            .method(ElementMatchers.named("invokeStartBundle"))
-            .intercept(delegateOrNoop(signature.startBundle()))
-            .method(ElementMatchers.named("invokeFinishBundle"))
-            .intercept(delegateOrNoop(signature.finishBundle()))
-            .method(ElementMatchers.named("invokeSetup"))
-            .intercept(delegateOrNoop(signature.setup()))
-            .method(ElementMatchers.named("invokeTeardown"))
-            .intercept(delegateOrNoop(signature.teardown()))
-            .method(ElementMatchers.named("invokeGetInitialRestriction"))
-            
.intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction()))
-            .method(ElementMatchers.named("invokeSplitRestriction"))
-            .intercept(splitRestrictionDelegation(signature))
-            .method(ElementMatchers.named("invokeGetRestrictionCoder"))
-            .intercept(getRestrictionCoderDelegation(signature))
-            .method(ElementMatchers.named("invokeNewTracker"))
-            .intercept(delegateWithDowncastOrThrow(signature.newTracker()));
-
-    DynamicType.Unloaded<?> unloaded = builder.make();
-
-    @SuppressWarnings("unchecked")
-    Class<? extends DoFnInvoker<?, ?>> res =
-        (Class<? extends DoFnInvoker<?, ?>>)
-            unloaded
-                .load(DoFnInvokers.class.getClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
-                .getLoaded();
-    return res;
-  }
-
-  private static Implementation getRestrictionCoderDelegation(DoFnSignature 
signature) {
-    if (signature.processElement().isSplittable()) {
-      if (signature.getRestrictionCoder() == null) {
-        return MethodDelegation.to(
-            new 
DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
-      } else {
-        return new DowncastingParametersMethodDelegation(
-            signature.getRestrictionCoder().targetMethod());
-      }
-    } else {
-      return ExceptionMethod.throwing(UnsupportedOperationException.class);
-    }
-  }
-
-  private static Implementation splitRestrictionDelegation(DoFnSignature 
signature) {
-    if (signature.splitRestriction() == null) {
-      return MethodDelegation.to(DefaultSplitRestriction.class);
-    } else {
-      return new 
DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod());
-    }
-  }
-
-  /** Delegates to the given method if available, or does nothing. */
-  private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod 
method) {
-    return (method == null)
-        ? FixedValue.originType()
-        : new DoFnMethodDelegation(method.targetMethod());
-  }
-
-  /** Delegates to the given method if available, or throws 
UnsupportedOperationException. */
-  private static Implementation 
delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) {
-    return (method == null)
-        ? ExceptionMethod.throwing(UnsupportedOperationException.class)
-        : new DowncastingParametersMethodDelegation(method.targetMethod());
-  }
-
-  /**
-   * Implements a method of {@link DoFnInvoker} (the "instrumented method") by 
delegating to a
-   * "target method" of the wrapped {@link DoFn}.
-   */
-  static class DoFnMethodDelegation implements Implementation {
-    /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
-    protected final MethodDescription targetMethod;
-    /** Whether the target method returns non-void. */
-    private final boolean targetHasReturn;
-
-    private FieldDescription delegateField;
-
-    public DoFnMethodDelegation(Method targetMethod) {
-      this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
-      targetHasReturn = 
!TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
-    }
-
-    @Override
-    public InstrumentedType prepare(InstrumentedType instrumentedType) {
-      // Remember the field description of the instrumented type.
-      delegateField =
-          instrumentedType
-              .getDeclaredFields()
-              .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
-              .getOnly();
-      // Delegating the method call doesn't require any changes to the 
instrumented type.
-      return instrumentedType;
-    }
-
-    @Override
-    public ByteCodeAppender appender(final Target implementationTarget) {
-      return new ByteCodeAppender() {
-        /**
-         * @param instrumentedMethod The {@link DoFnInvoker} method for which 
we're generating code.
-         */
-        @Override
-        public Size apply(
-            MethodVisitor methodVisitor,
-            Context implementationContext,
-            MethodDescription instrumentedMethod) {
-          // Figure out how many locals we'll need. This corresponds to 
"this", the parameters
-          // of the instrumented method, and an argument to hold the return 
value if the target
-          // method has a return value.
-          int numLocals = 1 + instrumentedMethod.getParameters().size() + 
(targetHasReturn ? 1 : 0);
-
-          Integer returnVarIndex = null;
-          if (targetHasReturn) {
-            // Local comes after formal parameters, so figure out where that 
is.
-            returnVarIndex = 1; // "this"
-            for (Type param : 
Type.getArgumentTypes(instrumentedMethod.getDescriptor())) {
-              returnVarIndex += param.getSize();
-            }
-          }
-
-          StackManipulation manipulation =
-              new StackManipulation.Compound(
-                  // Push "this" (DoFnInvoker on top of the stack)
-                  MethodVariableAccess.REFERENCE.loadOffset(0),
-                  // Access this.delegate (DoFn on top of the stack)
-                  FieldAccess.forField(delegateField).getter(),
-                  // Run the beforeDelegation manipulations.
-                  // The arguments necessary to invoke the target are on top 
of the stack.
-                  beforeDelegation(instrumentedMethod),
-                  // Perform the method delegation.
-                  // This will consume the arguments on top of the stack
-                  // Either the stack is now empty (because the targetMethod 
returns void) or the
-                  // stack contains the return value.
-                  new UserCodeMethodInvocation(returnVarIndex, targetMethod, 
instrumentedMethod),
-                  // Run the afterDelegation manipulations.
-                  // Either the stack is now empty (because the 
instrumentedMethod returns void)
-                  // or the stack contains the return value.
-                  afterDelegation(instrumentedMethod));
-
-          StackManipulation.Size size = manipulation.apply(methodVisitor, 
implementationContext);
-          return new Size(size.getMaximalSize(), numLocals);
-        }
-      };
-    }
-
-    /**
-     * Return the code to the prepare the operand stack for the method 
delegation.
-     *
-     * <p>Before this method is called, the stack delegate will be the only 
thing on the stack.
-     *
-     * <p>After this method is called, the stack contents should contain 
exactly the arguments
-     * necessary to invoke the target method.
-     */
-    protected StackManipulation beforeDelegation(MethodDescription 
instrumentedMethod) {
-      return MethodVariableAccess.allArgumentsOf(targetMethod);
-    }
-
-    /**
-     * Return the code to execute after the method delegation.
-     *
-     * <p>Before this method is called, the stack will either be empty (if the 
target method returns
-     * void) or contain the method return value.
-     *
-     * <p>After this method is called, the stack should either be empty (if 
the instrumented method
-     * returns void) or contain the value for the instrumented method to 
return).
-     */
-    protected StackManipulation afterDelegation(MethodDescription 
instrumentedMethod) {
-      return 
TargetMethodAnnotationDrivenBinder.TerminationHandler.Returning.INSTANCE.resolve(
-          Assigner.DEFAULT, instrumentedMethod, targetMethod);
-    }
-  }
-
-  /**
-   * Passes parameters to the delegated method by downcasting each parameter 
of non-primitive type
-   * to its expected type.
-   */
-  private static class DowncastingParametersMethodDelegation extends 
DoFnMethodDelegation {
-    DowncastingParametersMethodDelegation(Method method) {
-      super(method);
-    }
-
-    @Override
-    protected StackManipulation beforeDelegation(MethodDescription 
instrumentedMethod) {
-      List<StackManipulation> pushParameters = new ArrayList<>();
-      TypeList.Generic paramTypes = targetMethod.getParameters().asTypeList();
-      for (int i = 0; i < paramTypes.size(); i++) {
-        TypeDescription.Generic paramT = paramTypes.get(i);
-        pushParameters.add(MethodVariableAccess.of(paramT).loadOffset(i + 1));
-        if (!paramT.isPrimitive()) {
-          pushParameters.add(TypeCasting.to(paramT));
-        }
-      }
-      return new StackManipulation.Compound(pushParameters);
-    }
-  }
-
-  /**
-   * This wrapper exists to convert checked exceptions to unchecked 
exceptions, since if this fails
-   * the library itself is malformed.
-   */
-  private static MethodDescription getExtraContextFactoryMethodDescription(
-      String methodName, Class<?>... parameterTypes) {
-    try {
-      return new MethodDescription.ForLoadedMethod(
-          DoFn.ExtraContextFactory.class.getMethod(methodName, 
parameterTypes));
-    } catch (Exception e) {
-      throw new IllegalStateException(
-          String.format(
-              "Failed to locate required method %s.%s",
-              ExtraContextFactory.class.getSimpleName(), methodName),
-          e);
-    }
-  }
-
-  private static StackManipulation simpleExtraContextParameter(
-    String methodName,
-    StackManipulation pushExtraContextFactory) {
-      return new StackManipulation.Compound(
-        pushExtraContextFactory,
-        
MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
-  }
-
-  static StackManipulation getExtraContextParameter(
-      DoFnSignature.Parameter parameter,
-      final StackManipulation pushExtraContextFactory) {
-
-    return parameter.match(
-        new Cases<StackManipulation>() {
-
-          @Override
-          public StackManipulation dispatch(WindowParameter p) {
-            return new StackManipulation.Compound(
-                simpleExtraContextParameter("window", pushExtraContextFactory),
-                TypeCasting.to(new 
TypeDescription.ForLoadedType(p.windowT().getRawType())));
-          }
-
-          @Override
-          public StackManipulation dispatch(InputProviderParameter p) {
-            return simpleExtraContextParameter("inputProvider", 
pushExtraContextFactory);
-          }
-
-          @Override
-          public StackManipulation dispatch(OutputReceiverParameter p) {
-            return simpleExtraContextParameter("outputReceiver", 
pushExtraContextFactory);
-          }
-
-          @Override
-          public StackManipulation dispatch(RestrictionTrackerParameter p) {
-            // ExtraContextFactory.restrictionTracker() returns a 
RestrictionTracker,
-            // but the @ProcessElement method expects a concrete subtype of it.
-            // Insert a downcast.
-            return new StackManipulation.Compound(
-                simpleExtraContextParameter("restrictionTracker", 
pushExtraContextFactory),
-                TypeCasting.to(new 
TypeDescription.ForLoadedType(p.trackerT().getRawType())));
-          }
-
-          @Override
-          public StackManipulation dispatch(StateParameter p) {
-            return new StackManipulation.Compound(
-                // TOP = extraContextFactory.state(<id>)
-                pushExtraContextFactory,
-                new TextConstant(p.referent().id()),
-                MethodInvocation.invoke(
-                    getExtraContextFactoryMethodDescription("state", 
String.class)),
-                TypeCasting.to(
-                    new 
TypeDescription.ForLoadedType(p.referent().stateType().getRawType())));
-          }
-
-          @Override
-          public StackManipulation dispatch(TimerParameter p) {
-            return new StackManipulation.Compound(
-                // TOP = extraContextFactory.state(<id>)
-                pushExtraContextFactory,
-                new TextConstant(p.referent().id()),
-                MethodInvocation.invoke(
-                    getExtraContextFactoryMethodDescription("timer", 
String.class)),
-                TypeCasting.to(new 
TypeDescription.ForLoadedType(Timer.class)));
-          }
-        });
-  }
-
-  /**
-   * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method 
by delegating to the
-   * {@link DoFn.ProcessElement} method.
-   */
-  private static final class ProcessElementDelegation extends 
DoFnMethodDelegation {
-    private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
-
-    static {
-      try {
-        PROCESS_CONTINUATION_STOP_METHOD =
-            new 
MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
-      } catch (NoSuchMethodException e) {
-        throw new RuntimeException("Failed to locate 
ProcessContinuation.stop()");
-      }
-    }
-
-    private final DoFnSignature.ProcessElementMethod signature;
-
-    /** Implementation of {@link MethodDelegation} for the {@link 
ProcessElement} method. */
-    private ProcessElementDelegation(DoFnSignature.ProcessElementMethod 
signature) {
-      super(signature.targetMethod());
-      this.signature = signature;
-    }
-
-    @Override
-    protected StackManipulation beforeDelegation(MethodDescription 
instrumentedMethod) {
-      // Parameters of the wrapper invoker method:
-      //   DoFn.ProcessContext, ExtraContextFactory.
-      // Parameters of the wrapped DoFn method:
-      //   DoFn.ProcessContext, [BoundedWindow, InputProvider, OutputReceiver] 
in any order
-      ArrayList<StackManipulation> pushParameters = new ArrayList<>();
-      // Push the ProcessContext argument.
-      pushParameters.add(MethodVariableAccess.REFERENCE.loadOffset(1));
-      // Push the extra arguments in their actual order.
-      StackManipulation pushExtraContextFactory = 
MethodVariableAccess.REFERENCE.loadOffset(2);
-      for (DoFnSignature.Parameter param : signature.extraParameters()) {
-        pushParameters.add(getExtraContextParameter(param, 
pushExtraContextFactory));
-      }
-      return new StackManipulation.Compound(pushParameters);
-    }
-
-    @Override
-    protected StackManipulation afterDelegation(MethodDescription 
instrumentedMethod) {
-      if 
(TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
-        return new StackManipulation.Compound(
-            MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), 
MethodReturn.REFERENCE);
-      } else {
-        return 
MethodReturn.returning(targetMethod.getReturnType().asErasure());
-      }
-    }
-  }
-
-  private static class UserCodeMethodInvocation implements StackManipulation {
-
-    @Nullable private final Integer returnVarIndex;
-    private final MethodDescription targetMethod;
-    private final MethodDescription instrumentedMethod;
-    private final TypeDescription returnType;
-
-    private final Label wrapStart = new Label();
-    private final Label wrapEnd = new Label();
-    private final Label tryBlockStart = new Label();
-    private final Label tryBlockEnd = new Label();
-    private final Label catchBlockStart = new Label();
-    private final Label catchBlockEnd = new Label();
-
-    private final MethodDescription createUserCodeException;
-
-    UserCodeMethodInvocation(
-        @Nullable Integer returnVarIndex,
-        MethodDescription targetMethod,
-        MethodDescription instrumentedMethod) {
-      this.returnVarIndex = returnVarIndex;
-      this.targetMethod = targetMethod;
-      this.instrumentedMethod = instrumentedMethod;
-      this.returnType = targetMethod.getReturnType().asErasure();
-
-      boolean targetMethodReturnsVoid = 
TypeDescription.VOID.equals(returnType);
-      checkArgument(
-          (returnVarIndex == null) == targetMethodReturnsVoid,
-          "returnVarIndex should be defined if and only if the target method 
has a return value");
-
-      try {
-        createUserCodeException =
-            new MethodDescription.ForLoadedMethod(
-                UserCodeException.class.getDeclaredMethod("wrap", 
Throwable.class));
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException("Unable to find UserCodeException.wrap", e);
-      }
-    }
-
-    @Override
-    public boolean isValid() {
-      return true;
-    }
-
-    private Object describeType(Type type) {
-      switch (type.getSort()) {
-        case Type.OBJECT:
-          return type.getInternalName();
-        case Type.INT:
-        case Type.BYTE:
-        case Type.BOOLEAN:
-        case Type.SHORT:
-          return Opcodes.INTEGER;
-        case Type.LONG:
-          return Opcodes.LONG;
-        case Type.DOUBLE:
-          return Opcodes.DOUBLE;
-        case Type.FLOAT:
-          return Opcodes.FLOAT;
-        default:
-          throw new IllegalArgumentException("Unhandled type as method 
argument: " + type);
-      }
-    }
-
-    private void visitFrame(
-        MethodVisitor mv, boolean localsIncludeReturn, @Nullable String 
stackTop) {
-      boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn;
-
-      Type[] localTypes = 
Type.getArgumentTypes(instrumentedMethod.getDescriptor());
-      Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 
: 0)];
-      locals[0] = 
instrumentedMethod.getReceiverType().asErasure().getInternalName();
-      for (int i = 0; i < localTypes.length; i++) {
-        locals[i + 1] = describeType(localTypes[i]);
-      }
-      if (hasReturnLocal) {
-        locals[locals.length - 1] = returnType.getInternalName();
-      }
-
-      Object[] stack = stackTop == null ? new Object[] {} : new Object[] 
{stackTop};
-
-      mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack);
-    }
-
-    @Override
-    public Size apply(MethodVisitor mv, Context context) {
-      Size size = new Size(0, 0);
-
-      mv.visitLabel(wrapStart);
-
-      String throwableName = new 
TypeDescription.ForLoadedType(Throwable.class).getInternalName();
-      mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, 
throwableName);
-
-      // The try block attempts to perform the expected operations, then jumps 
to success
-      mv.visitLabel(tryBlockStart);
-      size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, 
context));
-
-      if (returnVarIndex != null) {
-        mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex);
-        size = size.aggregate(new Size(-1, 0)); // Reduces the size of the 
stack
-      }
-      mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
-      mv.visitLabel(tryBlockEnd);
-
-      // The handler wraps the exception, and then throws.
-      mv.visitLabel(catchBlockStart);
-      // In catch block, should have same locals and {Throwable} on the stack.
-      visitFrame(mv, false, throwableName);
-
-      // Create the user code exception and throw
-      size =
-          size.aggregate(
-              new Compound(MethodInvocation.invoke(createUserCodeException), 
Throw.INSTANCE)
-                  .apply(mv, context));
-
-      mv.visitLabel(catchBlockEnd);
-
-      // After the catch block we should have the return in scope, but nothing 
on the stack.
-      visitFrame(mv, true, null);
-
-      // After catch block, should have same locals and will have the return 
on the stack.
-      if (returnVarIndex != null) {
-        mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex);
-        size = size.aggregate(new Size(1, 0)); // Increases the size of the 
stack
-      }
-      mv.visitLabel(wrapEnd);
-      if (returnVarIndex != null) {
-        // Drop the return type from the locals
-        mv.visitLocalVariable(
-            "res",
-            returnType.getDescriptor(),
-            returnType.getGenericSignature(),
-            wrapStart,
-            wrapEnd,
-            returnVarIndex);
-      }
-
-      return size;
-    }
-  }
-
-  /**
-   * A constructor {@link Implementation} for a {@link DoFnInvoker class}. 
Produces the byte code
-   * for a constructor that takes a single argument and assigns it to the 
delegate field.
-   */
-  private static final class InvokerConstructor implements Implementation {
-    @Override
-    public InstrumentedType prepare(InstrumentedType instrumentedType) {
-      return instrumentedType;
-    }
-
-    @Override
-    public ByteCodeAppender appender(final Target implementationTarget) {
-      return new ByteCodeAppender() {
-        @Override
-        public Size apply(
-            MethodVisitor methodVisitor,
-            Context implementationContext,
-            MethodDescription instrumentedMethod) {
-          StackManipulation.Size size =
-              new StackManipulation.Compound(
-                      // Load the this reference
-                      MethodVariableAccess.REFERENCE.loadOffset(0),
-                      // Invoke the super constructor (default constructor of 
Object)
-                      MethodInvocation.invoke(
-                          new TypeDescription.ForLoadedType(Object.class)
-                              .getDeclaredMethods()
-                              .filter(
-                                  ElementMatchers.isConstructor()
-                                      .and(ElementMatchers.takesArguments(0)))
-                              .getOnly()),
-                      // Load the this reference
-                      MethodVariableAccess.REFERENCE.loadOffset(0),
-                      // Load the delegate argument
-                      MethodVariableAccess.REFERENCE.loadOffset(1),
-                      // Assign the delegate argument to the delegate field
-                      FieldAccess.forField(
-                              implementationTarget
-                                  .getInstrumentedType()
-                                  .getDeclaredFields()
-                                  
.filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
-                                  .getOnly())
-                          .putter(),
-                      // Return void.
-                      MethodReturn.VOID)
-                  .apply(methodVisitor, implementationContext);
-          return new Size(size.getMaximalSize(), 
instrumentedMethod.getStackSize());
-        }
-      };
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokerFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokerFactory.java
new file mode 100644
index 0000000..a7a6765
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokerFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+
+/**
+ * Provides {@link OnTimerInvoker} instances for invoking a particular {@link 
TimerId}
+ * on a particular {@link DoFn}.
+ */
+interface OnTimerInvokerFactory {
+
+  /**
+   * Returns an invoker that will call the given {@link DoFn DoFn's} {@link 
OnTimer @OnTimer} method
+   * for the given {@code timerId}.
+   */
+  <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer(
+      DoFn<InputT, OutputT> fn, String timerId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
index 9517551..287828a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java
@@ -17,255 +17,26 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import com.google.common.base.CharMatcher;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.NamingStrategy;
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.description.modifier.FieldManifestation;
-import net.bytebuddy.description.modifier.Visibility;
-import net.bytebuddy.description.type.TypeDescription;
-import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.dynamic.scaffold.InstrumentedType;
-import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
-import net.bytebuddy.implementation.Implementation;
-import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
-import net.bytebuddy.implementation.bytecode.StackManipulation;
-import net.bytebuddy.implementation.bytecode.member.FieldAccess;
-import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
-import net.bytebuddy.implementation.bytecode.member.MethodReturn;
-import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
-import net.bytebuddy.jar.asm.MethodVisitor;
-import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import 
org.apache.beam.sdk.transforms.reflect.DoFnInvokers.DoFnMethodDelegation;
 
 /**
  * Dynamically generates {@link OnTimerInvoker} instances for invoking a 
particular {@link TimerId}
  * on a particular {@link DoFn}.
  */
 class OnTimerInvokers {
-  public static final OnTimerInvokers INSTANCE = new OnTimerInvokers();
-
-  private OnTimerInvokers() {}
 
   /**
-   * The field name for the delegate of {@link DoFn} subclass that a bytebuddy 
invoker will call.
-   */
-  private static final String FN_DELEGATE_FIELD_NAME = "delegate";
-
-  /**
-   * A cache of constructors of generated {@link OnTimerInvoker} classes, 
keyed by {@link DoFn}
-   * class and then by {@link TimerId}.
+   * Returns an invoker that will call the given {@link DoFn DoFn's} {@link 
OnTimer @OnTimer} method
+   * for the given {@code timerId}, using a default choice of {@link 
OnTimerInvokerFactory}.
    *
-   * <p>Needed because generating an invoker class is expensive, and to avoid 
generating an
-   * excessive number of classes consuming PermGen memory in Java's that still 
have PermGen.
+   * <p>The default is permitted to change at any time. Users of this method 
may not depend on any
+   * details {@link OnTimerInvokerFactory}-specific details of the invoker. 
Today it is {@link
+   * ByteBuddyOnTimerInvokerFactory}.
    */
-  private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, 
Constructor<?>>>
-      constructorCache =
-          CacheBuilder.newBuilder()
-              .build(
-                  new CacheLoader<
-                      Class<? extends DoFn<?, ?>>, LoadingCache<String, 
Constructor<?>>>() {
-                    @Override
-                    public LoadingCache<String, Constructor<?>> load(
-                        final Class<? extends DoFn<?, ?>> fnClass) throws 
Exception {
-                      return CacheBuilder.newBuilder().build(new 
OnTimerConstructorLoader(fnClass));
-                    }
-                  });
-
-  /** Creates invoker. */
-  public <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer(
+  public static <InputT, OutputT> OnTimerInvoker<InputT, OutputT> forTimer(
       DoFn<InputT, OutputT> fn, String timerId) {
-
-    @SuppressWarnings("unchecked")
-    Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) 
fn.getClass();
-
-    try {
-      Constructor<?> constructor = constructorCache.get(fnClass).get(timerId);
-      @SuppressWarnings("unchecked")
-      OnTimerInvoker<InputT, OutputT> invoker =
-          (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn);
-      return invoker;
-    } catch (InstantiationException
-        | IllegalAccessException
-        | IllegalArgumentException
-        | InvocationTargetException
-        | SecurityException
-        | ExecutionException e) {
-      throw new RuntimeException(
-          String.format(
-              "Unable to construct @%s invoker for %s",
-              DoFn.OnTimer.class.getSimpleName(), fn.getClass().getName()),
-          e);
-    }
-  }
-
-  /**
-   * A cache loader fixed to a particular {@link DoFn} class that loads 
constructors for the
-   * invokers for its {@link OnTimer @OnTimer} methods.
-   */
-  private static class OnTimerConstructorLoader extends CacheLoader<String, 
Constructor<?>> {
-
-    private final DoFnSignature signature;
-
-    public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
-      this.signature = DoFnSignatures.getSignature(clazz);
-    }
-
-    @Override
-    public Constructor<?> load(String timerId) throws Exception {
-      Class<? extends OnTimerInvoker<?, ?>> invokerClass =
-          generateOnTimerInvokerClass(signature, timerId);
-      try {
-        return invokerClass.getConstructor(signature.fnClass());
-      } catch (IllegalArgumentException | NoSuchMethodException | 
SecurityException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * Generates a {@link OnTimerInvoker} class for the given {@link 
DoFnSignature} and {@link
-   * TimerId}.
-   */
-  private static Class<? extends OnTimerInvoker<?, ?>> 
generateOnTimerInvokerClass(
-      DoFnSignature signature, String timerId) {
-    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
-
-    final TypeDescription clazzDescription = new 
TypeDescription.ForLoadedType(fnClass);
-
-    final String className =
-        "auxiliary_OnTimer_" + 
CharMatcher.JAVA_LETTER_OR_DIGIT.retainFrom(timerId);
-
-    DynamicType.Builder<?> builder =
-        new ByteBuddy()
-            // Create subclasses inside the target class, to have access to
-            // private and package-private bits
-            .with(
-                new NamingStrategy.SuffixingRandom(className) {
-                  @Override
-                  public String subclass(TypeDescription.Generic superClass) {
-                    return super.name(clazzDescription);
-                  }
-                })
-            // class <invoker class> implements OnTimerInvoker {
-            .subclass(OnTimerInvoker.class, 
ConstructorStrategy.Default.NO_CONSTRUCTORS)
-
-            //   private final <fn class> delegate;
-            .defineField(
-                FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, 
FieldManifestation.FINAL)
-
-            //   <invoker class>(<fn class> delegate) { this.delegate = 
delegate; }
-            .defineConstructor(Visibility.PUBLIC)
-            .withParameter(fnClass)
-            .intercept(new InvokerConstructor())
-
-            //   public invokeOnTimer(ExtraContextFactory) {
-            //     this.delegate.<@OnTimer method>(... pass the right args ...)
-            //   }
-            .method(ElementMatchers.named("invokeOnTimer"))
-            .intercept(new 
InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId)));
-
-    DynamicType.Unloaded<?> unloaded = builder.make();
-
-    @SuppressWarnings("unchecked")
-    Class<? extends OnTimerInvoker<?, ?>> res =
-        (Class<? extends OnTimerInvoker<?, ?>>)
-            unloaded
-                .load(
-                    OnTimerInvokers.class.getClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
-                .getLoaded();
-    return res;
-  }
-
-  /**
-   * An "invokeOnTimer" method implementation akin to @ProcessElement, but 
simpler because no
-   * splitting-related parameters need to be handled.
-   */
-  private static class InvokeOnTimerDelegation extends DoFnMethodDelegation {
-
-    private final DoFnSignature.OnTimerMethod signature;
-
-    public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) {
-      super(signature.targetMethod());
-      this.signature = signature;
-    }
-
-    @Override
-    protected StackManipulation beforeDelegation(MethodDescription 
instrumentedMethod) {
-      // Parameters of the wrapper invoker method:
-      //   ExtraContextFactory.
-      // Parameters of the wrapped DoFn method:
-      //   a dynamic set of allowed "extra" parameters in any order subject to
-      //   validation prior to getting the DoFnSignature
-      ArrayList<StackManipulation> parameters = new ArrayList<>();
-      // Push the extra arguments in their actual order.
-      StackManipulation pushExtraContextFactory = 
MethodVariableAccess.REFERENCE.loadOffset(1);
-      for (DoFnSignature.Parameter param : signature.extraParameters()) {
-        parameters.add(DoFnInvokers.getExtraContextParameter(param, 
pushExtraContextFactory));
-      }
-      return new StackManipulation.Compound(parameters);
-    }
-  }
-
-  /**
-   * A constructor {@link Implementation} for a {@link DoFnInvoker class}. 
Produces the byte code
-   * for a constructor that takes a single argument and assigns it to the 
delegate field.
-   */
-  private static final class InvokerConstructor implements Implementation {
-    @Override
-    public InstrumentedType prepare(InstrumentedType instrumentedType) {
-      return instrumentedType;
-    }
-
-    @Override
-    public ByteCodeAppender appender(final Target implementationTarget) {
-      return new ByteCodeAppender() {
-        @Override
-        public Size apply(
-            MethodVisitor methodVisitor,
-            Context implementationContext,
-            MethodDescription instrumentedMethod) {
-          StackManipulation.Size size =
-              new StackManipulation.Compound(
-                      // Load the this reference
-                      MethodVariableAccess.REFERENCE.loadOffset(0),
-                      // Invoke the super constructor (default constructor of 
Object)
-                      MethodInvocation.invoke(
-                          new TypeDescription.ForLoadedType(Object.class)
-                              .getDeclaredMethods()
-                              .filter(
-                                  ElementMatchers.isConstructor()
-                                      .and(ElementMatchers.takesArguments(0)))
-                              .getOnly()),
-                      // Load the this reference
-                      MethodVariableAccess.REFERENCE.loadOffset(0),
-                      // Load the delegate argument
-                      MethodVariableAccess.REFERENCE.loadOffset(1),
-                      // Assign the delegate argument to the delegate field
-                      FieldAccess.forField(
-                              implementationTarget
-                                  .getInstrumentedType()
-                                  .getDeclaredFields()
-                                  
.filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
-                                  .getOnly())
-                          .putter(),
-                      // Return void.
-                      MethodReturn.VOID)
-                  .apply(methodVisitor, implementationContext);
-          return new Size(size.getMaximalSize(), 
instrumentedMethod.getStackSize());
-        }
-      };
-    }
+    return ByteBuddyOnTimerInvokerFactory.only().forTimer(fn, timerId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 7bdc007..aba626e 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -89,9 +89,7 @@ public class DoFnInvokersTest {
   }
 
   private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
-    return DoFnInvokers.INSTANCE
-        .newByteBuddyInvoker(fn)
-        .invokeProcessElement(mockContext, extraContextFactory);
+    return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockContext, 
extraContextFactory);
   }
 
   @Test
@@ -101,8 +99,8 @@ public class DoFnInvokersTest {
     IdentityParent fn2 = new IdentityParent();
     assertSame(
         "Invoker classes should only be generated once for each type",
-        DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn1).getClass(),
-        DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn2).getClass());
+        DoFnInvokers.invokerFor(fn1).getClass(),
+        DoFnInvokers.invokerFor(fn2).getClass());
   }
 
   // 
---------------------------------------------------------------------------------------
@@ -295,7 +293,7 @@ public class DoFnInvokersTest {
       public void after() {}
     }
     MockFn fn = mock(MockFn.class);
-    DoFnInvoker<String, String> invoker = 
DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
     invoker.invokeSetup();
     invoker.invokeStartBundle(mockContext);
     invoker.invokeFinishBundle(mockContext);
@@ -358,7 +356,7 @@ public class DoFnInvokersTest {
   @Test
   public void testSplittableDoFnWithAllMethods() throws Exception {
     MockFn fn = mock(MockFn.class);
-    DoFnInvoker<String, String> invoker = 
DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
     final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
     final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
     SomeRestriction restriction = new SomeRestriction();
@@ -430,7 +428,7 @@ public class DoFnInvokersTest {
       }
     }
     MockFn fn = mock(MockFn.class);
-    DoFnInvoker<String, String> invoker = 
DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
 
     CoderRegistry coderRegistry = new CoderRegistry();
     coderRegistry.registerCoder(SomeRestriction.class, 
SomeRestrictionCoder.class);
@@ -521,7 +519,7 @@ public class DoFnInvokersTest {
   @Test
   public void testProcessElementException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
-        DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+        DoFnInvokers.invokerFor(
             new DoFn<Integer, Integer>() {
               @ProcessElement
               public void processElement(@SuppressWarnings("unused") 
ProcessContext c) {
@@ -537,8 +535,8 @@ public class DoFnInvokersTest {
   public void testProcessElementExceptionWithReturn() throws Exception {
     thrown.expect(UserCodeException.class);
     thrown.expectMessage("bogus");
-    DoFnInvokers.INSTANCE
-        .newByteBuddyInvoker(
+    DoFnInvokers
+        .invokerFor(
             new DoFn<Integer, Integer>() {
               @ProcessElement
               public ProcessContinuation processElement(
@@ -562,7 +560,7 @@ public class DoFnInvokersTest {
   @Test
   public void testStartBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
-        DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+        DoFnInvokers.invokerFor(
             new DoFn<Integer, Integer>() {
               @StartBundle
               public void startBundle(@SuppressWarnings("unused") Context c) {
@@ -580,7 +578,7 @@ public class DoFnInvokersTest {
   @Test
   public void testFinishBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
-        DoFnInvokers.INSTANCE.newByteBuddyInvoker(
+        DoFnInvokers.invokerFor(
             new DoFn<Integer, Integer>() {
               @FinishBundle
               public void finishBundle(@SuppressWarnings("unused") Context c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
index f8275fa..d29810c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -53,7 +53,7 @@ public class OnTimerInvokersTest {
   }
 
   private void invokeOnTimer(DoFn<String, String> fn, String timerId) {
-    OnTimerInvokers.INSTANCE.forTimer(fn, 
timerId).invokeOnTimer(mockExtraContextFactory);
+    OnTimerInvokers.forTimer(fn, 
timerId).invokeOnTimer(mockExtraContextFactory);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
 
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
index 80324b9..14ade5b 100644
--- 
a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
+++ 
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -66,7 +66,7 @@ public class DoFnInvokersBenchmark {
   @Setup
   public void setUp() {
     adaptedDoFnWithContext = DoFnAdapters.toOldDoFn(doFn);
-    invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(doFn);
+    invoker = DoFnInvokers.invokerFor(doFn);
   }
 
   @Benchmark

Reply via email to