Repository: incubator-beam Updated Branches: refs/heads/master 46097736b -> 89367cfb1
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java new file mode 100644 index 0000000..1a26df2 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import org.apache.beam.sdk.transforms.DoFn; + +import com.google.common.reflect.TypeToken; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.lang.reflect.Method; +import java.util.List; + +/** Tests for {@link DoFnSignatures}. */ +@RunWith(JUnit4.class) +public class DoFnSignaturesTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static class FakeDoFn extends DoFn<Integer, String> {} + + @SuppressWarnings({"unused"}) + private void missingProcessContext() {} + + @Test + public void testMissingProcessContext() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + getClass().getName() + + "#missingProcessContext() must take a ProcessContext<> as its first argument"); + + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + getClass().getDeclaredMethod("missingProcessContext"), + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + @SuppressWarnings({"unused"}) + private void badProcessContext(String s) {} + + @Test + public void testBadProcessContextType() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + getClass().getName() + + "#badProcessContext(String) must take a ProcessContext<> as its first argument"); + + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + getClass().getDeclaredMethod("badProcessContext", String.class), + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + @SuppressWarnings({"unused"}) + private void badExtraContext(DoFn<Integer, String>.Context c, int n) {} + + @Test + public void testBadExtraContext() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + getClass().getName() + + "#badExtraContext(Context, int) must have a single argument of type Context"); + + DoFnSignatures.analyzeBundleMethod( + TypeToken.of(FakeDoFn.class), + getClass().getDeclaredMethod("badExtraContext", DoFn.Context.class, int.class), + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + @SuppressWarnings({"unused"}) + private void badExtraProcessContext(DoFn<Integer, String>.ProcessContext c, Integer n) {} + + @Test + public void testBadExtraProcessContextType() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Integer is not a valid context parameter for method " + + getClass().getName() + + "#badExtraProcessContext(ProcessContext, Integer)" + + ". Should be one of [BoundedWindow]"); + + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + getClass() + .getDeclaredMethod("badExtraProcessContext", DoFn.ProcessContext.class, Integer.class), + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + @SuppressWarnings("unused") + private int badReturnType() { + return 0; + } + + @Test + public void testBadReturnType() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(getClass().getName() + "#badReturnType() must have a void return type"); + + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + getClass().getDeclaredMethod("badReturnType"), + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + @SuppressWarnings("unused") + private void goodConcreteTypes( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<String> output) {} + + @Test + public void testGoodConcreteTypes() throws Exception { + Method method = + getClass() + .getDeclaredMethod( + "goodConcreteTypes", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + method, + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + private static class GoodTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> { + @ProcessElement + @SuppressWarnings("unused") + public void goodTypeVariables( + DoFn<InputT, OutputT>.ProcessContext c, + DoFn.InputProvider<InputT> input, + DoFn.OutputReceiver<OutputT> output) {} + } + + @Test + public void testGoodTypeVariables() throws Exception { + DoFnSignatures.INSTANCE.getOrParseSignature(GoodTypeVariables.class); + } + + private static class IdentityFn<T> extends DoFn<T, T> { + @ProcessElement + @SuppressWarnings("unused") + public void processElement(ProcessContext c, InputProvider<T> input, OutputReceiver<T> output) { + c.output(c.element()); + } + } + + private static class IdentityListFn<T> extends IdentityFn<List<T>> {} + + @Test + public void testIdentityFnApplied() throws Exception { + DoFnSignatures.INSTANCE.getOrParseSignature(new IdentityFn<String>() {}.getClass()); + } + + @SuppressWarnings("unused") + private void badGenericTwoArgs( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<Integer> output) {} + + @Test + public void testBadGenericsTwoArgs() throws Exception { + Method method = + getClass() + .getDeclaredMethod( + "badGenericTwoArgs", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Wrong type of OutputReceiver parameter " + + "for method " + + getClass().getName() + + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver): " + + "OutputReceiver<Integer>, should be " + + "OutputReceiver<String>"); + + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + method, + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + @SuppressWarnings("unused") + private void badGenericWildCards( + DoFn<Integer, String>.ProcessContext c, + DoFn.InputProvider<Integer> input, + DoFn.OutputReceiver<? super Integer> output) {} + + @Test + public void testBadGenericWildCards() throws Exception { + Method method = + getClass() + .getDeclaredMethod( + "badGenericWildCards", + DoFn.ProcessContext.class, + DoFn.InputProvider.class, + DoFn.OutputReceiver.class); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Wrong type of OutputReceiver parameter for method " + + getClass().getName() + + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver): " + + "OutputReceiver<? super Integer>, should be " + + "OutputReceiver<String>"); + + DoFnSignatures.analyzeProcessElementMethod( + TypeToken.of(FakeDoFn.class), + method, + TypeToken.of(Integer.class), + TypeToken.of(String.class)); + } + + static class BadTypeVariables<InputT, OutputT> extends DoFn<InputT, OutputT> { + @ProcessElement + @SuppressWarnings("unused") + public void badTypeVariables( + DoFn<InputT, OutputT>.ProcessContext c, + DoFn.InputProvider<InputT> input, + DoFn.OutputReceiver<InputT> output) {} + } + + @Test + public void testBadTypeVariables() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Wrong type of OutputReceiver parameter for method " + + BadTypeVariables.class.getName() + + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver): " + + "OutputReceiver<InputT>, should be " + + "OutputReceiver<OutputT>"); + + DoFnSignatures.INSTANCE.getOrParseSignature(BadTypeVariables.class); + } + + @Test + public void testNoProcessElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No method annotated with @ProcessElement found"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature(new DoFn<String, String>() {}.getClass()); + } + + @Test + public void testMultipleProcessElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Found multiple methods annotated with @ProcessElement"); + thrown.expectMessage("foo()"); + thrown.expectMessage("bar()"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + public void foo() {} + + @ProcessElement + public void bar() {} + }.getClass()); + } + + @Test + public void testMultipleStartBundleElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Found multiple methods annotated with @StartBundle"); + thrown.expectMessage("bar()"); + thrown.expectMessage("baz()"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + public void foo() {} + + @StartBundle + public void bar() {} + + @StartBundle + public void baz() {} + }.getClass()); + } + + @Test + public void testMultipleFinishBundleMethods() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Found multiple methods annotated with @FinishBundle"); + thrown.expectMessage("bar(Context)"); + thrown.expectMessage("baz(Context)"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + public void foo(ProcessContext context) {} + + @FinishBundle + public void bar(Context context) {} + + @FinishBundle + public void baz(Context context) {} + }.getClass()); + } + + @Test + public void testPrivateProcessElement() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("process() must be public"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + private void process() {} + }.getClass()); + } + + @Test + public void testPrivateStartBundle() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("startBundle() must be public"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + public void processElement() {} + + @StartBundle + void startBundle() {} + }.getClass()); + } + + @Test + public void testPrivateFinishBundle() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("finishBundle() must be public"); + thrown.expectMessage(getClass().getName() + "$"); + DoFnSignatures.INSTANCE.getOrParseSignature( + new DoFn<String, String>() { + @ProcessElement + public void processElement() {} + + @FinishBundle + void finishBundle() {} + }.getClass()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java new file mode 100644 index 0000000..a574ed8 --- /dev/null +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.microbenchmarks.transforms; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; +import org.apache.beam.sdk.transforms.DoFnAdapters; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically for measuring the + * overhead of {@link DoFnInvokers}. + */ +@State(Scope.Benchmark) +@Fork(1) +@Warmup(iterations = 5) +public class DoFnInvokersBenchmark { + + private static final String ELEMENT = "some string to use for testing"; + + private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn(); + private DoFn<String, String> doFn = new UpperCaseDoFn(); + + private StubOldDoFnProcessContext stubOldDoFnContext = + new StubOldDoFnProcessContext(oldDoFn, ELEMENT); + private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); + private ExtraContextFactory<String, String> extraContextFactory = + new DoFn.FakeExtraContextFactory<>(); + + private OldDoFn<String, String> adaptedDoFnWithContext; + + private DoFnInvoker<String, String> invoker; + + @Setup + public void setUp() { + adaptedDoFnWithContext = DoFnAdapters.toOldDoFn(doFn); + invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(doFn); + } + + @Benchmark + public String invokeOldDoFn() throws Exception { + oldDoFn.processElement(stubOldDoFnContext); + return stubDoFnContext.output; + } + + @Benchmark + public String invokeDoFnWithContextViaAdaptor() throws Exception { + adaptedDoFnWithContext.processElement(stubOldDoFnContext); + return stubOldDoFnContext.output; + } + + @Benchmark + public String invokeDoFnWithContext() throws Exception { + invoker.invokeProcessElement(stubDoFnContext, extraContextFactory); + return stubDoFnContext.output; + } + + private static class UpperCaseOldDoFn extends OldDoFn<String, String> { + + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element().toUpperCase()); + } + } + + private static class UpperCaseDoFn extends DoFn<String, String> { + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(c.element().toUpperCase()); + } + } + + private static class StubOldDoFnProcessContext extends OldDoFn<String, String>.ProcessContext { + + private final String element; + private String output; + + public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String element) { + fn.super(); + this.element = element; + } + + @Override + public String element() { + return element; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return null; + } + + @Override + public Instant timestamp() { + return null; + } + + @Override + public BoundedWindow window() { + return null; + } + + @Override + public PaneInfo pane() { + return null; + } + + @Override + public WindowingInternals<String, String> windowingInternals() { + return null; + } + + @Override + public PipelineOptions getPipelineOptions() { + return null; + } + + @Override + public void output(String output) { + this.output = output; + } + + @Override + public void outputWithTimestamp(String output, Instant timestamp) { + output(output); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) {} + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {} + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return null; + } + } + + private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext { + private final String element; + private String output; + + public StubDoFnProcessContext(DoFn<String, String> fn, String element) { + fn.super(); + this.element = element; + } + + @Override + public String element() { + return element; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return null; + } + + @Override + public Instant timestamp() { + return null; + } + + @Override + public PaneInfo pane() { + return null; + } + + @Override + public PipelineOptions getPipelineOptions() { + return null; + } + + @Override + public void output(String output) { + this.output = output; + } + + @Override + public void outputWithTimestamp(String output, Instant timestamp) { + output(output); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) {} + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {} + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbf77f90/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java deleted file mode 100644 index 91ecd16..0000000 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.microbenchmarks.transforms; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; -import org.apache.beam.sdk.transforms.DoFnReflector; -import org.apache.beam.sdk.transforms.DoFnReflector.DoFnInvoker; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; - -import org.joda.time.Instant; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Warmup; - -/** - * Benchmarks for {@link OldDoFn} and {@link DoFn} invocations, specifically - * for measuring the overhead of {@link DoFnReflector}. - */ -@State(Scope.Benchmark) -@Fork(1) -@Warmup(iterations = 5) -public class DoFnReflectorBenchmark { - - private static final String ELEMENT = "some string to use for testing"; - - private OldDoFn<String, String> oldDoFn = new UpperCaseOldDoFn(); - private DoFn<String, String> doFn = new UpperCaseDoFn(); - - private StubOldDoFnProcessContext stubOldDoFnContext = new StubOldDoFnProcessContext(oldDoFn, - ELEMENT); - private StubDoFnProcessContext stubDoFnContext = - new StubDoFnProcessContext(doFn, ELEMENT); - private ExtraContextFactory<String, String> extraContextFactory = - new DoFn.FakeExtraContextFactory<>(); - - private DoFnReflector doFnReflector; - private OldDoFn<String, String> adaptedDoFnWithContext; - - private DoFnInvoker<String, String> invoker; - - @Setup - public void setUp() { - doFnReflector = DoFnReflector.of(doFn.getClass()); - adaptedDoFnWithContext = doFnReflector.toDoFn(doFn); - invoker = doFnReflector.bindInvoker(doFn); - } - - @Benchmark - public String invokeOldDoFn() throws Exception { - oldDoFn.processElement(stubOldDoFnContext); - return stubDoFnContext.output; - } - - @Benchmark - public String invokeDoFnWithContextViaAdaptor() throws Exception { - adaptedDoFnWithContext.processElement(stubOldDoFnContext); - return stubOldDoFnContext.output; - } - - @Benchmark - public String invokeDoFnWithContext() throws Exception { - invoker.invokeProcessElement(stubDoFnContext, extraContextFactory); - return stubDoFnContext.output; - } - - private static class UpperCaseOldDoFn extends OldDoFn<String, String> { - - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element().toUpperCase()); - } - } - - private static class UpperCaseDoFn extends DoFn<String, String> { - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - c.output(c.element().toUpperCase()); - } - } - - private static class StubOldDoFnProcessContext extends OldDoFn<String, String>.ProcessContext { - - private final String element; - private String output; - - public StubOldDoFnProcessContext(OldDoFn<String, String> fn, String element) { - fn.super(); - this.element = element; - } - - @Override - public String element() { - return element; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return null; - } - - @Override - public Instant timestamp() { - return null; - } - - @Override - public BoundedWindow window() { - return null; - } - - @Override - public PaneInfo pane() { - return null; - } - - @Override - public WindowingInternals<String, String> windowingInternals() { - return null; - } - - @Override - public PipelineOptions getPipelineOptions() { - return null; - } - - @Override - public void output(String output) { - this.output = output; - } - - @Override - public void outputWithTimestamp(String output, Instant timestamp) { - output(output); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - return null; - } - } - - private static class StubDoFnProcessContext - extends DoFn<String, String>.ProcessContext { - private final String element; - private String output; - - public StubDoFnProcessContext(DoFn<String, String> fn, String element) { - fn.super(); - this.element = element; - } - - @Override - public String element() { - return element; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return null; - } - - @Override - public Instant timestamp() { - return null; - } - - @Override - public PaneInfo pane() { - return null; - } - - @Override - public PipelineOptions getPipelineOptions() { - return null; - } - - @Override - public void output(String output) { - this.output = output; - } - - @Override - public void outputWithTimestamp(String output, Instant timestamp) { - output(output); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - } - } -}