Removes code for wrapping DoFn as an OldDoFn

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a22de150
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a22de150
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a22de150

Branch: refs/heads/master
Commit: a22de15012c51e8b7e31143021f0a298e093bf51
Parents: e9e53c5
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Dec 9 17:21:40 2016 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/DoFnAdapters.java       | 150 ----------
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 295 +------------------
 .../sdk/transforms/reflect/DoFnInvokers.java    | 141 +--------
 .../transforms/reflect/DoFnInvokersTest.java    |  36 ---
 4 files changed, 11 insertions(+), 611 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index e15b08b..d1c40a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -38,7 +36,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -53,18 +50,6 @@ public class DoFnAdapters {
   /** Should not be instantiated. */
   private DoFnAdapters() {}
 
-  /**
-   * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns 
the class of the
-   * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
-   */
-  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
-    } else {
-      return fn.getClass();
-    }
-  }
-
   /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
   @SuppressWarnings({"unchecked", "rawtypes"})
   public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
@@ -76,126 +61,6 @@ public class DoFnAdapters {
     }
   }
 
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link 
DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext 
adaptProcessContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.ProcessContext c,
-      final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
-    return fn.new ProcessContext() {
-      @Override
-      public InputT element() {
-        return c.element();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-
-      @Override
-      public Instant timestamp() {
-        return c.timestamp();
-      }
-
-      @Override
-      public BoundedWindow window() {
-        return extra.window();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return c.pane();
-      }
-
-      @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        return extra.windowingInternals();
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link 
DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context 
adaptContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.Context c) {
-    return fn.new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /**
-   * If the fn was created using {@link #toOldDoFn}, returns the original 
{@link DoFn}. Otherwise,
-   * returns {@code null}.
-   */
-  @Nullable
-  public static <InputT, OutputT> DoFn<InputT, OutputT> 
getDoFn(OldDoFn<InputT, OutputT> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
-    } else {
-      return null;
-    }
-  }
-
   /**
    * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
    * OldDoFn}.
@@ -238,21 +103,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return fn.getOutputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return fn.getAggregators();
-    }
-
-    @Override
     public Duration getAllowedTimestampSkew() {
       return fn.getAllowedTimestampSkew();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 2d2c1fd..0aef552 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -71,21 +70,6 @@ import org.joda.time.Instant;
  */
 @Deprecated
 public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  public DoFn<InputT, OutputT> toDoFn() {
-    DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
-    if (doFn != null) {
-      return doFn;
-    }
-    if (this instanceof RequiresWindowAccess) {
-      // No parameters as it just accesses `this`
-      return new AdaptedRequiresWindowAccessDoFn();
-    } else {
-      // No parameters as it just accesses `this`
-      return new AdaptedDoFn();
-    }
-  }
-
   /**
    * Information accessible to all methods in this {@code OldDoFn}.
    * Used primarily to output elements.
@@ -334,7 +318,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     this(new HashMap<String, DelegatingAggregator<?, ?>>());
   }
 
-  OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+  public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
     this.aggregators = aggregators;
   }
 
@@ -419,32 +403,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code OldDoFn} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code OldDoFn} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code OldDoFn} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<OutputT>} for the output
-   * {@code PCollection<OutputT>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
    * Returns an {@link Aggregator} with aggregation logic specified by the
    * {@link CombineFn} argument. The name provided must be unique across
    * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
@@ -504,255 +462,4 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
   Collection<Aggregator<?, ?>> getAggregators() {
     return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
   }
-
-  /**
-   * A {@link Context} for an {@link OldDoFn} via a context for a proper 
{@link DoFn}.
-   */
-  private class AdaptedContext extends Context {
-
-    private final DoFn<InputT, OutputT>.Context newContext;
-
-    public AdaptedContext(
-        DoFn<InputT, OutputT>.Context newContext) {
-      this.newContext = newContext;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return newContext.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      newContext.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      newContext.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      newContext.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      newContext.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return newContext.createAggregator(name, combiner);
-    }
-  }
-
-  /**
-   * A {@link ProcessContext} for an {@link OldDoFn} via a context for a 
proper {@link DoFn}.
-   */
-  private class AdaptedProcessContext extends ProcessContext {
-
-    private final DoFn<InputT, OutputT>.ProcessContext newContext;
-
-    public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext 
newContext) {
-      this.newContext = newContext;
-    }
-
-    @Override
-    public InputT element() {
-      return newContext.element();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return newContext.sideInput(view);
-    }
-
-    @Override
-    public Instant timestamp() {
-      return newContext.timestamp();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(String.format(
-          "%s.%s.windowingInternals() is no longer supported. Please convert 
your %s to a %s",
-          OldDoFn.class.getSimpleName(),
-          OldDoFn.ProcessContext.class.getSimpleName(),
-          OldDoFn.class.getSimpleName(),
-          DoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return newContext.pane();
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException(String.format(
-          "%s.%s.windowingInternals() is no longer supported. Please convert 
your %s to a %s",
-          OldDoFn.class.getSimpleName(),
-          OldDoFn.ProcessContext.class.getSimpleName(),
-          OldDoFn.class.getSimpleName(),
-          DoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return newContext.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      newContext.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      newContext.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      newContext.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      newContext.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return newContext.createAggregator(name, combiner);
-    }
-  }
-
-  private class AdaptedDoFn extends DoFn<InputT, OutputT> {
-
-    @Setup
-    public void setup() throws Exception {
-      OldDoFn.this.setup();
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      OldDoFn.this.teardown();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return OldDoFn.this.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      OldDoFn.this.populateDisplayData(builder);
-    }
-
-    @Override
-    public TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return OldDoFn.this.getInputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return OldDoFn.this.getAggregators();
-    }
-
-    @Override
-    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return OldDoFn.this.getOutputTypeDescriptor();
-    }
-  }
-
-  /**
-   * A {@link ProcessContext} for an {@link OldDoFn} that implements
-   * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
-   */
-  private class AdaptedRequiresWindowAccessProcessContext extends 
AdaptedProcessContext {
-
-    private final BoundedWindow window;
-
-    public AdaptedRequiresWindowAccessProcessContext(
-        DoFn<InputT, OutputT>.ProcessContext newContext,
-        BoundedWindow window) {
-      super(newContext);
-      this.window = window;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return window;
-    }
-  }
-
-  private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
-
-    @Setup
-    public void setup() throws Exception {
-      OldDoFn.this.setup();
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
-      OldDoFn.this.processElement(
-          OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, 
window));
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      OldDoFn.this.teardown();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return OldDoFn.this.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      OldDoFn.this.populateDisplayData(builder);
-    }
-
-    @Override
-    public TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return OldDoFn.this.getInputTypeDescriptor();
-    }
-
-    @Override
-    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return OldDoFn.this.getOutputTypeDescriptor();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 50a7082..b141d51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,13 +18,7 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.util.UserCodeException;
 
 /** Static utilities for working with {@link DoFnInvoker}. */
 public class DoFnInvokers {
@@ -42,137 +36,22 @@ public class DoFnInvokers {
     return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
   }
 
-  private DoFnInvokers() {}
-
   /**
-   * Returns a {@link DoFnInvoker} for the given {@link Object}, which should 
be either a {@link
-   * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a 
user's function as an
-   * {@link Object} and then pass it to this method, so there is no need to 
statically specify what
-   * sort of object it is.
+   * Temporarily retained for compatibility with Dataflow worker.
+   * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
    *
-   * @deprecated this is to be used only as a migration path for decoupling 
upgrades
+   * @deprecated Use {@link #invokerFor(DoFn)}.
    */
+  @SuppressWarnings("unchecked")
   @Deprecated
-  public static DoFnInvoker<?, ?> invokerFor(Serializable deserializedFn) {
+  public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
+      Serializable deserializedFn) {
     if (deserializedFn instanceof DoFn) {
-      return invokerFor((DoFn<?, ?>) deserializedFn);
-    } else if (deserializedFn instanceof OldDoFn) {
-      return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
-    } else {
-      throw new IllegalArgumentException(
-          String.format(
-              "Cannot create a %s for %s; it should be either a %s or an %s.",
-              DoFnInvoker.class.getSimpleName(),
-              deserializedFn.toString(),
-              DoFn.class.getSimpleName(),
-              OldDoFn.class.getSimpleName()));
+      return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
     }
+    throw new UnsupportedOperationException(
+        "Only DoFn supported, was: " + deserializedFn.getClass());
   }
 
-  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
-  @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers();
-
-  /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
-  @Deprecated
-  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(Object 
deserializedFn) {
-    return (DoFnInvoker<InputT, OutputT>) 
DoFnInvokers.invokerFor((Serializable) deserializedFn);
-  }
-
-
-  static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, 
OutputT> {
-
-    private final OldDoFn<InputT, OutputT> fn;
-
-    public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
-      this.fn = fn;
-    }
-
-    @Override
-    public DoFn.ProcessContinuation invokeProcessElement(
-        ArgumentProvider<InputT, OutputT> extra) {
-      // The outer DoFn is immaterial - it exists only to avoid typing InputT 
and OutputT repeatedly
-      DoFn<InputT, OutputT>.ProcessContext newCtx =
-          extra.processContext(new DoFn<InputT, OutputT>() {});
-      OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
-          DoFnAdapters.adaptProcessContext(fn, newCtx, extra);
-      try {
-        fn.processElement(oldCtx);
-        return DoFn.ProcessContinuation.stop();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeOnTimer(String timerId, ArgumentProvider<InputT, 
OutputT> arguments) {
-      throw new UnsupportedOperationException(
-          String.format("Timers are not supported for %s", 
OldDoFn.class.getSimpleName()));
-    }
-
-    @Override
-    public void invokeStartBundle(DoFn.Context c) {
-      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, 
c);
-      try {
-        fn.startBundle(oldCtx);
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeFinishBundle(DoFn.Context c) {
-      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, 
c);
-      try {
-        fn.finishBundle(oldCtx);
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeSetup() {
-      try {
-        fn.setup();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public void invokeTeardown() {
-      try {
-        fn.teardown();
-      } catch (Throwable exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT 
element) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
-        CoderRegistry coderRegistry) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT> void invokeSplitRestriction(
-        InputT element, RestrictionT restriction, 
DoFn.OutputReceiver<RestrictionT> receiver) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-        TrackerT invokeNewTracker(RestrictionT restriction) {
-      throw new UnsupportedOperationException("OldDoFn is not splittable");
-    }
-
-    @Override
-    public DoFn<InputT, OutputT> getFn() {
-      throw new UnsupportedOperationException("getFn is not supported for 
OldDoFn");
-    }
-  }
+  private DoFnInvokers() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 456a6eb..55b8e7e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -738,39 +737,4 @@ public class DoFnInvokersTest {
     invoker.invokeOnTimer(timerId, mockArgumentProvider);
     assertThat(fn.window, equalTo(testWindow));
   }
-
-  private class OldDoFnIdentity extends OldDoFn<String, String> {
-    public void processElement(ProcessContext c) {}
-  }
-
-  @Test
-  public void testOldDoFnProcessElement() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn)
-        .invokeProcessElement(mockArgumentProvider);
-    verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
-  }
-
-  @Test
-  public void testOldDoFnStartBundle() throws Exception {
-    new 
DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext);
-    verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class));
-  }
-
-  @Test
-  public void testOldDoFnFinishBundle() throws Exception {
-    new 
DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext);
-    verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class));
-  }
-
-  @Test
-  public void testOldDoFnSetup() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup();
-    verify(mockOldDoFn).setup();
-  }
-
-  @Test
-  public void testOldDoFnTeardown() throws Exception {
-    new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown();
-    verify(mockOldDoFn).teardown();
-  }
 }

Reply via email to