Repository: incubator-beam
Updated Branches:
  refs/heads/master 46097736b -> 89367cfb1


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
new file mode 100644
index 0000000..1a26df2
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import com.google.common.reflect.TypeToken;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/** Tests for {@link DoFnSignatures}. */
+@RunWith(JUnit4.class)
+public class DoFnSignaturesTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static class FakeDoFn extends DoFn<Integer, String> {}
+
+  @SuppressWarnings({"unused"})
+  private void missingProcessContext() {}
+
+  @Test
+  public void testMissingProcessContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#missingProcessContext() must take a ProcessContext<> as its 
first argument");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("missingProcessContext"),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badProcessContext(String s) {}
+
+  @Test
+  public void testBadProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#badProcessContext(String) must take a ProcessContext<> as its 
first argument");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badProcessContext", String.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
+
+  @Test
+  public void testBadExtraContext() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        getClass().getName()
+            + "#badExtraContext(Context, int) must have a single argument of 
type Context");
+
+    DoFnSignatures.analyzeBundleMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badExtraContext", DoFn.Context.class, 
int.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings({"unused"})
+  private void badExtraProcessContext(DoFn<Integer, String>.ProcessContext c, 
Integer n) {}
+
+  @Test
+  public void testBadExtraProcessContextType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Integer is not a valid context parameter for method "
+            + getClass().getName()
+            + "#badExtraProcessContext(ProcessContext, Integer)"
+            + ". Should be one of [BoundedWindow]");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass()
+            .getDeclaredMethod("badExtraProcessContext", 
DoFn.ProcessContext.class, Integer.class),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private int badReturnType() {
+    return 0;
+  }
+
+  @Test
+  public void testBadReturnType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(getClass().getName() + "#badReturnType() must have a 
void return type");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        getClass().getDeclaredMethod("badReturnType"),
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private void goodConcreteTypes(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<String> output) {}
+
+  @Test
+  public void testGoodConcreteTypes() throws Exception {
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "goodConcreteTypes",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        method,
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, 
OutputT> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void goodTypeVariables(
+        DoFn<InputT, OutputT>.ProcessContext c,
+        DoFn.InputProvider<InputT> input,
+        DoFn.OutputReceiver<OutputT> output) {}
+  }
+
+  @Test
+  public void testGoodTypeVariables() throws Exception {
+    DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void processElement(ProcessContext c, InputProvider<T> input, 
OutputReceiver<T> output) {
+      c.output(c.element());
+    }
+  }
+
+  private static class IdentityListFn<T> extends IdentityFn<List<T>> {}
+
+  @Test
+  public void testIdentityFnApplied() throws Exception {
+    DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() 
{}.getClass());
+  }
+
+  @SuppressWarnings("unused")
+  private void badGenericTwoArgs(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<Integer> output) {}
+
+  @Test
+  public void testBadGenericsTwoArgs() throws Exception {
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badGenericTwoArgs",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter "
+            + "for method "
+            + getClass().getName()
+            + "#badGenericTwoArgs(ProcessContext, InputProvider, 
OutputReceiver): "
+            + "OutputReceiver<Integer>, should be "
+            + "OutputReceiver<String>");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        method,
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  @SuppressWarnings("unused")
+  private void badGenericWildCards(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<? super Integer> output) {}
+
+  @Test
+  public void testBadGenericWildCards() throws Exception {
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badGenericWildCards",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter for method "
+            + getClass().getName()
+            + "#badGenericWildCards(ProcessContext, InputProvider, 
OutputReceiver): "
+            + "OutputReceiver<? super Integer>, should be "
+            + "OutputReceiver<String>");
+
+    DoFnSignatures.analyzeProcessElementMethod(
+        TypeToken.of(FakeDoFn.class),
+        method,
+        TypeToken.of(Integer.class),
+        TypeToken.of(String.class));
+  }
+
+  static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> 
{
+    @ProcessElement
+    @SuppressWarnings("unused")
+    public void badTypeVariables(
+        DoFn<InputT, OutputT>.ProcessContext c,
+        DoFn.InputProvider<InputT> input,
+        DoFn.OutputReceiver<InputT> output) {}
+  }
+
+  @Test
+  public void testBadTypeVariables() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Wrong type of OutputReceiver parameter for method "
+            + BadTypeVariables.class.getName()
+            + "#badTypeVariables(ProcessContext, InputProvider, 
OutputReceiver): "
+            + "OutputReceiver<InputT>, should be "
+            + "OutputReceiver<OutputT>");
+
+    DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class);
+  }
+
+  @Test
+  public void testNoProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No method annotated with @ProcessElement found");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() 
{}.getClass());
+  }
+
+  @Test
+  public void testMultipleProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with 
@ProcessElement");
+    thrown.expectMessage("foo()");
+    thrown.expectMessage("bar()");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo() {}
+
+          @ProcessElement
+          public void bar() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testMultipleStartBundleElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with @StartBundle");
+    thrown.expectMessage("bar()");
+    thrown.expectMessage("baz()");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo() {}
+
+          @StartBundle
+          public void bar() {}
+
+          @StartBundle
+          public void baz() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testMultipleFinishBundleMethods() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Found multiple methods annotated with 
@FinishBundle");
+    thrown.expectMessage("bar(Context)");
+    thrown.expectMessage("baz(Context)");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void foo(ProcessContext context) {}
+
+          @FinishBundle
+          public void bar(Context context) {}
+
+          @FinishBundle
+          public void baz(Context context) {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateProcessElement() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("process() must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          private void process() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateStartBundle() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("startBundle() must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement() {}
+
+          @StartBundle
+          void startBundle() {}
+        }.getClass());
+  }
+
+  @Test
+  public void testPrivateFinishBundle() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("finishBundle() must be public");
+    thrown.expectMessage(getClass().getName() + "$");
+    DoFnSignatures.INSTANCE.getOrParseSignature(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement() {}
+
+          @FinishBundle
+          void finishBundle() {}
+        }.getClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
 
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
new file mode 100644
index 0000000..a574ed8
--- /dev/null
+++ 
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically 
for measuring the
+ * overhead of {@link DoFnInvokers}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class DoFnInvokersBenchmark {
+
+  private static final String ELEMENT = "some string to use for testing";
+
+  private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn();
+  private DoFn<String, String> doFn = new UpperCaseDoFn();
+
+  private StubOldDoFnProcessContext stubOldDoFnContext =
+      new StubOldDoFnProcessContext(oldDoFn, ELEMENT);
+  private StubDoFnProcessContext stubDoFnContext = new 
StubDoFnProcessContext(doFn, ELEMENT);
+  private ExtraContextFactory<String, String> extraContextFactory =
+      new DoFn.FakeExtraContextFactory<>();
+
+  private OldDoFn<String, String> adaptedDoFnWithContext;
+
+  private DoFnInvoker<String, String> invoker;
+
+  @Setup
+  public void setUp() {
+    adaptedDoFnWithContext = DoFnAdapters.toOldDoFn(doFn);
+    invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(doFn);
+  }
+
+  @Benchmark
+  public String invokeOldDoFn() throws Exception {
+    oldDoFn.processElement(stubOldDoFnContext);
+    return stubDoFnContext.output;
+  }
+
+  @Benchmark
+  public String invokeDoFnWithContextViaAdaptor() throws Exception {
+    adaptedDoFnWithContext.processElement(stubOldDoFnContext);
+    return stubOldDoFnContext.output;
+  }
+
+  @Benchmark
+  public String invokeDoFnWithContext() throws Exception {
+    invoker.invokeProcessElement(stubDoFnContext, extraContextFactory);
+    return stubDoFnContext.output;
+  }
+
+  private static class UpperCaseOldDoFn extends OldDoFn<String, String> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element().toUpperCase());
+    }
+  }
+
+  private static class UpperCaseDoFn extends DoFn<String, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element().toUpperCase());
+    }
+  }
+
+  private static class StubOldDoFnProcessContext extends OldDoFn<String, 
String>.ProcessContext {
+
+    private final String element;
+    private String output;
+
+    public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String 
element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public String element() {
+      return element;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return null;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return null;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return null;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return null;
+    }
+
+    @Override
+    public WindowingInternals<String, String> windowingInternals() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+
+    @Override
+    public void output(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public void outputWithTimestamp(String output, Instant timestamp) {
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {}
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {}
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return null;
+    }
+  }
+
+  private static class StubDoFnProcessContext extends DoFn<String, 
String>.ProcessContext {
+    private final String element;
+    private String output;
+
+    public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public String element() {
+      return element;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return null;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return null;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+
+    @Override
+    public void output(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public void outputWithTimestamp(String output, Instant timestamp) {
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {}
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
 
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
deleted file mode 100644
index 91ecd16..0000000
--- 
a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.microbenchmarks.transforms;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnReflector;
-import org.apache.beam.sdk.transforms.DoFnReflector.DoFnInvoker;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.joda.time.Instant;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-
-/**
- * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically
- * for measuring the overhead of {@link DoFnReflector}.
- */
-@State(Scope.Benchmark)
-@Fork(1)
-@Warmup(iterations = 5)
-public class DoFnReflectorBenchmark {
-
-  private static final String ELEMENT = "some string to use for testing";
-
-  private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn();
-  private DoFn<String, String> doFn = new UpperCaseDoFn();
-
-  private StubOldDoFnProcessContext stubOldDoFnContext = new 
StubOldDoFnProcessContext(oldDoFn,
-      ELEMENT);
-  private StubDoFnProcessContext stubDoFnContext =
-      new StubDoFnProcessContext(doFn, ELEMENT);
-  private ExtraContextFactory<String, String> extraContextFactory =
-      new DoFn.FakeExtraContextFactory<>();
-
-  private DoFnReflector doFnReflector;
-  private OldDoFn<String, String> adaptedDoFnWithContext;
-
-  private DoFnInvoker<String, String> invoker;
-
-  @Setup
-  public void setUp() {
-    doFnReflector = DoFnReflector.of(doFn.getClass());
-    adaptedDoFnWithContext = doFnReflector.toDoFn(doFn);
-    invoker = doFnReflector.bindInvoker(doFn);
-  }
-
-  @Benchmark
-  public String invokeOldDoFn() throws Exception {
-    oldDoFn.processElement(stubOldDoFnContext);
-    return stubDoFnContext.output;
-  }
-
-  @Benchmark
-  public String invokeDoFnWithContextViaAdaptor() throws Exception {
-    adaptedDoFnWithContext.processElement(stubOldDoFnContext);
-    return stubOldDoFnContext.output;
-  }
-
-  @Benchmark
-  public String invokeDoFnWithContext() throws Exception {
-    invoker.invokeProcessElement(stubDoFnContext, extraContextFactory);
-    return stubDoFnContext.output;
-  }
-
-  private static class UpperCaseOldDoFn extends OldDoFn<String, String> {
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element().toUpperCase());
-    }
-  }
-
-  private static class UpperCaseDoFn extends DoFn<String, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element().toUpperCase());
-    }
-  }
-
-  private static class StubOldDoFnProcessContext extends OldDoFn<String, 
String>.ProcessContext {
-
-    private final String element;
-    private String output;
-
-    public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String 
element) {
-      fn.super();
-      this.element = element;
-    }
-
-    @Override
-    public String element() {
-      return element;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return null;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return null;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return null;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return null;
-    }
-
-    @Override
-    public WindowingInternals<String, String> windowingInternals() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-
-    @Override
-    public void output(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public void outputWithTimestamp(String output, Instant timestamp) {
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
-      return null;
-    }
-  }
-
-  private static class StubDoFnProcessContext
-      extends DoFn<String, String>.ProcessContext {
-    private final String element;
-    private  String output;
-
-    public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
-      fn.super();
-      this.element = element;
-    }
-
-    @Override
-    public String element() {
-      return element;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return null;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return null;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return null;
-    }
-
-    @Override
-    public void output(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public void outputWithTimestamp(String output, Instant timestamp) {
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-    }
-  }
-}

Reply via email to