Rename DoFn to OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a64baf48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a64baf48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a64baf48 Branch: refs/heads/python-sdk Commit: a64baf4878f28e98da696dacc587c1151d0cdb9e Parents: 388816a Author: Kenneth Knowles <[email protected]> Authored: Fri Jul 22 13:00:10 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Aug 3 18:25:52 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/DebuggingWordCount.java | 6 +- .../apache/beam/examples/MinimalWordCount.java | 7 +- .../apache/beam/examples/WindowedWordCount.java | 10 +- .../org/apache/beam/examples/WordCount.java | 8 +- .../examples/common/PubsubFileInjector.java | 6 +- .../beam/examples/complete/AutoComplete.java | 16 +- .../examples/complete/StreamingWordExtract.java | 12 +- .../apache/beam/examples/complete/TfIdf.java | 16 +- .../examples/complete/TopWikipediaSessions.java | 12 +- .../examples/complete/TrafficMaxLaneFlow.java | 10 +- .../beam/examples/complete/TrafficRoutes.java | 12 +- .../examples/cookbook/BigQueryTornadoes.java | 6 +- .../cookbook/CombinePerKeyExamples.java | 6 +- .../examples/cookbook/DatastoreWordCount.java | 11 +- .../beam/examples/cookbook/FilterExamples.java | 12 +- .../beam/examples/cookbook/JoinExamples.java | 10 +- .../examples/cookbook/MaxPerKeyExamples.java | 6 +- .../beam/examples/cookbook/TriggerExample.java | 12 +- .../org/apache/beam/examples/WordCountTest.java | 2 +- .../examples/complete/AutoCompleteTest.java | 4 +- .../examples/cookbook/TriggerExampleTest.java | 4 +- .../beam/examples/complete/game/GameStats.java | 10 +- .../beam/examples/complete/game/UserScore.java | 4 +- .../complete/game/utils/WriteToBigQuery.java | 12 +- .../game/utils/WriteWindowedToBigQuery.java | 8 +- 
.../examples/complete/game/UserScoreTest.java | 2 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 12 +- .../core/UnboundedReadFromBoundedSource.java | 2 +- .../apache/beam/sdk/util/AssignWindowsDoFn.java | 10 +- .../org/apache/beam/sdk/util/DoFnRunner.java | 21 +- .../apache/beam/sdk/util/DoFnRunnerBase.java | 54 +- .../org/apache/beam/sdk/util/DoFnRunners.java | 24 +- .../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 6 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 4 +- .../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 6 +- .../sdk/util/LateDataDroppingDoFnRunner.java | 4 +- .../apache/beam/sdk/util/PaneInfoTracker.java | 1 - .../apache/beam/sdk/util/ReduceFnRunner.java | 4 +- .../apache/beam/sdk/util/SimpleDoFnRunner.java | 12 +- .../org/apache/beam/sdk/util/WatermarkHold.java | 1 - .../beam/sdk/util/ReduceFnRunnerTest.java | 1 + .../apache/beam/sdk/util/ReduceFnTester.java | 1 + .../beam/sdk/util/SimpleDoFnRunnerTest.java | 6 +- .../GroupAlsoByWindowEvaluatorFactory.java | 4 +- .../ImmutabilityCheckingBundleFactory.java | 4 +- .../beam/runners/direct/ParDoEvaluator.java | 4 +- .../direct/ParDoMultiEvaluatorFactory.java | 11 +- .../direct/ParDoSingleEvaluatorFactory.java | 11 +- .../direct/TransformEvaluatorFactory.java | 6 +- .../direct/WriteWithShardingFactory.java | 4 +- .../ConsumerTrackingPipelineVisitorTest.java | 22 +- .../beam/runners/direct/DirectRunnerTest.java | 24 +- .../ImmutabilityCheckingBundleFactoryTest.java | 6 +- .../ImmutabilityEnforcementFactoryTest.java | 6 +- .../direct/KeyedPValueTrackingVisitorTest.java | 6 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 6 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 10 +- .../direct/ParDoSingleEvaluatorFactoryTest.java | 10 +- .../runners/direct/WatermarkManagerTest.java | 7 +- .../beam/runners/flink/examples/TFIDF.java | 16 +- .../beam/runners/flink/examples/WordCount.java | 4 +- .../flink/examples/streaming/AutoComplete.java | 16 +- .../flink/examples/streaming/JoinExamples.java | 8 
+- .../examples/streaming/KafkaIOExamples.java | 4 +- .../KafkaWindowedWordCountExample.java | 6 +- .../examples/streaming/WindowedWordCount.java | 6 +- .../FlinkBatchTransformTranslators.java | 12 +- .../FlinkStreamingTransformTranslators.java | 9 +- .../functions/FlinkDoFnFunction.java | 10 +- .../FlinkMergingNonShuffleReduceFunction.java | 8 +- .../functions/FlinkMultiOutputDoFnFunction.java | 10 +- .../FlinkMultiOutputProcessContext.java | 6 +- .../functions/FlinkNoElementAssignContext.java | 8 +- .../functions/FlinkPartialReduceFunction.java | 8 +- .../functions/FlinkProcessContext.java | 16 +- .../functions/FlinkReduceFunction.java | 8 +- .../streaming/FlinkAbstractParDoWrapper.java | 18 +- .../FlinkGroupAlsoByWindowWrapper.java | 10 +- .../streaming/FlinkParDoBoundMultiWrapper.java | 4 +- .../streaming/FlinkParDoBoundWrapper.java | 4 +- .../state/AbstractFlinkTimerInternals.java | 4 +- .../beam/runners/flink/PipelineOptionsTest.java | 6 +- .../beam/runners/flink/ReadSourceITCase.java | 4 +- .../flink/ReadSourceStreamingITCase.java | 4 +- .../flink/streaming/GroupByNullKeyTest.java | 8 +- .../streaming/TopWikipediaSessionsITCase.java | 6 +- .../dataflow/DataflowPipelineTranslator.java | 6 +- .../beam/runners/dataflow/DataflowRunner.java | 83 ++- .../dataflow/internal/AssignWindows.java | 6 +- .../beam/runners/dataflow/util/DoFnInfo.java | 16 +- .../DataflowPipelineTranslatorTest.java | 10 +- .../beam/runners/spark/examples/WordCount.java | 4 +- .../runners/spark/translation/DoFnFunction.java | 8 +- .../spark/translation/MultiDoFnFunction.java | 8 +- .../spark/translation/SparkProcessContext.java | 18 +- .../spark/translation/TransformTranslator.java | 7 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../apache/beam/runners/spark/TfIdfTest.java | 12 +- .../spark/translation/CombinePerKeyTest.java | 4 +- .../spark/translation/DoFnOutputTest.java | 4 +- .../translation/MultiOutputWordCountTest.java | 8 +- .../spark/translation/SerializationTest.java | 
10 +- .../spark/translation/SideEffectsTest.java | 4 +- .../streaming/KafkaStreamingTest.java | 4 +- .../org/apache/beam/sdk/coders/AvroCoder.java | 1 - .../apache/beam/sdk/coders/DurationCoder.java | 1 - .../apache/beam/sdk/coders/InstantCoder.java | 1 - .../java/org/apache/beam/sdk/io/PubsubIO.java | 6 +- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 8 +- .../beam/sdk/io/PubsubUnboundedSource.java | 4 +- .../java/org/apache/beam/sdk/io/Source.java | 2 +- .../main/java/org/apache/beam/sdk/io/Write.java | 21 +- .../org/apache/beam/sdk/options/GcpOptions.java | 1 - .../beam/sdk/options/PipelineOptions.java | 8 +- .../sdk/options/PipelineOptionsFactory.java | 1 - .../sdk/options/PipelineOptionsReflector.java | 1 + .../beam/sdk/runners/AggregatorValues.java | 4 +- .../org/apache/beam/sdk/testing/PAssert.java | 24 +- .../beam/sdk/testing/SerializableMatchers.java | 1 - .../apache/beam/sdk/testing/TestPipeline.java | 1 - .../beam/sdk/testing/TestPipelineOptions.java | 1 + .../apache/beam/sdk/transforms/Aggregator.java | 14 +- .../sdk/transforms/AggregatorRetriever.java | 6 +- .../org/apache/beam/sdk/transforms/Combine.java | 14 +- .../apache/beam/sdk/transforms/CombineFns.java | 4 +- .../org/apache/beam/sdk/transforms/Count.java | 2 +- .../org/apache/beam/sdk/transforms/Create.java | 2 +- .../org/apache/beam/sdk/transforms/DoFn.java | 565 ------------------- .../beam/sdk/transforms/DoFnReflector.java | 38 +- .../apache/beam/sdk/transforms/DoFnTester.java | 86 +-- .../beam/sdk/transforms/DoFnWithContext.java | 16 +- .../org/apache/beam/sdk/transforms/Filter.java | 2 +- .../beam/sdk/transforms/FlatMapElements.java | 2 +- .../org/apache/beam/sdk/transforms/Flatten.java | 2 +- .../apache/beam/sdk/transforms/GroupByKey.java | 2 +- .../transforms/IntraBundleParallelization.java | 40 +- .../org/apache/beam/sdk/transforms/Keys.java | 2 +- .../org/apache/beam/sdk/transforms/KvSwap.java | 2 +- .../apache/beam/sdk/transforms/MapElements.java | 2 +- 
.../org/apache/beam/sdk/transforms/OldDoFn.java | 565 +++++++++++++++++++ .../apache/beam/sdk/transforms/PTransform.java | 2 +- .../org/apache/beam/sdk/transforms/ParDo.java | 203 +++---- .../apache/beam/sdk/transforms/Partition.java | 2 +- .../beam/sdk/transforms/RemoveDuplicates.java | 2 +- .../org/apache/beam/sdk/transforms/Sample.java | 4 +- .../beam/sdk/transforms/SimpleFunction.java | 6 +- .../org/apache/beam/sdk/transforms/Values.java | 2 +- .../org/apache/beam/sdk/transforms/View.java | 8 +- .../apache/beam/sdk/transforms/WithKeys.java | 2 +- .../beam/sdk/transforms/WithTimestamps.java | 4 +- .../sdk/transforms/display/DisplayData.java | 1 - .../beam/sdk/transforms/join/CoGbkResult.java | 1 - .../beam/sdk/transforms/join/CoGroupByKey.java | 14 +- .../sdk/transforms/windowing/AfterEach.java | 1 + .../windowing/AfterProcessingTime.java | 1 + .../transforms/windowing/IntervalWindow.java | 1 - .../beam/sdk/transforms/windowing/Never.java | 1 + .../beam/sdk/transforms/windowing/PaneInfo.java | 10 +- .../beam/sdk/transforms/windowing/Window.java | 4 +- .../beam/sdk/util/BaseExecutionContext.java | 4 +- .../apache/beam/sdk/util/BucketingFunction.java | 1 + .../beam/sdk/util/CombineContextFactory.java | 6 +- .../apache/beam/sdk/util/ExecutionContext.java | 8 +- .../apache/beam/sdk/util/MovingFunction.java | 1 + .../beam/sdk/util/PerKeyCombineFnRunner.java | 44 +- .../beam/sdk/util/PerKeyCombineFnRunners.java | 30 +- .../org/apache/beam/sdk/util/PubsubClient.java | 1 + .../apache/beam/sdk/util/PubsubTestClient.java | 1 + .../sdk/util/ReifyTimestampAndWindowsDoFn.java | 6 +- .../org/apache/beam/sdk/util/Reshuffle.java | 4 +- .../apache/beam/sdk/util/SerializableUtils.java | 2 +- .../org/apache/beam/sdk/util/StringUtils.java | 2 +- .../beam/sdk/util/SystemDoFnInternal.java | 6 +- .../apache/beam/sdk/util/TimerInternals.java | 1 - .../apache/beam/sdk/util/ValueWithRecordId.java | 6 +- .../org/apache/beam/sdk/util/WindowedValue.java | 1 - 
.../beam/sdk/util/WindowingInternals.java | 4 +- .../beam/sdk/util/common/ReflectHelpers.java | 1 + .../beam/sdk/values/TimestampedValue.java | 1 - .../java/org/apache/beam/sdk/PipelineTest.java | 6 +- .../apache/beam/sdk/coders/AvroCoderTest.java | 4 +- .../beam/sdk/coders/CoderRegistryTest.java | 6 +- .../beam/sdk/coders/SerializableCoderTest.java | 6 +- .../org/apache/beam/sdk/io/AvroSourceTest.java | 1 + .../io/BoundedReadFromUnboundedSourceTest.java | 1 + .../beam/sdk/io/CompressedSourceTest.java | 1 + .../apache/beam/sdk/io/CountingInputTest.java | 5 +- .../apache/beam/sdk/io/CountingSourceTest.java | 4 +- .../beam/sdk/io/OffsetBasedSourceTest.java | 1 + .../beam/sdk/io/PubsubUnboundedSinkTest.java | 4 +- .../java/org/apache/beam/sdk/io/ReadTest.java | 1 + .../java/org/apache/beam/sdk/io/TextIOTest.java | 1 + .../java/org/apache/beam/sdk/io/WriteTest.java | 7 +- .../org/apache/beam/sdk/io/XmlSinkTest.java | 1 + .../apache/beam/sdk/options/GcpOptionsTest.java | 1 + .../sdk/options/GoogleApiDebugOptionsTest.java | 1 - .../sdk/options/PipelineOptionsFactoryTest.java | 1 - .../beam/sdk/options/PipelineOptionsTest.java | 1 - .../sdk/options/ProxyInvocationHandlerTest.java | 2 +- .../AggregatorPipelineExtractorTest.java | 6 +- .../apache/beam/sdk/testing/PAssertTest.java | 1 - .../beam/sdk/testing/TestPipelineTest.java | 1 - .../transforms/ApproximateQuantilesTest.java | 1 + .../sdk/transforms/ApproximateUniqueTest.java | 5 +- .../beam/sdk/transforms/CombineFnsTest.java | 2 +- .../apache/beam/sdk/transforms/CombineTest.java | 12 +- .../apache/beam/sdk/transforms/CreateTest.java | 2 +- .../beam/sdk/transforms/DoFnContextTest.java | 69 --- .../DoFnDelegatingAggregatorTest.java | 16 +- .../beam/sdk/transforms/DoFnReflectorTest.java | 2 +- .../apache/beam/sdk/transforms/DoFnTest.java | 242 -------- .../beam/sdk/transforms/DoFnTesterTest.java | 10 +- .../sdk/transforms/DoFnWithContextTest.java | 6 +- .../apache/beam/sdk/transforms/FlattenTest.java | 4 +- 
.../beam/sdk/transforms/GroupByKeyTest.java | 6 +- .../IntraBundleParallelizationTest.java | 23 +- .../beam/sdk/transforms/MapElementsTest.java | 1 + .../org/apache/beam/sdk/transforms/MaxTest.java | 1 + .../org/apache/beam/sdk/transforms/MinTest.java | 2 + .../apache/beam/sdk/transforms/NoOpDoFn.java | 20 +- .../beam/sdk/transforms/OldDoFnContextTest.java | 69 +++ .../apache/beam/sdk/transforms/OldDoFnTest.java | 242 ++++++++ .../apache/beam/sdk/transforms/ParDoTest.java | 96 ++-- .../beam/sdk/transforms/PartitionTest.java | 1 + .../apache/beam/sdk/transforms/SampleTest.java | 1 + .../org/apache/beam/sdk/transforms/TopTest.java | 1 + .../apache/beam/sdk/transforms/ViewTest.java | 398 ++++++------- .../beam/sdk/transforms/WithTimestampsTest.java | 8 +- .../display/DisplayDataEvaluatorTest.java | 6 +- .../display/DisplayDataMatchersTest.java | 1 + .../sdk/transforms/display/DisplayDataTest.java | 6 +- .../sdk/transforms/join/CoGroupByKeyTest.java | 18 +- .../sdk/transforms/windowing/NeverTest.java | 1 + .../sdk/transforms/windowing/WindowTest.java | 6 +- .../sdk/transforms/windowing/WindowingTest.java | 10 +- .../beam/sdk/util/BucketingFunctionTest.java | 4 +- .../beam/sdk/util/MovingFunctionTest.java | 4 +- .../beam/sdk/util/SerializableUtilsTest.java | 1 - .../apache/beam/sdk/util/SerializerTest.java | 1 - .../apache/beam/sdk/util/StringUtilsTest.java | 16 +- .../org/apache/beam/sdk/util/TriggerTester.java | 1 + .../beam/sdk/util/common/CounterTest.java | 1 + .../beam/sdk/values/PCollectionTupleTest.java | 4 +- .../apache/beam/sdk/values/TypedPValueTest.java | 6 +- .../beam/sdk/extensions/joinlibrary/Join.java | 8 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 +- .../beam/sdk/io/gcp/datastore/V1Beta3.java | 13 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 6 +- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +- .../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 6 +- 
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 4 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 +- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 7 +- .../sdk/transforms/WithTimestampsJava8Test.java | 4 +- .../src/main/java/DebuggingWordCount.java | 4 +- .../src/main/java/MinimalWordCount.java | 6 +- .../src/main/java/WindowedWordCount.java | 6 +- .../src/main/java/WordCount.java | 6 +- .../main/java/common/PubsubFileInjector.java | 4 +- .../src/main/java/StarterPipeline.java | 6 +- .../src/main/java/it/pkg/StarterPipeline.java | 6 +- .../transforms/DoFnReflectorBenchmark.java | 14 +- 263 files changed, 2196 insertions(+), 2151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 8d85d44..3c43152 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; @@ -106,8 +106,8 @@ import java.util.regex.Pattern; * overridden with {@code --inputFile}. */ public class DebuggingWordCount { - /** A DoFn that filters for a specific key based upon a regular expression. 
*/ - public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { + /** A OldDoFn that filters for a specific key based upon a regular expression. */ + public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> { /** * Concept #1: The logger below uses the fully qualified class name of FilterTextFn * as the logger. All log statements emitted by this logger will be referenced by this name http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index 9f6d61a..ab0bb6d 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -22,8 +22,8 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; @@ -89,10 +89,11 @@ public class MinimalWordCount { // the input text (a set of Shakespeare's texts). p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a - // DoFn (defined in-line) on each element that tokenizes the text line into individual words. + // OldDoFn (defined in-line) on each element that tokenizes the text line into individual + // words. 
// The ParDo returns a PCollection<String>, where each element is an individual word in // Shakespeare's collected texts. - .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { + .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 7a4b29f..17f7da3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -103,14 +103,14 @@ public class WindowedWordCount { static final int WINDOW_SIZE = 1; // Default window duration in minutes /** - * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for + * Concept #2: A OldDoFn that sets the data element timestamp. This is a silly method, just for * this example, for the bounded data case. * * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a * 2-hour period. 
*/ - static class AddTimestampFn extends DoFn<String, String> { + static class AddTimestampFn extends OldDoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; @@ -130,8 +130,8 @@ public class WindowedWordCount { } } - /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { + /** A OldDoFn that converts a Word and Count into a BigQuery table row. */ + static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> { @Override public void processElement(ProcessContext c) { TableRow row = new TableRow() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index af16c44..274d1ad 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -97,10 +97,10 @@ public class WordCount { /** * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. 
+ * of-line. This OldDoFn tokenizes lines of text into individual words; we pass it to a ParDo in + * the pipeline. */ - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java index 15eda06..0a93521 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.IntraBundleParallelization; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.Transport; import com.google.api.services.pubsub.Pubsub; @@ -71,8 +71,8 @@ public class PubsubFileInjector { } } - /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */ - public static class Bound extends DoFn<String, Void> { + /** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. 
*/ + public static class Bound extends OldDoFn<String, Void> { private final String outputTopic; private final String timestampLabelKey; public transient Pubsub pubsub; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index c6272e8..7b44af8 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -36,9 +36,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Partition; @@ -130,7 +130,7 @@ public class AutoComplete { // Map the KV outputs of Count into our own CompletionCandiate class. 
.apply("CreateCompletionCandidates", ParDo.of( - new DoFn<KV<String, Long>, CompletionCandidate>() { + new OldDoFn<KV<String, Long>, CompletionCandidate>() { @Override public void processElement(ProcessContext c) { c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue())); @@ -209,7 +209,7 @@ public class AutoComplete { } private static class FlattenTops - extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { @Override public void processElement(ProcessContext c) { for (CompletionCandidate cc : c.element().getValue()) { @@ -260,10 +260,10 @@ public class AutoComplete { } /** - * A DoFn that keys each candidate by all its prefixes. + * A OldDoFn that keys each candidate by all its prefixes. */ private static class AllPrefixes - extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> { private final int minPrefix; private final int maxPrefix; public AllPrefixes(int minPrefix) { @@ -341,7 +341,7 @@ public class AutoComplete { /** * Takes as input a set of strings, and emits each #hashtag found therein. */ - static class ExtractHashtags extends DoFn<String, String> { + static class ExtractHashtags extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) { Matcher m = Pattern.compile("#\\S+").matcher(c.element()); @@ -351,7 +351,7 @@ public class AutoComplete { } } - static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> { + static class FormatForBigquery extends OldDoFn<KV<String, List<CompletionCandidate>>, TableRow> { @Override public void processElement(ProcessContext c) { List<TableRow> completions = new ArrayList<>(); @@ -385,7 +385,7 @@ public class AutoComplete { * Takes as input a the top candidates per prefix, and emits an entity * suitable for writing to Datastore. 
*/ - static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> { + static class FormatForDatastore extends OldDoFn<KV<String, List<CompletionCandidate>>, Entity> { private String kind; public FormatForDatastore(String kind) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java index db646a5..b0c9ffd 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import com.google.api.services.bigquery.model.TableFieldSchema; @@ -55,8 +55,8 @@ import java.util.ArrayList; */ public class StreamingWordExtract { - /** A DoFn that tokenizes lines of text into individual words. */ - static class ExtractWords extends DoFn<String, String> { + /** A OldDoFn that tokenizes lines of text into individual words. */ + static class ExtractWords extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) { String[] words = c.element().split("[^a-zA-Z']+"); @@ -68,8 +68,8 @@ public class StreamingWordExtract { } } - /** A DoFn that uppercases a word. */ - static class Uppercase extends DoFn<String, String> { + /** A OldDoFn that uppercases a word. 
*/ + static class Uppercase extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) { c.output(c.element().toUpperCase()); @@ -79,7 +79,7 @@ public class StreamingWordExtract { /** * Converts strings into BigQuery rows. */ - static class StringToRowConverter extends DoFn<String, TableRow> { + static class StringToRowConverter extends OldDoFn<String, TableRow> { /** * In this example, put the whole string into single BigQuery field. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index 8305314..470a689 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -30,9 +30,9 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -225,7 +225,7 @@ public class TfIdf { // of the words in the document associated with that that URI. 
PCollection<KV<URI, String>> uriToWords = uriToContent .apply("SplitWords", ParDo.of( - new DoFn<KV<URI, String>, KV<URI, String>>() { + new OldDoFn<KV<URI, String>, KV<URI, String>>() { @Override public void processElement(ProcessContext c) { URI uri = c.element().getKey(); @@ -268,7 +268,7 @@ public class TfIdf { // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount .apply("ShiftKeys", ParDo.of( - new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { @Override public void processElement(ProcessContext c) { URI uri = c.element().getKey().getKey(); @@ -307,7 +307,7 @@ public class TfIdf { // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal .apply("ComputeTermFrequencies", ParDo.of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { @Override public void processElement(ProcessContext c) { URI uri = c.element().getKey(); @@ -328,11 +328,11 @@ public class TfIdf { // documents in which the word appears divided by the total // number of documents in the corpus. Note how the total number of // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. + // presented to each invocation of the OldDoFn. PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() { @Override public void processElement(ProcessContext c) { String word = c.element().getKey(); @@ -361,7 +361,7 @@ public class TfIdf { // divided by the log of the document frequency. 
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf .apply("ComputeTfIdf", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { @Override public void processElement(ProcessContext c) { String word = c.element().getKey(); @@ -400,7 +400,7 @@ public class TfIdf { @Override public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { return wordToUriAndTfIdf - .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() { + .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() { @Override public void processElement(ProcessContext c) { c.output(String.format("%s,\t%s,\t%f", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index f8af02a..0ed89d2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import 
org.apache.beam.sdk.transforms.SerializableComparator; @@ -85,7 +85,7 @@ public class TopWikipediaSessions { /** * Extracts user and timestamp from a TableRow representing a Wikipedia edit. */ - static class ExtractUserAndTimestamp extends DoFn<TableRow, String> { + static class ExtractUserAndTimestamp extends OldDoFn<TableRow, String> { @Override public void processElement(ProcessContext c) { TableRow row = c.element(); @@ -132,7 +132,7 @@ public class TopWikipediaSessions { } } - static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>> + static class SessionsToStringsDoFn extends OldDoFn<KV<String, Long>, KV<String, Long>> implements RequiresWindowAccess { @Override @@ -142,7 +142,7 @@ public class TopWikipediaSessions { } } - static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String> + static class FormatOutputDoFn extends OldDoFn<List<KV<String, Long>>, String> implements RequiresWindowAccess { @Override public void processElement(ProcessContext c) { @@ -168,7 +168,7 @@ public class TopWikipediaSessions { .apply(ParDo.of(new ExtractUserAndTimestamp())) .apply("SampleUsers", ParDo.of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 7b1496f..9122015 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -30,7 +30,7 @@ 
import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -145,12 +145,12 @@ public class TrafficMaxLaneFlow { /** * Extract the timestamp field from the input string, and use it as the element timestamp. */ - static class ExtractTimestamps extends DoFn<String, String> { + static class ExtractTimestamps extends OldDoFn<String, String> { private static final DateTimeFormatter dateTimeFormat = DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss"); @Override - public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { String[] items = c.element().split(","); if (items.length > 0) { try { @@ -170,7 +170,7 @@ public class TrafficMaxLaneFlow { * information. The number of lanes for which data is present depends upon which freeway the data * point comes from. */ - static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> { + static class ExtractFlowInfoFn extends OldDoFn<String, KV<String, LaneInfo>> { @Override public void processElement(ProcessContext c) { @@ -226,7 +226,7 @@ public class TrafficMaxLaneFlow { * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery. * Add the timestamp from the window context. 
*/ - static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> { + static class FormatMaxesFn extends OldDoFn<KV<String, LaneInfo>, TableRow> { @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index ebf7b9a..30091b6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -29,8 +29,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; @@ -149,12 +149,12 @@ public class TrafficRoutes { /** * Extract the timestamp field from the input string, and use it as the element timestamp. 
*/ - static class ExtractTimestamps extends DoFn<String, String> { + static class ExtractTimestamps extends OldDoFn<String, String> { private static final DateTimeFormatter dateTimeFormat = DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss"); @Override - public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { String[] items = c.element().split(","); String timestamp = tryParseTimestamp(items); if (timestamp != null) { @@ -171,7 +171,7 @@ public class TrafficRoutes { * Filter out readings for the stations along predefined 'routes', and output * (station, speed info) keyed on route. */ - static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> { + static class ExtractStationSpeedFn extends OldDoFn<String, KV<String, StationSpeed>> { @Override public void processElement(ProcessContext c) { @@ -200,7 +200,7 @@ public class TrafficRoutes { * Note: these calculations are for example purposes only, and are unrealistic and oversimplified. */ static class GatherStats - extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> { + extends OldDoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> { @Override public void processElement(ProcessContext c) throws IOException { String route = c.element().getKey(); @@ -243,7 +243,7 @@ public class TrafficRoutes { /** * Format the results of the slowdown calculations to a TableRow, to save to BigQuery. 
*/ - static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> { + static class FormatStatsFn extends OldDoFn<KV<String, RouteInfo>, TableRow> { @Override public void processElement(ProcessContext c) { RouteInfo routeInfo = c.element().getValue(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java index 665be01..6002b11 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -81,7 +81,7 @@ public class BigQueryTornadoes { * Examines each row in the input table. If a tornado was recorded * in that sample, the month in which it occurred is output. */ - static class ExtractTornadoesFn extends DoFn<TableRow, Integer> { + static class ExtractTornadoesFn extends OldDoFn<TableRow, Integer> { @Override public void processElement(ProcessContext c){ TableRow row = c.element(); @@ -95,7 +95,7 @@ public class BigQueryTornadoes { * Prepares the data for writing to BigQuery by building a TableRow object containing an * integer representation of month and the number of tornadoes that occurred in each month. 
*/ - static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> { + static class FormatCountsFn extends OldDoFn<KV<Integer, Long>, TableRow> { @Override public void processElement(ProcessContext c) { TableRow row = new TableRow() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index 252f3cc..d0bce5d 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -90,7 +90,7 @@ public class CombinePerKeyExamples { * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH, * outputs word, play_name. 
*/ - static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> { + static class ExtractLargeWordsFn extends OldDoFn<TableRow, KV<String, String>> { private final Aggregator<Long, Long> smallerWords = createAggregator("smallerWords", new Sum.SumLongFn()); @@ -114,7 +114,7 @@ public class CombinePerKeyExamples { * Prepares the data for writing to BigQuery by building a TableRow object * containing a word with a string listing the plays in which it appeared. */ - static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> { + static class FormatShakespeareOutputFn extends OldDoFn<KV<String, String>, TableRow> { @Override public void processElement(ProcessContext c) { TableRow row = new TableRow() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java index 847523b..1850e89 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java @@ -32,8 +32,8 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import com.google.datastore.v1beta3.Entity; @@ -44,7 +44,6 @@ import com.google.datastore.v1beta3.Value; import java.util.Map; import java.util.UUID; - import javax.annotation.Nullable; /** @@ -80,10 +79,10 @@ import 
javax.annotation.Nullable; public class DatastoreWordCount { /** - * A DoFn that gets the content of an entity (one line in a + * A OldDoFn that gets the content of an entity (one line in a * Shakespeare play) and converts it to a string. */ - static class GetContentFn extends DoFn<Entity, String> { + static class GetContentFn extends OldDoFn<Entity, String> { @Override public void processElement(ProcessContext c) { Map<String, Value> props = c.element().getProperties(); @@ -109,9 +108,9 @@ public class DatastoreWordCount { } /** - * A DoFn that creates entity for every line in Shakespeare. + * A OldDoFn that creates entity for every line in Shakespeare. */ - static class CreateEntityFn extends DoFn<String, Entity> { + static class CreateEntityFn extends OldDoFn<String, Entity> { private final String namespace; private final String kind; private final Key ancestorKey; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index ea1dcf6..06fba77 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Mean; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -98,7 
+98,7 @@ public class FilterExamples { * Examines each row in the input table. Outputs only the subset of the cells this example * is interested in-- the mean_temp and year, month, and day-- as a bigquery table row. */ - static class ProjectionFn extends DoFn<TableRow, TableRow> { + static class ProjectionFn extends OldDoFn<TableRow, TableRow> { @Override public void processElement(ProcessContext c){ TableRow row = c.element(); @@ -119,9 +119,9 @@ public class FilterExamples { * Implements 'filter' functionality. * * <p>Examines each row in the input table. Outputs only rows from the month - * monthFilter, which is passed in as a parameter during construction of this DoFn. + * monthFilter, which is passed in as a parameter during construction of this OldDoFn. */ - static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> { + static class FilterSingleMonthDataFn extends OldDoFn<TableRow, TableRow> { Integer monthFilter; public FilterSingleMonthDataFn(Integer monthFilter) { @@ -143,7 +143,7 @@ public class FilterExamples { * Examines each row (weather reading) in the input table. Output the temperature * reading for that row ('mean_temp'). 
*/ - static class ExtractTempFn extends DoFn<TableRow, Double> { + static class ExtractTempFn extends OldDoFn<TableRow, Double> { @Override public void processElement(ProcessContext c){ TableRow row = c.element(); @@ -191,7 +191,7 @@ public class FilterExamples { PCollection<TableRow> filteredRows = monthFilteredRows .apply("ParseAndFilter", ParDo .withSideInputs(globalMeanTemp) - .of(new DoFn<TableRow, TableRow>() { + .of(new OldDoFn<TableRow, TableRow>() { @Override public void processElement(ProcessContext c) { Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java index 1b43cc2..5260c0d 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -99,7 +99,7 @@ public class JoinExamples { // country code 'key' -> string of <event info>, <country name> PCollection<KV<String, String>> finalResultCollection = kvpCollection.apply("Process", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() { 
@Override public void processElement(ProcessContext c) { KV<String, CoGbkResult> e = c.element(); @@ -116,7 +116,7 @@ public class JoinExamples { // write to GCS PCollection<String> formattedResults = finalResultCollection - .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { + .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() { @Override public void processElement(ProcessContext c) { String outputstring = "Country code: " + c.element().getKey() @@ -131,7 +131,7 @@ public class JoinExamples { * Examines each row (event) in the input table. Output a KV with the key the country * code of the event, and the value a string encoding event information. */ - static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> { + static class ExtractEventDataFn extends OldDoFn<TableRow, KV<String, String>> { @Override public void processElement(ProcessContext c) { TableRow row = c.element(); @@ -149,7 +149,7 @@ public class JoinExamples { * Examines each row (country info) in the input table. Output a KV with the key the country * code, and the value the country name. 
*/ - static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> { + static class ExtractCountryInfoFn extends OldDoFn<TableRow, KV<String, String>> { @Override public void processElement(ProcessContext c) { TableRow row = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java index a37690b..1bcb491 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -82,7 +82,7 @@ public class MaxPerKeyExamples { * Examines each row (weather reading) in the input table. Output the month of the reading, * and the mean_temp. */ - static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> { + static class ExtractTempFn extends OldDoFn<TableRow, KV<Integer, Double>> { @Override public void processElement(ProcessContext c) { TableRow row = c.element(); @@ -96,7 +96,7 @@ public class MaxPerKeyExamples { * Format the results to a TableRow, to save to BigQuery. 
* */ - static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> { + static class FormatMaxesFn extends OldDoFn<KV<Integer, Double>, TableRow> { @Override public void processElement(ProcessContext c) { TableRow row = new TableRow() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index a0c5181..0be9921 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -28,9 +28,9 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.AfterEach; @@ -342,7 +342,7 @@ public class TriggerExample { .apply(GroupByKey.<String, Integer>create()); PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of( - new DoFn <KV<String, Iterable<Integer>>, KV<String, String>>() { + new OldDoFn<KV<String, Iterable<Integer>>, KV<String, String>>() { @Override public void processElement(ProcessContext c) throws Exception { @@ -365,7 +365,7 @@ public class TriggerExample { * Format the results of the Total flow calculation to a TableRow, 
to save to BigQuery. * Adds the triggerType, pane information, processing time and the window timestamp. * */ - static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> + static class FormatTotalFlow extends OldDoFn<KV<String, String>, TableRow> implements RequiresWindowAccess { private String triggerType; @@ -394,7 +394,7 @@ public class TriggerExample { * Extract the freeway and total flow in a reading. * Freeway is used as key since we are calculating the total flow for each freeway. */ - static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> { + static class ExtractFlowInfo extends OldDoFn<String, KV<String, Integer>> { @Override public void processElement(ProcessContext c) throws Exception { String[] laneInfo = c.element().split(","); @@ -471,7 +471,7 @@ public class TriggerExample { * Add current time to each record. * Also insert a delay at random to demo the triggers. */ - public static class InsertDelays extends DoFn<String, String> { + public static class InsertDelays extends OldDoFn<String, String> { private static final double THRESHOLD = 0.001; // MIN_DELAY and MAX_DELAY in minutes. private static final int MIN_DELAY = 1; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java index ff117dc..26bf8fb 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java @@ -46,7 +46,7 @@ import java.util.List; @RunWith(JUnit4.class) public class WordCountTest { - /** Example test that tests a specific DoFn. */ + /** Example test that tests a specific OldDoFn. 
*/ @Test public void testExtractWordsFn() throws Exception { DoFnTester<String, String> extractWordsFn = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java index b2ed9a2..6f68ce8 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java @@ -23,8 +23,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -171,7 +171,7 @@ public class AutoCompleteTest implements Serializable { extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> { @Override public PCollection<T> apply(PCollection<TimestampedValue<T>> input) { - return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() { + return input.apply(ParDo.of(new OldDoFn<TimestampedValue<T>, T>() { @Override public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java ---------------------------------------------------------------------- diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java index 6f58389..e72a9e8 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -141,7 +141,7 @@ public class TriggerExampleTest { return Joiner.on(",").join(entries); } - static class FormatResults extends DoFn<TableRow, String> { + static class FormatResults extends OldDoFn<TableRow, String> { @Override public void processElement(ProcessContext c) throws Exception { TableRow element = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 33b8727..b1407f6 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -27,10 +27,10 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import 
org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.Mean; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -126,7 +126,7 @@ public class GameStats extends LeaderBoard { .apply("ProcessAndFilter", ParDo // use the derived mean total score as a side input .withSideInputs(globalMeanScore) - .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() { + .of(new OldDoFn<KV<String, Integer>, KV<String, Integer>>() { private final Aggregator<Long, Long> numSpammerUsers = createAggregator("SpammerUsers", new Sum.SumLongFn()); @Override @@ -149,7 +149,7 @@ public class GameStats extends LeaderBoard { /** * Calculate and output an element's session duration. */ - private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer> + private static class UserSessionInfoFn extends OldDoFn<KV<String, Integer>, Integer> implements RequiresWindowAccess { @Override @@ -281,7 +281,7 @@ public class GameStats extends LeaderBoard { // Filter out the detected spammer users, using the side input derived above. .apply("FilterOutSpammers", ParDo .withSideInputs(spammersView) - .of(new DoFn<GameActionInfo, GameActionInfo>() { + .of(new OldDoFn<GameActionInfo, GameActionInfo>() { @Override public void processElement(ProcessContext c) { // If the user is not in the spammers Map, output the data element. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 28614cb..00dc8a4 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -28,8 +28,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -123,7 +123,7 @@ public class UserScore { * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 * The human-readable time string is not used here. */ - static class ParseEventFn extends DoFn<String, GameActionInfo> { + static class ParseEventFn extends OldDoFn<String, GameActionInfo> { // Log and count parse errors. 
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index 36ed195..6af6e15 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -66,10 +66,10 @@ public class WriteToBigQuery<T> // The BigQuery 'type' of the field private String fieldType; // A lambda function to generate the field value - private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn; + private SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn; public FieldInfo(String fieldType, - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) { + SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fieldFn) { this.fieldType = fieldType; this.fieldFn = fieldFn; } @@ -78,12 +78,12 @@ public class WriteToBigQuery<T> return this.fieldType; } - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> 
getFieldFn() { + SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> getFieldFn() { return this.fieldFn; } } /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */ - protected class BuildRowFn extends DoFn<T, TableRow> { + protected class BuildRowFn extends OldDoFn<T, TableRow> { @Override public void processElement(ProcessContext c) { @@ -92,7 +92,7 @@ public class WriteToBigQuery<T> for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) { String key = entry.getKey(); FieldInfo<T> fcnInfo = entry.getValue(); - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn = + SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn = fcnInfo.getFieldFn(); row.set(key, fcn.apply(c)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index b4c9b4a..c59fd61 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -20,8 +20,8 @@ package org.apache.beam.examples.complete.game.utils; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import 
org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; @@ -45,7 +45,7 @@ public class WriteWindowedToBigQuery<T> } /** Convert each key/score pair into a BigQuery TableRow. */ - protected class BuildRowFn extends DoFn<T, TableRow> + protected class BuildRowFn extends OldDoFn<T, TableRow> implements RequiresWindowAccess { @Override @@ -55,7 +55,7 @@ public class WriteWindowedToBigQuery<T> for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) { String key = entry.getKey(); FieldInfo<T> fcnInfo = entry.getValue(); - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn = + SerializableFunction<OldDoFn<T, TableRow>.ProcessContext, Object> fcn = fcnInfo.getFieldFn(); row.set(key, fcn.apply(c)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java index cc3e7fa..01efad8 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java @@ -83,7 +83,7 @@ public class UserScoreTest implements Serializable { KV.of("AndroidGreenKookaburra", 23), KV.of("BisqueBilby", 14)); - /** Test the ParseEventFn DoFn. */ + /** Test the ParseEventFn OldDoFn. 
*/ @Test public void testParseEventFn() throws Exception { DoFnTester<String, GameActionInfo> parseEventFn = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 0d320bc..7cdab00 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor; @@ -41,10 +41,10 @@ import org.apache.beam.sdk.values.KV; @SystemDoFnInternal public class GroupAlsoByWindowViaWindowSetDoFn< K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>> - extends DoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> { + extends OldDoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> { public static <K, InputT, OutputT, W extends BoundedWindow> - DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create( + OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create( WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { @@ -99,11 +99,11 @@ public class GroupAlsoByWindowViaWindowSetDoFn< } @Override - public DoFn<KeyedWorkItem<K, InputT>, 
KV<K, OutputT>> asDoFn() { + public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() { // Safe contravariant cast @SuppressWarnings("unchecked") - DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn = - (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this; + OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn = + (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this; return asFn; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 5821e73..3ce0c06 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; + import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -47,7 +48,6 @@ import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java index 
d40b007..739db45 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java @@ -19,8 +19,8 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -32,14 +32,14 @@ import org.joda.time.Instant; import java.util.Collection; /** - * {@link DoFn} that tags elements of a {@link PCollection} with windows, according to the provided - * {@link WindowFn}. + * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the + * provided {@link WindowFn}. * * @param <T> Type of elements being windowed * @param <W> Window type */ @SystemDoFnInternal -public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> +public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T> implements RequiresWindowAccess { private WindowFn<? 
super T, W> fn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java index 4ec8920..49206d1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java @@ -18,41 +18,42 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; import org.apache.beam.sdk.values.KV; /** - * An wrapper interface that represents the execution of a {@link DoFn}. + * An wrapper interface that represents the execution of a {@link OldDoFn}. */ public interface DoFnRunner<InputT, OutputT> { /** - * Prepares and calls {@link DoFn#startBundle}. + * Prepares and calls {@link OldDoFn#startBundle}. */ public void startBundle(); /** - * Calls {@link DoFn#processElement} with a {@link ProcessContext} containing the current element. + * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current + * element. */ public void processElement(WindowedValue<InputT> elem); /** - * Calls {@link DoFn#finishBundle} and performs additional tasks, such as + * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as * flushing in-memory states. */ public void finishBundle(); /** - * An internal interface for signaling that a {@link DoFn} requires late data dropping. + * An internal interface for signaling that a {@link OldDoFn} requires late data dropping. 
*/ public interface ReduceFnExecutor<K, InputT, OutputT, W> { /** - * Gets this object as a {@link DoFn}. + * Gets this object as a {@link OldDoFn}. * - * Most implementors of this interface are expected to be {@link DoFn} instances, and will + * Most implementors of this interface are expected to be {@link OldDoFn} instances, and will * return themselves. */ - DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn(); + OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn(); /** * Returns an aggregator that tracks elements that are dropped due to being late.
