http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java index 3b314b2..8b00c03 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java @@ -142,9 +142,9 @@ public class DoFnWithContextTest implements Serializable { @Test public void testDoFnWithContextUsingAggregators() { NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>(); - DoFn<Object, Object>.Context context = noOpFn.context(); + OldDoFn<Object, Object>.Context context = noOpFn.context(); - DoFn<Object, Object> fn = spy(noOpFn); + OldDoFn<Object, Object> fn = spy(noOpFn); context = spy(context); @SuppressWarnings("unchecked") @@ -225,7 +225,7 @@ public class DoFnWithContextTest implements Serializable { } /** - * Initialize a test pipeline with the specified {@link DoFn}. + * Initialize a test pipeline with the specified {@link OldDoFn}. */ private <InputT, OutputT> TestPipeline createTestPipeline(DoFnWithContext<InputT, OutputT> fn) { TestPipeline pipeline = TestPipeline.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index 80825cb..b81eedb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -130,7 +130,7 @@ public class FlattenTest implements Serializable { PCollection<String> output = p .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() { + .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() { @Override public void processElement(ProcessContext c) { for (String side : c.sideInput(view)) { @@ -339,7 +339,7 @@ public class FlattenTest implements Serializable { ///////////////////////////////////////////////////////////////////////////// - private static class IdentityFn<T> extends DoFn<T, T> { + private static class IdentityFn<T> extends OldDoFn<T, T> { @Override public void processElement(ProcessContext c) { c.output(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index d6e4589..15c3ba8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.Matchers.empty; @@ -55,7 +56,6 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.fasterxml.jackson.annotation.JsonCreator; - import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Assert; @@ -371,7 +371,7 @@ public class GroupByKeyTest { pipeline.run(); } - private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> { + private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> { private final Instant timestamp; public AssertTimestamp(Instant timestamp) { @@ -506,7 +506,7 @@ public class GroupByKeyTest { * Creates a KV that wraps the original KV together with a random key. */ static class AssignRandomKey - extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> { + extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> { @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java index 3355aeb..fa2fae9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -74,7 +75,7 @@ public class IntraBundleParallelizationTest { /** * Introduces a delay in processing, then passes thru elements. */ - private static class DelayFn<T> extends DoFn<T, T> { + private static class DelayFn<T> extends OldDoFn<T, T> { public static final long DELAY_MS = 25; @Override @@ -94,7 +95,7 @@ public class IntraBundleParallelizationTest { /** * Throws an exception after some number of calls. */ - private static class ExceptionThrowingFn<T> extends DoFn<T, T> { + private static class ExceptionThrowingFn<T> extends OldDoFn<T, T> { private ExceptionThrowingFn(int numSuccesses) { IntraBundleParallelizationTest.numSuccesses.set(numSuccesses); } @@ -120,11 +121,11 @@ public class IntraBundleParallelizationTest { /** * Measures concurrency of the processElement method. */ - private static class ConcurrencyMeasuringFn<T> extends DoFn<T, T> { + private static class ConcurrencyMeasuringFn<T> extends OldDoFn<T, T> { @Override public void processElement(ProcessContext c) { // Synchronize on the class to provide synchronous access irrespective of - // how this DoFn is called. + // how this OldDoFn is called. synchronized (ConcurrencyMeasuringFn.class) { concurrentElements++; if (concurrentElements > maxDownstreamConcurrency) { @@ -154,8 +155,8 @@ public class IntraBundleParallelizationTest { } /** - * Test that the DoFn is parallelized up the the Max Parallelism factor within a bundle, but not - * greater than that amount. + * Test that the OldDoFn is parallelized up the the Max Parallelism factor within a bundle, but + * not greater than that amount. */ @Test @Category(NeedsRunner.class) @@ -224,7 +225,7 @@ public class IntraBundleParallelizationTest { @Test public void testDisplayData() { - DoFn<String, String> fn = new DoFn<String, String>() { + OldDoFn<String, String> fn = new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) throws Exception { } @@ -248,15 +249,15 @@ public class IntraBundleParallelizationTest { /** * Runs the provided doFn inside of an {@link IntraBundleParallelization} transform. * - * <p>This method assumes that the DoFn passed to it will call {@link #startConcurrentCall()} + * <p>This method assumes that the OldDoFn passed to it will call {@link #startConcurrentCall()} * before processing each elements and {@link #finishConcurrentCall()} after each element. * * @param numElements the size of the input * @param maxParallelism how many threads to execute in parallel - * @param doFn the DoFn to execute - * @return the maximum observed parallelism of the DoFn + * @param doFn the OldDoFn to execute + * @return the maximum observed parallelism of the OldDoFn */ - private int run(int numElements, int maxParallelism, DoFn<Integer, Integer> doFn) { + private int run(int numElements, int maxParallelism, OldDoFn<Integer, Integer> doFn) { Pipeline pipeline = TestPipeline.create(); ArrayList<Integer> data = new ArrayList<>(numElements); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index f18504c..b4751d2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java index 226255a..87fa554 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.checkCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java index d7ec322..cd03a74 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java @@ -20,10 +20,12 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.checkCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import org.apache.beam.sdk.transforms.display.DisplayData; + import com.google.common.collect.Lists; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java index a389fac..5c43755 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java @@ -28,35 +28,35 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** - * A {@link DoFn} that does nothing with provided elements. Used for testing - * methods provided by the DoFn abstract class. + * A {@link OldDoFn} that does nothing with provided elements. Used for testing + * methods provided by the OldDoFn abstract class. * * @param <InputT> unused. * @param <OutputT> unused. */ -class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> { +class NoOpDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> { @Override - public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception { } /** * Returns a new NoOp Context. */ - public DoFn<InputT, OutputT>.Context context() { + public OldDoFn<InputT, OutputT>.Context context() { return new NoOpDoFnContext(); } /** * Returns a new NoOp Process Context. */ - public DoFn<InputT, OutputT>.ProcessContext processContext() { + public OldDoFn<InputT, OutputT>.ProcessContext processContext() { return new NoOpDoFnProcessContext(); } /** - * A {@link DoFn.Context} that does nothing and returns exclusively null. + * A {@link OldDoFn.Context} that does nothing and returns exclusively null. */ - private class NoOpDoFnContext extends DoFn<InputT, OutputT>.Context { + private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context { @Override public PipelineOptions getPipelineOptions() { return null; @@ -82,10 +82,10 @@ class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> { } /** - * A {@link DoFn.ProcessContext} that does nothing and returns exclusively + * A {@link OldDoFn.ProcessContext} that does nothing and returns exclusively * null. */ - private class NoOpDoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext { + private class NoOpDoFnProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext { @Override public InputT element() { return null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java new file mode 100644 index 0000000..9234ccb --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link OldDoFn.Context}. + */ +@RunWith(JUnit4.class) +public class OldDoFnContextTest { + + @Mock + private Aggregator<Long, Long> agg; + + private OldDoFn<Object, Object> fn; + private OldDoFn<Object, Object>.Context context; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + // Need to be real objects to call the constructor, and to reference the + // outer instance of OldDoFn + NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>(); + OldDoFn<Object, Object>.Context noOpContext = noOpFn.context(); + + fn = spy(noOpFn); + context = spy(noOpContext); + } + + @Test + public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() { + Sum.SumLongFn combiner = new Sum.SumLongFn(); + Aggregator<Long, Long> delegateAggregator = + fn.createAggregator("test", combiner); + + when(context.createAggregatorInternal("test", combiner)).thenReturn(agg); + + context.setupDelegateAggregators(); + delegateAggregator.addValue(1L); + + verify(agg).addValue(1L); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java new file mode 100644 index 0000000..49f4366 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; +import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; +import org.apache.beam.sdk.transforms.display.DisplayData; + +import com.google.common.collect.ImmutableMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.Map; + +/** + * Tests for OldDoFn. + */ +@RunWith(JUnit4.class) +public class OldDoFnTest implements Serializable { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testCreateAggregatorWithCombinerSucceeds() { + String name = "testAggregator"; + Sum.SumLongFn combiner = new Sum.SumLongFn(); + + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); + + Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner); + + assertEquals(name, aggregator.getName()); + assertEquals(combiner, aggregator.getCombineFn()); + } + + @Test + public void testCreateAggregatorWithNullNameThrowsException() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("name cannot be null"); + + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); + + doFn.createAggregator(null, new Sum.SumLongFn()); + } + + @Test + public void testCreateAggregatorWithNullCombineFnThrowsException() { + CombineFn<Object, Object, Object> combiner = null; + + thrown.expect(NullPointerException.class); + thrown.expectMessage("combiner cannot be null"); + + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); + + doFn.createAggregator("testAggregator", combiner); + } + + @Test + public void testCreateAggregatorWithNullSerializableFnThrowsException() { + SerializableFunction<Iterable<Object>, Object> combiner = null; + + thrown.expect(NullPointerException.class); + thrown.expectMessage("combiner cannot be null"); + + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); + + doFn.createAggregator("testAggregator", combiner); + } + + @Test + public void testCreateAggregatorWithSameNameThrowsException() { + String name = "testAggregator"; + CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); + + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); + + doFn.createAggregator(name, combiner); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot create"); + thrown.expectMessage(name); + thrown.expectMessage("already exists"); + + doFn.createAggregator(name, combiner); + } + + @Test + public void testCreateAggregatorsWithDifferentNamesSucceeds() { + String nameOne = "testAggregator"; + String nameTwo = "aggregatorPrime"; + CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); + + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); + + Aggregator<Double, Double> aggregatorOne = + doFn.createAggregator(nameOne, combiner); + Aggregator<Double, Double> aggregatorTwo = + doFn.createAggregator(nameTwo, combiner); + + assertNotEquals(aggregatorOne, aggregatorTwo); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateAggregatorInStartBundleThrows() { + TestPipeline p = createTestPipeline(new OldDoFn<String, String>() { + @Override + public void startBundle(OldDoFn<String, String>.Context c) throws Exception { + createAggregator("anyAggregate", new MaxIntegerFn()); + } + + @Override + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {} + }); + + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateAggregatorInProcessElementThrows() { + TestPipeline p = createTestPipeline(new OldDoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + createAggregator("anyAggregate", new MaxIntegerFn()); + } + }); + + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateAggregatorInFinishBundleThrows() { + TestPipeline p = createTestPipeline(new OldDoFn<String, String>() { + @Override + public void finishBundle(OldDoFn<String, String>.Context c) throws Exception { + createAggregator("anyAggregate", new MaxIntegerFn()); + } + + @Override + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {} + }); + + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + + p.run(); + } + + /** + * Initialize a test pipeline with the specified {@link OldDoFn}. + */ + private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, OutputT> fn) { + TestPipeline pipeline = TestPipeline.create(); + pipeline.apply(Create.of((InputT) null)) + .apply(ParDo.of(fn)); + + return pipeline; + } + + @Test + public void testPopulateDisplayDataDefaultBehavior() { + OldDoFn<String, String> usesDefault = + new OldDoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception {} + }; + + DisplayData data = DisplayData.from(usesDefault); + assertThat(data.items(), empty()); + } + + @Test + @Category(NeedsRunner.class) + public void testAggregators() throws Exception { + Pipeline pipeline = TestPipeline.create(); + + CountOddsFn countOdds = new CountOddsFn(); + pipeline + .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100)) + .apply(ParDo.of(countOdds)); + PipelineResult result = pipeline.run(); + + AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator); + assertThat(values.getValuesAtSteps(), + equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4))); + } + + private static class CountOddsFn extends OldDoFn<Integer, Void> { + @Override + public void processElement(ProcessContext c) throws Exception { + if (c.element() % 2 == 1) { + aggregator.addValue(1); + } + } + + Aggregator<Integer, Integer> aggregator = + createAggregator("odds", new SumIntegerFn()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 868270c..0a6eab0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -26,6 +26,7 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; import static com.google.common.base.Preconditions.checkNotNull; + import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; @@ -43,7 +44,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -59,7 +60,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import com.fasterxml.jackson.annotation.JsonCreator; - import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -89,7 +89,9 @@ public class ParDoTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); - private static class PrintingDoFn extends DoFn<String, String> implements RequiresWindowAccess { + private static class PrintingOldDoFn extends OldDoFn<String, String> implements + RequiresWindowAccess { + @Override public void processElement(ProcessContext c) { c.output(c.element() + ":" + c.timestamp().getMillis() @@ -97,17 +99,17 @@ public class ParDoTest implements Serializable { } } - static class TestDoFn extends DoFn<Integer, String> { + static class TestOldDoFn extends OldDoFn<Integer, String> { enum State { UNSTARTED, STARTED, PROCESSING, FINISHED } State state = State.UNSTARTED; final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>(); - public TestDoFn() { + public TestOldDoFn() { } - public TestDoFn(List<PCollectionView<Integer>> sideInputViews, + public TestOldDoFn(List<PCollectionView<Integer>> sideInputViews, List<TupleTag<String>> sideOutputTupleTags) { this.sideInputViews.addAll(sideInputViews); this.sideOutputTupleTags.addAll(sideOutputTupleTags); @@ -161,9 +163,9 @@ public class ParDoTest implements Serializable { } } - static class TestNoOutputDoFn extends DoFn<Integer, String> { + static class TestNoOutputDoFn extends OldDoFn<Integer, String> { @Override - public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception {} + public void processElement(OldDoFn<Integer, String>.ProcessContext c) throws Exception {} } static class TestDoFnWithContext extends DoFnWithContext<Integer, String> { @@ -229,7 +231,7 @@ public class ParDoTest implements Serializable { } } - static class TestStartBatchErrorDoFn extends DoFn<Integer, String> { + static class TestStartBatchErrorDoFn extends OldDoFn<Integer, String> { @Override public void startBundle(Context c) { throw new RuntimeException("test error in initialize"); @@ -241,14 +243,14 @@ public class ParDoTest implements Serializable { } } - static class TestProcessElementErrorDoFn extends DoFn<Integer, String> { + static class TestProcessElementErrorDoFn extends OldDoFn<Integer, String> { @Override public void processElement(ProcessContext c) { throw new RuntimeException("test error in process"); } } - static class TestFinishBatchErrorDoFn extends DoFn<Integer, String> { + static class TestFinishBatchErrorDoFn extends OldDoFn<Integer, String> { @Override public void processElement(ProcessContext c) { // This has to be here. @@ -260,13 +262,13 @@ public class ParDoTest implements Serializable { } } - private static class StrangelyNamedDoer extends DoFn<Integer, String> { + private static class StrangelyNamedDoer extends OldDoFn<Integer, String> { @Override public void processElement(ProcessContext c) { } } - static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> { + static class TestOutputTimestampDoFn extends OldDoFn<Integer, Integer> { @Override public void processElement(ProcessContext c) { Integer value = c.element(); @@ -274,7 +276,7 @@ public class ParDoTest implements Serializable { } } - static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> { + static class TestShiftTimestampDoFn extends OldDoFn<Integer, Integer> { private Duration allowedTimestampSkew; private Duration durationToShift; @@ -297,7 +299,7 @@ public class ParDoTest implements Serializable { } } - static class TestFormatTimestampDoFn extends DoFn<Integer, String> { + static class TestFormatTimestampDoFn extends OldDoFn<Integer, String> { @Override public void processElement(ProcessContext c) { checkNotNull(c.timestamp()); @@ -318,7 +320,7 @@ public class ParDoTest implements Serializable { return PCollectionTuple.of(BY2, by2).and(BY3, by3); } - static class FilterFn extends DoFn<Integer, Integer> { + static class FilterFn extends OldDoFn<Integer, Integer> { private final int divisor; FilterFn(int divisor) { @@ -343,7 +345,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs)) - .apply(ParDo.of(new TestDoFn())); + .apply(ParDo.of(new TestOldDoFn())); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); @@ -377,7 +379,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs).withCoder(VarIntCoder.of())) - .apply("TestDoFn", ParDo.of(new TestDoFn())); + .apply("TestOldDoFn", ParDo.of(new TestOldDoFn())); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); @@ -395,7 +397,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs).withCoder(VarIntCoder.of())) - .apply("TestDoFn", ParDo.of(new TestNoOutputDoFn())); + .apply("TestOldDoFn", ParDo.of(new TestNoOutputDoFn())); PAssert.that(output).empty(); @@ -418,7 +420,7 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) .apply(ParDo - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.<PCollectionView<Integer>>asList(), Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) .withOutputTags( @@ -461,7 +463,7 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) .apply(ParDo - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.<PCollectionView<Integer>>asList(), Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) .withOutputTags( @@ -527,7 +529,7 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) - .of(new DoFn<Integer, Void>(){ + .of(new OldDoFn<Integer, Void>(){ @Override public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element()); @@ -550,7 +552,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs)) - .apply(ParDo.of(new TestDoFn( + .apply(ParDo.of(new TestOldDoFn( Arrays.<PCollectionView<Integer>>asList(), Arrays.asList(sideTag)))); @@ -569,7 +571,7 @@ public class ParDoTest implements Serializable { // Success for a total of 1000 outputs. input - .apply("Success1000", ParDo.of(new DoFn<Integer, String>() { + .apply("Success1000", ParDo.of(new OldDoFn<Integer, String>() { @Override public void processElement(ProcessContext c) { TupleTag<String> specialSideTag = new TupleTag<String>(){}; @@ -585,7 +587,7 @@ public class ParDoTest implements Serializable { // Failure for a total of 1001 outputs. input - .apply("Failure1001", ParDo.of(new DoFn<Integer, String>() { + .apply("Failure1001", ParDo.of(new OldDoFn<Integer, String>() { @Override public void processElement(ProcessContext c) { for (int i = 0; i < 1000; i++) { @@ -618,7 +620,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs)) .apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2) - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.asList(sideInput1, sideInput2), Arrays.<TupleTag<String>>asList()))); @@ -652,7 +654,7 @@ public class ParDoTest implements Serializable { .apply(ParDo.withSideInputs(sideInput1) .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.asList(sideInput1, sideInput2), Arrays.<TupleTag<String>>asList()))); @@ -690,7 +692,7 @@ public class ParDoTest implements Serializable { .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.asList(sideInput1, sideInput2), Arrays.<TupleTag<String>>asList()))); @@ -728,7 +730,7 @@ public class ParDoTest implements Serializable { .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.asList(sideInput1, sideInput2), Arrays.<TupleTag<String>>asList()))); @@ -752,7 +754,7 @@ public class ParDoTest implements Serializable { .apply(View.<Integer>asSingleton()); pipeline.apply("CreateMain", Create.of(inputs)) - .apply(ParDo.of(new TestDoFn( + .apply(ParDo.of(new TestOldDoFn( Arrays.<PCollectionView<Integer>>asList(sideView), Arrays.<TupleTag<String>>asList()))); @@ -815,18 +817,18 @@ public class ParDoTest implements Serializable { .setName("MyInput"); { - PCollection<String> output1 = input.apply(ParDo.of(new TestDoFn())); + PCollection<String> output1 = input.apply(ParDo.of(new TestOldDoFn())); assertEquals("ParDo(Test).out", output1.getName()); } { - PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestDoFn())); + PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestOldDoFn())); assertEquals("MyParDo.out", output2.getName()); } { - PCollection<String> output4 = input.apply("TestDoFn", ParDo.of(new TestDoFn())); - assertEquals("TestDoFn.out", output4.getName()); + PCollection<String> output4 = input.apply("TestOldDoFn", ParDo.of(new TestOldDoFn())); + assertEquals("TestOldDoFn.out", output4.getName()); } { @@ -835,7 +837,7 @@ public class ParDoTest implements Serializable { output5.getName()); } - assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName()); + assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName()); assertEquals( "ParMultiDo(SideOutputDummy)", @@ -855,7 +857,7 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = p .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput") .apply("MyParDo", ParDo - .of(new TestDoFn( + .of(new TestOldDoFn( Arrays.<PCollectionView<Integer>>asList(), Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) .withOutputTags( @@ -883,7 +885,7 @@ public class ParDoTest implements Serializable { .apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() { @Override public PCollection<String> apply(PCollection<Integer> input) { - return input.apply(ParDo.of(new TestDoFn())); + return input.apply(ParDo.of(new TestOldDoFn())); } }); @@ -920,7 +922,7 @@ public class ParDoTest implements Serializable { @Test public void testJsonEscaping() { // Declare an arbitrary function and make sure we can serialize it - DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() { + OldDoFn<Integer, Integer> doFn = new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) { c.output(c.element() + 1); @@ -973,7 +975,7 @@ public class ParDoTest implements Serializable { } } - private static class SideOutputDummyFn extends DoFn<Integer, Integer> { + private static class SideOutputDummyFn extends OldDoFn<Integer, Integer> { private TupleTag<TestDummy> sideTag; public SideOutputDummyFn(TupleTag<TestDummy> sideTag) { this.sideTag = sideTag; @@ -985,7 +987,7 @@ public class ParDoTest implements Serializable { } } - private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> { + private static class MainOutputDummyFn extends OldDoFn<Integer, TestDummy> { private TupleTag<Integer> sideTag; public MainOutputDummyFn(TupleTag<Integer> sideTag) { this.sideTag = sideTag; @@ -1167,7 +1169,7 @@ public class ParDoTest implements Serializable { .apply(ParDo .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) .of( - new DoFn<TestDummy, TestDummy>() { + new OldDoFn<TestDummy, TestDummy>() { @Override public void processElement(ProcessContext context) { TestDummy element = context.element(); context.output(element); @@ -1181,7 +1183,7 @@ public class ParDoTest implements Serializable { // on a missing coder. tuple.get(mainOutputTag) .setCoder(TestDummyCoder.of()) - .apply("Output1", ParDo.of(new DoFn<TestDummy, Integer>() { + .apply("Output1", ParDo.of(new OldDoFn<TestDummy, Integer>() { @Override public void processElement(ProcessContext context) { context.output(1); } @@ -1228,7 +1230,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = input .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)).of( - new DoFn<Integer, Integer>() { + new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) { c.sideOutputWithTimestamp( @@ -1349,7 +1351,7 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1)))) .apply(Window.<String>into(FixedWindows.of(Duration.millis(1)))) - .apply(ParDo.of(new DoFn<String, String>() { + .apply(ParDo.of(new OldDoFn<String, String>() { @Override public void startBundle(Context c) { c.outputWithTimestamp("start", new Instant(2)); @@ -1368,7 +1370,7 @@ public class ParDoTest implements Serializable { System.out.println("Finish: 3"); } })) - .apply(ParDo.of(new PrintingDoFn())); + .apply(ParDo.of(new PrintingOldDoFn())); PAssert.that(output).satisfies(new Checker()); @@ -1383,7 +1385,7 @@ public class ParDoTest implements Serializable { pipeline .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1)))) .apply(Window.<String>into(FixedWindows.of(Duration.millis(1)))) - .apply(ParDo.of(new DoFn<String, String>() { + .apply(ParDo.of(new OldDoFn<String, String>() { @Override public void startBundle(Context c) { c.output("start"); @@ -1400,7 +1402,7 @@ public class ParDoTest implements Serializable { } @Test public void testDoFnDisplayData() { - DoFn<String, String> fn = new DoFn<String, String>() { + OldDoFn<String, String> fn = new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java index 243b52b..0cc804e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java index fe02573..e7f8cd0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.NO_LINES; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.common.base.Preconditions.checkArgument; + import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java index a96d19b..fc0e659 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 738b492..ee240bf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; + import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -98,12 +99,13 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("Create123", Create.of(1, 2, 3)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.sideInput(view)); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + c.output(c.sideInput(view)); + } + })); PAssert.that(output).containsInAnyOrder(47, 47, 47); @@ -124,16 +126,17 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("Create123", Create.timestamped( - TimestampedValue.of(1, new Instant(4)), - TimestampedValue.of(2, new Instant(8)), - TimestampedValue.of(3, new Instant(12)))) + TimestampedValue.of(1, new Instant(4)), + TimestampedValue.of(2, new Instant(8)), + TimestampedValue.of(3, new Instant(12)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.sideInput(view)); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + c.output(c.sideInput(view)); + } + })); PAssert.that(output).containsInAnyOrder(47, 47, 48); @@ -150,7 +153,7 @@ public class ViewTest implements Serializable { .apply(View.<Integer>asSingleton()); pipeline.apply("Create123", Create.of(1, 2, 3)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) { c.output(c.sideInput(view)); @@ -175,7 +178,7 @@ public class ViewTest implements Serializable { final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton()); oneTwoThree.apply( - "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) { c.output(c.sideInput(view)); @@ -201,16 +204,17 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29, 31)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - checkArgument(c.sideInput(view).size() == 4); - checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0)); - for (Integer i : c.sideInput(view)) { - c.output(i); - } - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + checkArgument(c.sideInput(view).size() == 4); + checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0)); + for (Integer i : c.sideInput(view)) { + c.output(i); + } + } + })); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23); @@ -237,19 +241,21 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.timestamped( - TimestampedValue.of(29, new Instant(1)), - TimestampedValue.of(35, new Instant(11)))) + TimestampedValue.of(29, new Instant(1)), + TimestampedValue.of(35, new Instant(11)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - checkArgument(c.sideInput(view).size() == 4); - checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0)); - for (Integer i : c.sideInput(view)) { - c.output(i); - } - } - })); + .apply( + "OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + checkArgument(c.sideInput(view).size() == 4); + checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0)); + for (Integer i : c.sideInput(view)) { + c.output(i); + } + } + })); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43); @@ -267,16 +273,17 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - assertTrue(c.sideInput(view).isEmpty()); - assertFalse(c.sideInput(view).iterator().hasNext()); - c.output(1); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + assertTrue(c.sideInput(view).isEmpty()); + assertFalse(c.sideInput(view).iterator().hasNext()); + c.output(1); + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -292,36 +299,37 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - try { - c.sideInput(view).clear(); - fail("Expected UnsupportedOperationException on clear()"); - } catch (UnsupportedOperationException expected) { - } - try { - c.sideInput(view).add(4); - fail("Expected UnsupportedOperationException on add()"); - } catch (UnsupportedOperationException expected) { - } - try { - c.sideInput(view).addAll(new ArrayList<Integer>()); - fail("Expected UnsupportedOperationException on addAll()"); - } catch (UnsupportedOperationException expected) { - } - try { - c.sideInput(view).remove(0); - fail("Expected UnsupportedOperationException on remove()"); - } catch (UnsupportedOperationException expected) { - } - for (Integer i : c.sideInput(view)) { - c.output(i); - } - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + try { + c.sideInput(view).clear(); + fail("Expected UnsupportedOperationException on clear()"); + } catch (UnsupportedOperationException expected) { + } + try { + c.sideInput(view).add(4); + fail("Expected UnsupportedOperationException on add()"); + } catch (UnsupportedOperationException expected) { + } + try { + c.sideInput(view).addAll(new ArrayList<Integer>()); + fail("Expected UnsupportedOperationException on addAll()"); + } catch (UnsupportedOperationException expected) { + } + try { + c.sideInput(view).remove(0); + fail("Expected UnsupportedOperationException on remove()"); + } catch (UnsupportedOperationException expected) { + } + for (Integer i : c.sideInput(view)) { + c.output(i); + } + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(output).containsInAnyOrder(11); pipeline.run(); @@ -338,14 +346,15 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29, 31)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - for (Integer i : c.sideInput(view)) { - c.output(i); - } - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + for (Integer i : c.sideInput(view)) { + c.output(i); + } + } + })); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23); @@ -371,18 +380,21 @@ public class ViewTest implements Serializable { .apply(View.<Integer>asIterable()); PCollection<Integer> output = - pipeline.apply("CreateMainInput", Create.timestamped( - TimestampedValue.of(29, new Instant(1)), - TimestampedValue.of(35, new Instant(11)))) + pipeline + .apply("CreateMainInput", + Create.timestamped( + TimestampedValue.of(29, new Instant(1)), + TimestampedValue.of(35, new Instant(11)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - for (Integer i : c.sideInput(view)) { - c.output(i); - } - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + for (Integer i : c.sideInput(view)) { + c.output(i); + } + } + })); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43); @@ -400,15 +412,16 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - assertFalse(c.sideInput(view).iterator().hasNext()); - c.output(1); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + assertFalse(c.sideInput(view).iterator().hasNext()); + c.output(1); + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -424,22 +437,23 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - Iterator<Integer> iterator = c.sideInput(view).iterator(); - while (iterator.hasNext()) { - try { - iterator.remove(); - fail("Expected UnsupportedOperationException on remove()"); - } catch (UnsupportedOperationException expected) { + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + Iterator<Integer> iterator = c.sideInput(view).iterator(); + while (iterator.hasNext()) { + try { + iterator.remove(); + fail("Expected UnsupportedOperationException on remove()"); + } catch (UnsupportedOperationException expected) { + } + c.output(iterator.next()); + } } - c.output(iterator.next()); - } - } - })); + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(output).containsInAnyOrder(11); pipeline.run(); @@ -458,7 +472,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { @@ -486,7 +500,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -540,7 +554,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { @@ -577,7 +591,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { for (Integer v : @@ -615,7 +629,7 @@ public class ViewTest implements Serializable { TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new DoFn<Integer, KV<String, Integer>>() { + new OldDoFn<Integer, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { assertEquals((int) c.element(), @@ -660,7 +674,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { for (Integer v : @@ -689,17 +703,18 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - assertTrue(c.sideInput(view).isEmpty()); - assertTrue(c.sideInput(view).entrySet().isEmpty()); - assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); - c.output(c.element()); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + assertTrue(c.sideInput(view).isEmpty()); + assertTrue(c.sideInput(view).entrySet().isEmpty()); + assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); + c.output(c.element()); + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -718,17 +733,18 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - assertTrue(c.sideInput(view).isEmpty()); - assertTrue(c.sideInput(view).entrySet().isEmpty()); - assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); - c.output(c.element()); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + assertTrue(c.sideInput(view).isEmpty()); + assertTrue(c.sideInput(view).entrySet().isEmpty()); + assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); + c.output(c.element()); + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -747,7 +763,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { try { @@ -776,7 +792,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); pipeline.run(); @@ -795,7 +811,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output( @@ -822,7 +838,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -854,7 +870,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output( @@ -890,7 +906,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of( @@ -927,7 +943,7 @@ public class ViewTest implements Serializable { TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new DoFn<Integer, KV<String, Integer>>() { + new OldDoFn<Integer, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { assertEquals((int) c.element(), @@ -972,7 +988,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of( @@ -1000,17 +1016,18 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - assertTrue(c.sideInput(view).isEmpty()); - assertTrue(c.sideInput(view).entrySet().isEmpty()); - assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); - c.output(c.element()); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + assertTrue(c.sideInput(view).isEmpty()); + assertTrue(c.sideInput(view).entrySet().isEmpty()); + assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); + c.output(c.element()); + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -1028,17 +1045,18 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) { - assertTrue(c.sideInput(view).isEmpty()); - assertTrue(c.sideInput(view).entrySet().isEmpty()); - assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); - c.output(c.element()); - } - })); + .apply("OutputSideInputs", + ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) { + assertTrue(c.sideInput(view).isEmpty()); + assertTrue(c.sideInput(view).entrySet().isEmpty()); + assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); + c.output(c.element()); + } + })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(results).containsInAnyOrder(1); pipeline.run(); @@ -1062,7 +1080,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output( @@ -1093,7 +1111,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { try { @@ -1121,7 +1139,7 @@ public class ViewTest implements Serializable { } })); - // Pass at least one value through to guarantee that DoFn executes. + // Pass at least one value through to guarantee that OldDoFn executes. PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); pipeline.run(); @@ -1139,12 +1157,14 @@ public class ViewTest implements Serializable { PCollection<KV<String, Integer>> output = pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) - .apply("Output", ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); - } - })); + .apply("Output", + ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV + .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); + } + })); PAssert.that(output).containsInAnyOrder( KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -1173,7 +1193,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); @@ -1206,7 +1226,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); @@ -1237,7 +1257,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); @@ -1267,7 +1287,7 @@ public class ViewTest implements Serializable { p.apply("CreateMainInput", Create.of("")) .apply( "OutputMainAndSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, String>() { + ParDo.withSideInputs(view).of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); @@ -1285,7 +1305,7 @@ public class ViewTest implements Serializable { Pipeline pipeline = TestPipeline.create(); final PCollectionView<Iterable<Integer>> view1 = pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of())) - .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() { + .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() { @Override public void processElement(ProcessContext c) { c.output(17); @@ -1297,7 +1317,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of())) .apply( "OutputSideInput", - ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() { + ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() { @Override public void processElement(ProcessContext c) { c.output(c.sideInput(view1)); @@ -1307,8 +1327,8 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of())) - .apply( - "ReadIterableSideInput", ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() { + .apply("ReadIterableSideInput", + ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() { @Override public void processElement(ProcessContext c) { for (Iterable<Integer> input : c.sideInput(view2)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java index ac67bb4..d2ba452 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java @@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable { .apply(WithTimestamps.of(timestampFn)); PCollection<KV<String, Instant>> timestampedVals = - timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { + timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() { @Override - public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) + public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } @@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable { WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L))); PCollection<KV<String, Instant>> timestampedVals = - timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { + timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() { @Override - public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) + public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java index ce32b7d..c1848c6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java @@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -50,7 +50,7 @@ public class DisplayDataEvaluatorTest implements Serializable { new PTransform<PCollection<String>, POutput> () { @Override public PCollection<String> apply(PCollection<String> input) { - return input.apply(ParDo.of(new DoFn<String, String>() { + return input.apply(ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element()); @@ -79,7 +79,7 @@ public class DisplayDataEvaluatorTest implements Serializable { @Test public void testPrimitiveTransform() { PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of( - new DoFn<Integer, Integer>() { + new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) throws Exception {} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java index 07029e9..fa44390 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasName import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat;
