Removes OldDoFn from ParDo

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9e53c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9e53c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9e53c5d

Branch: refs/heads/master
Commit: e9e53c5d037561aa4dcacfcde69d76a03f3a1571
Parents: 8330bfa
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Dec 9 17:13:43 2016 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 167 +++----------------
 .../apache/beam/sdk/transforms/OldDoFnTest.java | 125 ++++----------
 2 files changed, 55 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 167f5fa..d2149c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -530,23 +529,6 @@ public class ParDo {
     return new Unbound().of(fn, displayDataForFn(fn));
   }
 
-  /**
-   * Creates a {@link ParDo} {@link PTransform} that will invoke the
-   * given {@link OldDoFn} function.
-   *
-   * <p>The resulting {@link PTransform PTransform's} types have been bound, 
with the
-   * input being a {@code PCollection<InputT>} and the output a
-   * {@code PCollection<OutputT>}, inferred from the types of the argument
-   * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
-   * properties can be set on it first.
-   *
-   * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
-   */
-  @Deprecated
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, 
OutputT> fn) {
-    return new Unbound().of(fn, displayDataForFn(fn));
-  }
-
   private static <T> DisplayData.ItemSpec<? extends Class<?>> 
displayDataForFn(T fn) {
     return DisplayData.item("fn", fn.getClass()).withLabel("Transform 
Function");
   }
@@ -557,12 +539,7 @@ public class ParDo {
    * the {@link PCollection}.
    */
   private static <InputT, OutputT> void validateWindowType(
-      PCollection<? extends InputT> input, Serializable fn) {
-    // No validation for OldDoFn
-    if (!(fn instanceof DoFn)) {
-      return;
-    }
-
+      PCollection<? extends InputT> input, DoFn<InputT, OutputT> fn) {
     DoFnSignature signature = DoFnSignatures.getSignature((Class) 
fn.getClass());
 
     TypeDescriptor<? extends BoundedWindow> actualWindowT =
@@ -609,10 +586,6 @@ public class ParDo {
     }
   }
 
-  private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, 
OutputT> fn) {
-    return DoFnAdapters.toOldDoFn(fn);
-  }
-
   /**
    * An incomplete {@link ParDo} transform, with unbound input/output types.
    *
@@ -688,24 +661,9 @@ public class ParDo {
       return new UnboundMulti<>(name, sideInputs, mainOutputTag, 
sideOutputTags);
     }
 
-    /**
-     * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * transform but that will invoke the given {@link OldDoFn}
-     * function, and that has its input and output types bound. Does
-     * not modify this transform. The resulting {@link PTransform} is
-     * sufficiently specified to be applied, but more properties can
-     * still be specified.
-     *
-     * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
-     */
-    @Deprecated
-    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, 
OutputT> oldFn) {
-      return of(oldFn, displayDataForFn(oldFn));
-    }
-
     private <InputT, OutputT> Bound<InputT, OutputT> of(
-        Serializable originalFn, DisplayData.ItemSpec<? extends Class<?>> 
fnDisplayData) {
-      return new Bound<>(name, originalFn, sideInputs, fnDisplayData);
+        DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> 
fnDisplayData) {
+      return new Bound<>(name, doFn, sideInputs, fnDisplayData);
     }
   }
 
@@ -725,12 +683,12 @@ public class ParDo {
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
-    private final Serializable fn;
+    private final DoFn<InputT, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     Bound(
         String name,
-        Serializable fn,
+        DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
@@ -787,7 +745,7 @@ public class ParDo {
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getOldFn()),
+          !isSplittable(getNewFn()),
           "%s does not support Splittable DoFn",
           input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
@@ -795,7 +753,7 @@ public class ParDo {
               input.getPipeline(),
               input.getWindowingStrategy(),
               input.isBounded())
-          .setTypeDescriptor(getOldFn().getOutputTypeDescriptor());
+          .setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
     }
 
     @Override
@@ -803,14 +761,14 @@ public class ParDo {
     protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends 
InputT> input)
         throws CannotProvideCoderException {
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
-          getOldFn().getOutputTypeDescriptor(),
-          getOldFn().getInputTypeDescriptor(),
+          getNewFn().getOutputTypeDescriptor(),
+          getNewFn().getInputTypeDescriptor(),
           ((PCollection<InputT>) input).getCoder());
     }
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
+      Class<?> clazz = getNewFn().getClass();
       if (clazz.isAnonymousClass()) {
         return "AnonymousParDo";
       } else {
@@ -831,44 +789,7 @@ public class ParDo {
       ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
     }
 
-    /**
-     * @deprecated this method to be converted to return {@link DoFn}. If you 
want to receive
-     * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getFn() {
-      return getOldFn();
-    }
-
-    /**
-     * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is 
migrated to return
-     * a {@link DoFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getOldFn() {
-      if (fn instanceof OldDoFn) {
-        return (OldDoFn<InputT, OutputT>) fn;
-      } else {
-        return adapt((DoFn<InputT, OutputT>) fn);
-      }
-    }
-
     public DoFn<InputT, OutputT> getNewFn() {
-      if (fn instanceof DoFn) {
-        return (DoFn<InputT, OutputT>) fn;
-      } else {
-        return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
-      }
-    }
-
-    /**
-     * Returns the {@link OldDoFn} or {@link DoFn} used to create this 
transform.
-     *
-     * @deprecated for migration purposes only. There are some cases of {@link 
OldDoFn} that are not
-     *     fully supported by wrapping it into a {@link DoFn}, such as {@link 
RequiresWindowAccess}.
-     */
-    @Deprecated
-    public Object getOriginalFn() {
       return fn;
     }
 
@@ -951,23 +872,8 @@ public class ParDo {
       return of(fn, displayDataForFn(fn));
     }
 
-    /**
-     * Returns a new multi-output {@link ParDo} {@link PTransform}
-     * that's like this transform but that will invoke the given
-     * {@link OldDoFn} function, and that has its input type bound.
-     * Does not modify this transform. The resulting
-     * {@link PTransform} is sufficiently specified to be applied, but
-     * more properties can still be specified.
-     *
-     * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
-     */
-    @Deprecated
-    public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> 
fn) {
-      return of(fn, displayDataForFn(fn));
-    }
-
     private <InputT> BoundMulti<InputT, OutputT> of(
-        Serializable fn, DisplayData.ItemSpec<? extends Class<?>> 
fnDisplayData) {
+        DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> 
fnDisplayData) {
       return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, 
sideOutputTags, fnDisplayData);
     }
   }
@@ -990,11 +896,11 @@ public class ParDo {
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
-    private final Serializable fn;
+    private final DoFn<InputT, OutputT> fn;
 
     BoundMulti(
         String name,
-        Serializable fn,
+        DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList sideOutputTags,
@@ -1046,7 +952,7 @@ public class ParDo {
     @Override
     public PCollectionTuple expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getOldFn()),
+          !isSplittable(getNewFn()),
           "%s does not support Splittable DoFn",
           input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
@@ -1059,7 +965,7 @@ public class ParDo {
       // The fn will likely be an instance of an anonymous subclass
       // such as DoFn<Integer, String> { }, thus will have a high-fidelity
       // TypeDescriptor for the output type.
-      
outputs.get(mainOutputTag).setTypeDescriptor(getOldFn().getOutputTypeDescriptor());
+      
outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
 
       return outputs;
     }
@@ -1084,7 +990,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
+      Class<?> clazz = getNewFn().getClass();
       if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {
@@ -1095,37 +1001,11 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
-    }
-
-    /**
-     * @deprecated this method to be converted to return {@link DoFn}. If you 
want to receive
-     * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getFn() {
-      return getOldFn();
-    }
-
-    /**
-     * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is 
migrated to return
-     * a {@link DoFn}.
-     */
-    @Deprecated
-    public OldDoFn<InputT, OutputT> getOldFn() {
-      if (fn instanceof OldDoFn) {
-        return (OldDoFn<InputT, OutputT>) fn;
-      } else {
-        return adapt((DoFn<InputT, OutputT>) fn);
-      }
+      ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
     public DoFn<InputT, OutputT> getNewFn() {
-      if (fn instanceof DoFn) {
-        return (DoFn<InputT, OutputT>) fn;
-      } else {
-        return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
-      }
+      return fn;
     }
 
     public TupleTag<OutputT> getMainOutputTag() {
@@ -1148,14 +1028,7 @@ public class ParDo {
     builder.include("fn", fn).add(fnDisplayData);
   }
 
-  private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {
-    DoFn<?, ?> fn = DoFnAdapters.getDoFn(oldDoFn);
-    if (fn == null) {
-      return false;
-    }
-    return DoFnSignatures
-        .getSignature(fn.getClass())
-        .processElement()
-        .isSplittable();
+  private static boolean isSplittable(DoFn<?, ?> fn) {
+    return DoFnSignatures.signatureForDoFn(fn).processElement().isSplittable();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9e53c5d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index 07e3078..cc84252 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -18,28 +18,20 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import java.util.Map;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -134,68 +126,52 @@ public class OldDoFnTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInStartBundleThrows() {
-    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws 
Exception {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
       @Override
-      public void startBundle(OldDoFn<String, String>.Context c) throws 
Exception {
-        createAggregator("anyAggregate", new MaxIntegerFn());
-      }
-
-      @Override
-      public void processElement(OldDoFn<String, String>.ProcessContext c) 
throws Exception {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
+      public void processElement(ProcessContext c) throws Exception { }
+    };
+    OldDoFn<String, String>.Context context = createContext(fn);
+    context.setupDelegateAggregators();
 
-    p.run();
+    thrown.expect(isA(IllegalStateException.class));
+    fn.createAggregator("anyAggregate", new MaxIntegerFn());
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInProcessElementThrows() {
-    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+  private OldDoFn<String, String>.Context createContext(OldDoFn<String, 
String> fn) {
+    return fn.new Context() {
       @Override
-      public void processElement(ProcessContext c) throws Exception {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+      public PipelineOptions getPipelineOptions() {
+        throw new UnsupportedOperationException();
       }
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInFinishBundleThrows() {
-    TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
       @Override
-      public void finishBundle(OldDoFn<String, String>.Context c) throws 
Exception {
-        createAggregator("anyAggregate", new MaxIntegerFn());
+      public void output(String output) {
+        throw new UnsupportedOperationException();
       }
 
       @Override
-      public void processElement(OldDoFn<String, String>.ProcessContext c) 
throws Exception {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
+      public void outputWithTimestamp(String output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
 
-    p.run();
-  }
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        throw new UnsupportedOperationException();
+      }
 
-  /**
-   * Initialize a test pipeline with the specified {@link OldDoFn}.
-   */
-  private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, 
OutputT> fn) {
-    TestPipeline pipeline = TestPipeline.create();
-    pipeline.apply(Create.of((InputT) null))
-     .apply(ParDo.of(fn));
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
 
-    return pipeline;
+      @Override
+      public <AggInputT, AggOutputT>
+      Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        throw new UnsupportedOperationException();
+      }
+    };
   }
 
   @Test
@@ -209,35 +185,4 @@ public class OldDoFnTest implements Serializable {
     DisplayData data = DisplayData.from(usesDefault);
     assertThat(data.items(), empty());
   }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testAggregators() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    CountOddsFn countOdds = new CountOddsFn();
-    PCollection<Void> output = pipeline
-        .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
-        .apply(ParDo.of(countOdds));
-    PipelineResult result = pipeline.run();
-
-    AggregatorValues<Integer> values = 
result.getAggregatorValues(countOdds.aggregator);
-
-    Map<String, Integer> valuesMap = values.getValuesAtSteps();
-
-    assertThat(valuesMap.size(), equalTo(1));
-    
assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), 
equalTo(4));
-  }
-
-  private static class CountOddsFn extends OldDoFn<Integer, Void> {
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      if (c.element() % 2 == 1) {
-        aggregator.addValue(1);
-      }
-    }
-
-    Aggregator<Integer, Integer> aggregator =
-        createAggregator("odds", new SumIntegerFn());
-  }
 }

Reply via email to