Merge branch 'master' into temp-option
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/911d2953 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/911d2953 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/911d2953 Branch: refs/heads/master Commit: 911d29539b8e586bd1452a6ec751155981b0f8f7 Parents: 8bc0659 9247ad7 Author: Pei He <[email protected]> Authored: Wed Mar 23 19:56:01 2016 -0700 Committer: Pei He <[email protected]> Committed: Wed Mar 23 19:56:01 2016 -0700 ---------------------------------------------------------------------- DISCLAIMER | 10 + NOTICE | 12 + README.md | 9 +- examples/pom.xml | 129 +- .../examples/complete/AutoComplete.java | 10 +- .../examples/MinimalWordCountJava8.java | 68 -- .../examples/complete/game/GameStats.java | 347 ------ .../examples/complete/game/HourlyTeamScore.java | 193 --- .../examples/complete/game/LeaderBoard.java | 237 ---- .../dataflow/examples/complete/game/README.md | 119 -- .../examples/complete/game/UserScore.java | 239 ---- .../complete/game/injector/Injector.java | 417 ------- .../complete/game/injector/InjectorUtils.java | 101 -- .../injector/RetryHttpInitializerWrapper.java | 127 -- .../complete/game/utils/WriteToBigQuery.java | 134 --- .../game/utils/WriteWindowedToBigQuery.java | 76 -- .../examples/MinimalWordCountJava8Test.java | 103 -- .../examples/complete/game/GameStatsTest.java | 99 -- .../complete/game/HourlyTeamScoreTest.java | 121 -- .../examples/complete/game/UserScoreTest.java | 156 --- java8examples/pom.xml | 278 +++++ .../examples/MinimalWordCountJava8.java | 68 ++ .../examples/complete/game/GameStats.java | 339 ++++++ .../examples/complete/game/HourlyTeamScore.java | 193 +++ .../examples/complete/game/LeaderBoard.java | 237 ++++ .../dataflow/examples/complete/game/README.md | 113 ++ .../examples/complete/game/UserScore.java | 239 ++++ .../complete/game/injector/Injector.java | 415 +++++++ 
.../complete/game/injector/InjectorUtils.java | 101 ++ .../injector/RetryHttpInitializerWrapper.java | 126 ++ .../complete/game/utils/WriteToBigQuery.java | 134 +++ .../game/utils/WriteWindowedToBigQuery.java | 76 ++ .../examples/MinimalWordCountJava8Test.java | 103 ++ .../examples/complete/game/GameStatsTest.java | 76 ++ .../complete/game/HourlyTeamScoreTest.java | 111 ++ .../examples/complete/game/UserScoreTest.java | 154 +++ java8tests/pom.xml | 183 +++ .../sdk/transforms/CombineJava8Test.java | 133 +++ .../sdk/transforms/FilterJava8Test.java | 118 ++ .../transforms/FlatMapElementsJava8Test.java | 84 ++ .../sdk/transforms/MapElementsJava8Test.java | 77 ++ .../sdk/transforms/PartitionJava8Test.java | 74 ++ .../transforms/RemoveDuplicatesJava8Test.java | 98 ++ .../sdk/transforms/WithKeysJava8Test.java | 73 ++ .../sdk/transforms/WithTimestampsJava8Test.java | 65 ++ maven-archetypes/examples/pom.xml | 2 +- maven-archetypes/starter/pom.xml | 2 +- pom.xml | 24 +- runners/flink/README.md | 101 +- runners/flink/examples/pom.xml | 89 ++ .../beam/runners/flink/examples/TFIDF.java | 452 +++++++ .../beam/runners/flink/examples/WordCount.java | 113 ++ .../flink/examples/streaming/AutoComplete.java | 387 ++++++ .../flink/examples/streaming/JoinExamples.java | 158 +++ .../KafkaWindowedWordCountExample.java | 142 +++ .../examples/streaming/WindowedWordCount.java | 130 +++ runners/flink/pom.xml | 416 +++---- runners/flink/runner/pom.xml | 147 +++ .../FlinkPipelineExecutionEnvironment.java | 269 +++++ .../runners/flink/FlinkPipelineOptions.java | 93 ++ .../beam/runners/flink/FlinkPipelineRunner.java | 198 ++++ .../runners/flink/FlinkRunnerRegistrar.java | 56 + .../beam/runners/flink/FlinkRunnerResult.java | 68 ++ .../apache/beam/runners/flink/io/ConsoleIO.java | 82 ++ .../FlinkBatchPipelineTranslator.java | 153 +++ .../FlinkBatchTransformTranslators.java | 594 ++++++++++ .../FlinkBatchTranslationContext.java | 129 ++ .../translation/FlinkPipelineTranslator.java | 36 + 
.../FlinkStreamingPipelineTranslator.java | 150 +++ .../FlinkStreamingTransformTranslators.java | 407 +++++++ .../FlinkStreamingTranslationContext.java | 89 ++ .../FlinkCoGroupKeyedListAggregator.java | 60 + .../functions/FlinkCreateFunction.java | 62 + .../functions/FlinkDoFnFunction.java | 204 ++++ .../FlinkKeyedListAggregationFunction.java | 77 ++ .../functions/FlinkMultiOutputDoFnFunction.java | 177 +++ .../FlinkMultiOutputPruningFunction.java | 43 + .../functions/FlinkPartialReduceFunction.java | 60 + .../functions/FlinkReduceFunction.java | 57 + .../flink/translation/functions/UnionCoder.java | 150 +++ .../translation/types/CoderComparator.java | 216 ++++ .../translation/types/CoderTypeInformation.java | 116 ++ .../translation/types/CoderTypeSerializer.java | 152 +++ .../types/InspectableByteArrayOutputStream.java | 34 + .../translation/types/KvCoderComperator.java | 264 +++++ .../types/KvCoderTypeInformation.java | 186 +++ .../types/VoidCoderTypeSerializer.java | 112 ++ .../wrappers/CombineFnAggregatorWrapper.java | 92 ++ .../wrappers/DataInputViewWrapper.java | 59 + .../wrappers/DataOutputViewWrapper.java | 52 + .../SerializableFnAggregatorWrapper.java | 91 ++ .../translation/wrappers/SinkOutputFormat.java | 121 ++ .../translation/wrappers/SourceInputFormat.java | 164 +++ .../translation/wrappers/SourceInputSplit.java | 52 + .../streaming/FlinkAbstractParDoWrapper.java | 266 +++++ .../FlinkGroupAlsoByWindowWrapper.java | 640 ++++++++++ .../streaming/FlinkGroupByKeyWrapper.java | 66 ++ .../streaming/FlinkParDoBoundMultiWrapper.java | 77 ++ .../streaming/FlinkParDoBoundWrapper.java | 100 ++ .../io/FlinkStreamingCreateFunction.java | 65 ++ .../streaming/io/UnboundedFlinkSource.java | 82 ++ .../streaming/io/UnboundedSocketSource.java | 233 ++++ .../streaming/io/UnboundedSourceWrapper.java | 171 +++ .../state/AbstractFlinkTimerInternals.java | 128 ++ .../streaming/state/FlinkStateInternals.java | 715 ++++++++++++ .../streaming/state/StateCheckpointReader.java | 
91 ++ .../streaming/state/StateCheckpointUtils.java | 155 +++ .../streaming/state/StateCheckpointWriter.java | 129 ++ .../wrappers/streaming/state/StateType.java | 73 ++ .../runner/src/main/resources/log4j.properties | 23 + .../apache/beam/runners/flink/AvroITCase.java | 127 ++ .../beam/runners/flink/FlattenizeITCase.java | 74 ++ .../runners/flink/FlinkRunnerRegistrarTest.java | 48 + .../beam/runners/flink/FlinkTestPipeline.java | 72 ++ .../beam/runners/flink/JoinExamplesITCase.java | 101 ++ .../runners/flink/MaybeEmptyTestITCase.java | 65 ++ .../runners/flink/ParDoMultiOutputITCase.java | 100 ++ .../beam/runners/flink/ReadSourceITCase.java | 165 +++ .../flink/RemoveDuplicatesEmptyITCase.java | 70 ++ .../runners/flink/RemoveDuplicatesITCase.java | 71 ++ .../beam/runners/flink/SideInputITCase.java | 69 ++ .../apache/beam/runners/flink/TfIdfITCase.java | 78 ++ .../beam/runners/flink/WordCountITCase.java | 75 ++ .../runners/flink/WordCountJoin2ITCase.java | 138 +++ .../runners/flink/WordCountJoin3ITCase.java | 156 +++ .../beam/runners/flink/WriteSinkITCase.java | 158 +++ .../flink/streaming/GroupAlsoByWindowTest.java | 508 ++++++++ .../flink/streaming/GroupByNullKeyTest.java | 123 ++ .../flink/streaming/StateSerializationTest.java | 305 +++++ .../streaming/TopWikipediaSessionsITCase.java | 134 +++ .../flink/streaming/UnboundedSourceITCase.java | 210 ++++ .../beam/runners/flink/util/JoinExamples.java | 160 +++ .../src/test/resources/log4j-test.properties | 27 + .../FlinkPipelineExecutionEnvironment.java | 269 ----- .../runners/flink/FlinkPipelineOptions.java | 93 -- .../beam/runners/flink/FlinkPipelineRunner.java | 206 ---- .../beam/runners/flink/FlinkRunnerResult.java | 68 -- .../beam/runners/flink/examples/TFIDF.java | 452 ------- .../beam/runners/flink/examples/WordCount.java | 113 -- .../flink/examples/streaming/AutoComplete.java | 387 ------ .../flink/examples/streaming/JoinExamples.java | 158 --- .../KafkaWindowedWordCountExample.java | 143 --- 
.../examples/streaming/WindowedWordCount.java | 130 --- .../apache/beam/runners/flink/io/ConsoleIO.java | 82 -- .../FlinkBatchPipelineTranslator.java | 153 --- .../FlinkBatchTransformTranslators.java | 594 ---------- .../FlinkBatchTranslationContext.java | 129 -- .../translation/FlinkPipelineTranslator.java | 36 - .../FlinkStreamingPipelineTranslator.java | 150 --- .../FlinkStreamingTransformTranslators.java | 406 ------- .../FlinkStreamingTranslationContext.java | 89 -- .../FlinkCoGroupKeyedListAggregator.java | 60 - .../functions/FlinkCreateFunction.java | 62 - .../functions/FlinkDoFnFunction.java | 204 ---- .../FlinkKeyedListAggregationFunction.java | 77 -- .../functions/FlinkMultiOutputDoFnFunction.java | 177 --- .../FlinkMultiOutputPruningFunction.java | 43 - .../functions/FlinkPartialReduceFunction.java | 60 - .../functions/FlinkReduceFunction.java | 57 - .../flink/translation/functions/UnionCoder.java | 150 --- .../translation/types/CoderComparator.java | 216 ---- .../translation/types/CoderTypeInformation.java | 116 -- .../translation/types/CoderTypeSerializer.java | 152 --- .../types/InspectableByteArrayOutputStream.java | 34 - .../translation/types/KvCoderComperator.java | 264 ----- .../types/KvCoderTypeInformation.java | 186 --- .../types/VoidCoderTypeSerializer.java | 112 -- .../wrappers/CombineFnAggregatorWrapper.java | 92 -- .../wrappers/DataInputViewWrapper.java | 59 - .../wrappers/DataOutputViewWrapper.java | 52 - .../SerializableFnAggregatorWrapper.java | 91 -- .../translation/wrappers/SinkOutputFormat.java | 121 -- .../translation/wrappers/SourceInputFormat.java | 164 --- .../translation/wrappers/SourceInputSplit.java | 52 - .../streaming/FlinkAbstractParDoWrapper.java | 266 ----- .../FlinkGroupAlsoByWindowWrapper.java | 640 ---------- .../streaming/FlinkGroupByKeyWrapper.java | 66 -- .../streaming/FlinkParDoBoundMultiWrapper.java | 77 -- .../streaming/FlinkParDoBoundWrapper.java | 100 -- .../io/FlinkStreamingCreateFunction.java | 65 -- 
.../streaming/io/UnboundedFlinkSource.java | 82 -- .../streaming/io/UnboundedSocketSource.java | 233 ---- .../streaming/io/UnboundedSourceWrapper.java | 134 --- .../state/AbstractFlinkTimerInternals.java | 128 -- .../streaming/state/FlinkStateInternals.java | 715 ------------ .../streaming/state/StateCheckpointReader.java | 91 -- .../streaming/state/StateCheckpointUtils.java | 155 --- .../streaming/state/StateCheckpointWriter.java | 129 -- .../wrappers/streaming/state/StateType.java | 73 -- .../flink/src/main/resources/log4j.properties | 23 - .../apache/beam/runners/flink/AvroITCase.java | 127 -- .../beam/runners/flink/FlattenizeITCase.java | 74 -- .../beam/runners/flink/FlinkTestPipeline.java | 72 -- .../beam/runners/flink/JoinExamplesITCase.java | 101 -- .../runners/flink/MaybeEmptyTestITCase.java | 65 -- .../runners/flink/ParDoMultiOutputITCase.java | 100 -- .../beam/runners/flink/ReadSourceITCase.java | 165 --- .../flink/RemoveDuplicatesEmptyITCase.java | 70 -- .../runners/flink/RemoveDuplicatesITCase.java | 71 -- .../beam/runners/flink/SideInputITCase.java | 69 -- .../apache/beam/runners/flink/TfIdfITCase.java | 78 -- .../beam/runners/flink/WordCountITCase.java | 76 -- .../runners/flink/WordCountJoin2ITCase.java | 138 --- .../runners/flink/WordCountJoin3ITCase.java | 156 --- .../beam/runners/flink/WriteSinkITCase.java | 158 --- .../flink/streaming/GroupAlsoByWindowTest.java | 508 -------- .../flink/streaming/GroupByNullKeyTest.java | 123 -- .../flink/streaming/StateSerializationTest.java | 305 ----- .../streaming/TopWikipediaSessionsITCase.java | 134 --- .../beam/runners/flink/util/JoinExamples.java | 160 --- .../src/test/resources/log4j-test.properties | 27 - runners/pom.xml | 63 +- runners/spark/.gitignore | 10 - runners/spark/.travis.yml | 22 - runners/spark/README.md | 112 +- runners/spark/build-resources/checkstyle.xml | 27 +- runners/spark/build-resources/header-file.txt | 23 +- runners/spark/pom.xml | 246 ++-- 
.../com/cloudera/dataflow/hadoop/HadoopIO.java | 202 ---- .../dataflow/hadoop/NullWritableCoder.java | 71 -- .../cloudera/dataflow/hadoop/WritableCoder.java | 120 -- .../com/cloudera/dataflow/io/ConsoleIO.java | 60 - .../com/cloudera/dataflow/io/CreateStream.java | 66 -- .../java/com/cloudera/dataflow/io/KafkaIO.java | 128 -- .../dataflow/spark/BroadcastHelper.java | 121 -- .../com/cloudera/dataflow/spark/ByteArray.java | 52 - .../cloudera/dataflow/spark/CoderHelpers.java | 185 --- .../cloudera/dataflow/spark/DoFnFunction.java | 93 -- .../dataflow/spark/EvaluationContext.java | 283 ----- .../dataflow/spark/EvaluationResult.java | 62 - .../dataflow/spark/MultiDoFnFunction.java | 115 -- .../dataflow/spark/ShardNameBuilder.java | 106 -- .../dataflow/spark/ShardNameTemplateAware.java | 28 - .../dataflow/spark/ShardNameTemplateHelper.java | 58 - .../dataflow/spark/SparkContextFactory.java | 66 -- .../dataflow/spark/SparkPipelineEvaluator.java | 52 - .../dataflow/spark/SparkPipelineOptions.java | 39 - .../spark/SparkPipelineOptionsFactory.java | 27 - .../spark/SparkPipelineOptionsRegistrar.java | 27 - .../dataflow/spark/SparkPipelineRunner.java | 252 ---- .../spark/SparkPipelineRunnerRegistrar.java | 27 - .../dataflow/spark/SparkPipelineTranslator.java | 27 - .../dataflow/spark/SparkProcessContext.java | 250 ---- .../dataflow/spark/SparkRuntimeContext.java | 212 ---- .../spark/TemplatedAvroKeyOutputFormat.java | 40 - .../TemplatedSequenceFileOutputFormat.java | 40 - .../spark/TemplatedTextOutputFormat.java | 40 - .../dataflow/spark/TransformEvaluator.java | 24 - .../dataflow/spark/TransformTranslator.java | 800 ------------- .../dataflow/spark/WindowingHelpers.java | 59 - .../spark/aggregators/AggAccumParam.java | 35 - .../spark/aggregators/NamedAggregators.java | 202 ---- .../SparkStreamingPipelineOptions.java | 40 - .../SparkStreamingPipelineOptionsFactory.java | 27 - .../SparkStreamingPipelineOptionsRegistrar.java | 28 - .../streaming/StreamingEvaluationContext.java | 
226 ---- .../streaming/StreamingTransformTranslator.java | 414 ------- .../StreamingWindowPipelineDetector.java | 100 -- .../beam/runners/spark/EvaluationResult.java | 65 ++ .../runners/spark/SparkPipelineOptions.java | 42 + .../beam/runners/spark/SparkPipelineRunner.java | 255 ++++ .../spark/SparkStreamingPipelineOptions.java | 41 + .../spark/aggregators/AggAccumParam.java | 38 + .../spark/aggregators/NamedAggregators.java | 205 ++++ .../beam/runners/spark/coders/CoderHelpers.java | 189 +++ .../runners/spark/coders/NullWritableCoder.java | 74 ++ .../runners/spark/coders/WritableCoder.java | 123 ++ .../apache/beam/runners/spark/io/ConsoleIO.java | 63 + .../beam/runners/spark/io/CreateStream.java | 69 ++ .../apache/beam/runners/spark/io/KafkaIO.java | 131 +++ .../beam/runners/spark/io/hadoop/HadoopIO.java | 203 ++++ .../spark/io/hadoop/ShardNameBuilder.java | 109 ++ .../spark/io/hadoop/ShardNameTemplateAware.java | 31 + .../io/hadoop/ShardNameTemplateHelper.java | 61 + .../io/hadoop/TemplatedAvroKeyOutputFormat.java | 43 + .../TemplatedSequenceFileOutputFormat.java | 43 + .../io/hadoop/TemplatedTextOutputFormat.java | 43 + .../runners/spark/translation/DoFnFunction.java | 97 ++ .../spark/translation/EvaluationContext.java | 288 +++++ .../spark/translation/MultiDoFnFunction.java | 119 ++ .../spark/translation/SparkContextFactory.java | 69 ++ .../translation/SparkPipelineEvaluator.java | 56 + .../SparkPipelineOptionsFactory.java | 31 + .../SparkPipelineOptionsRegistrar.java | 31 + .../SparkPipelineRunnerRegistrar.java | 31 + .../translation/SparkPipelineTranslator.java | 30 + .../spark/translation/SparkProcessContext.java | 262 +++++ .../spark/translation/SparkRuntimeContext.java | 217 ++++ .../spark/translation/TransformEvaluator.java | 27 + .../spark/translation/TransformTranslator.java | 808 +++++++++++++ .../spark/translation/WindowingHelpers.java | 62 + .../SparkStreamingPipelineOptionsFactory.java | 31 + .../SparkStreamingPipelineOptionsRegistrar.java | 32 + 
.../streaming/StreamingEvaluationContext.java | 229 ++++ .../streaming/StreamingTransformTranslator.java | 418 +++++++ .../StreamingWindowPipelineDetector.java | 104 ++ .../runners/spark/util/BroadcastHelper.java | 125 ++ .../beam/runners/spark/util/ByteArray.java | 55 + ...ataflow.sdk.options.PipelineOptionsRegistrar | 4 +- ...dataflow.sdk.runners.PipelineRunnerRegistrar | 2 +- .../dataflow/hadoop/WritableCoderTest.java | 42 - .../dataflow/spark/AvroPipelineTest.java | 103 -- .../dataflow/spark/CombineGloballyTest.java | 87 -- .../dataflow/spark/CombinePerKeyTest.java | 69 -- .../com/cloudera/dataflow/spark/DeDupTest.java | 55 - .../cloudera/dataflow/spark/DoFnOutputTest.java | 57 - .../cloudera/dataflow/spark/EmptyInputTest.java | 64 - .../spark/HadoopFileFormatPipelineTest.java | 105 -- .../spark/MultiOutputWordCountTest.java | 148 --- .../cloudera/dataflow/spark/NumShardsTest.java | 89 -- .../dataflow/spark/SerializationTest.java | 183 --- .../dataflow/spark/ShardNameBuilderTest.java | 82 -- .../dataflow/spark/SideEffectsTest.java | 77 -- .../dataflow/spark/SimpleWordCountTest.java | 117 -- .../spark/TestSparkPipelineOptionsFactory.java | 34 - .../com/cloudera/dataflow/spark/TfIdfTest.java | 60 - .../dataflow/spark/TransformTranslatorTest.java | 95 -- .../dataflow/spark/WindowedWordCountTest.java | 63 - .../spark/streaming/FlattenStreamingTest.java | 84 -- .../spark/streaming/KafkaStreamingTest.java | 133 --- .../streaming/SimpleStreamingWordCountTest.java | 73 -- .../utils/DataflowAssertStreaming.java | 39 - .../streaming/utils/EmbeddedKafkaCluster.java | 314 ----- .../apache/beam/runners/spark/DeDupTest.java | 60 + .../beam/runners/spark/EmptyInputTest.java | 69 ++ .../beam/runners/spark/SimpleWordCountTest.java | 115 ++ .../apache/beam/runners/spark/TfIdfTest.java | 64 + .../runners/spark/coders/WritableCoderTest.java | 45 + .../beam/runners/spark/io/AvroPipelineTest.java | 108 ++ .../beam/runners/spark/io/NumShardsTest.java | 96 ++ 
.../io/hadoop/HadoopFileFormatPipelineTest.java | 113 ++ .../spark/io/hadoop/ShardNameBuilderTest.java | 85 ++ .../spark/translation/CombineGloballyTest.java | 94 ++ .../spark/translation/CombinePerKeyTest.java | 70 ++ .../spark/translation/DoFnOutputTest.java | 64 + .../translation/MultiOutputWordCountTest.java | 137 +++ .../spark/translation/SerializationTest.java | 183 +++ .../spark/translation/SideEffectsTest.java | 81 ++ .../TestSparkPipelineOptionsFactory.java | 38 + .../translation/TransformTranslatorTest.java | 99 ++ .../translation/WindowedWordCountTest.java | 71 ++ .../streaming/FlattenStreamingTest.java | 88 ++ .../streaming/KafkaStreamingTest.java | 140 +++ .../streaming/SimpleStreamingWordCountTest.java | 77 ++ .../utils/DataflowAssertStreaming.java | 42 + .../streaming/utils/EmbeddedKafkaCluster.java | 317 +++++ sdk/pom.xml | 82 +- .../sdk/coders/protobuf/package-info.java | 23 + .../dataflow/sdk/io/bigtable/BigtableIO.java | 4 +- .../dataflow/sdk/io/bigtable/package-info.java | 22 + .../dataflow/sdk/options/PipelineOptions.java | 3 +- .../sdk/options/PipelineOptionsFactory.java | 72 +- .../sdk/runners/DataflowPipelineRunner.java | 25 +- .../sdk/runners/DataflowPipelineTranslator.java | 15 + .../inprocess/BoundedReadEvaluatorFactory.java | 50 +- .../CachedThreadPoolExecutorServiceFactory.java | 42 + .../runners/inprocess/CompletionCallback.java | 33 + .../ConsumerTrackingPipelineVisitor.java | 173 +++ .../sdk/runners/inprocess/EvaluatorKey.java | 1 - .../inprocess/ExecutorServiceFactory.java | 32 + .../ExecutorServiceParallelExecutor.java | 432 +++++++ .../inprocess/FlattenEvaluatorFactory.java | 7 +- .../inprocess/GroupByKeyEvaluatorFactory.java | 10 +- .../inprocess/InMemoryWatermarkManager.java | 30 +- .../sdk/runners/inprocess/InProcessBundle.java | 20 +- .../inprocess/InProcessEvaluationContext.java | 405 +++++++ .../runners/inprocess/InProcessExecutor.java | 46 + .../inprocess/InProcessPipelineOptions.java | 68 +- 
.../inprocess/InProcessPipelineRunner.java | 319 +++-- .../inprocess/InProcessSideInputContainer.java | 71 +- .../inprocess/KeyedPValueTrackingVisitor.java | 95 ++ .../inprocess/ParDoMultiEvaluatorFactory.java | 6 +- .../inprocess/ParDoSingleEvaluatorFactory.java | 6 +- .../sdk/runners/inprocess/StepAndKey.java | 68 ++ .../inprocess/TransformEvaluatorFactory.java | 1 - .../inprocess/TransformEvaluatorRegistry.java | 72 ++ .../runners/inprocess/TransformExecutor.java | 114 ++ .../inprocess/TransformExecutorService.java | 34 + .../inprocess/TransformExecutorServices.java | 153 +++ .../UnboundedReadEvaluatorFactory.java | 54 +- .../runners/inprocess/ViewEvaluatorFactory.java | 8 +- .../inprocess/WatermarkCallbackExecutor.java | 143 +++ .../cloud/dataflow/sdk/transforms/Combine.java | 18 +- .../dataflow/sdk/transforms/CombineFns.java | 1100 ++++++++++++++++++ .../cloud/dataflow/sdk/transforms/DoFn.java | 13 +- .../dataflow/sdk/transforms/DoFnReflector.java | 7 +- .../dataflow/sdk/transforms/PTransform.java | 14 +- .../cloud/dataflow/sdk/transforms/ParDo.java | 13 + .../sdk/transforms/display/DisplayData.java | 530 +++++++++ .../sdk/transforms/display/HasDisplayData.java | 53 + .../transforms/windowing/AfterWatermark.java | 4 +- .../cloud/dataflow/sdk/util/DoFnRunners.java | 4 +- .../cloud/dataflow/sdk/util/PropertyNames.java | 2 + .../sdk/options/PipelineOptionsFactoryTest.java | 75 +- .../runners/DataflowPipelineTranslatorTest.java | 98 +- .../BoundedReadEvaluatorFactoryTest.java | 138 ++- .../ConsumerTrackingPipelineVisitorTest.java | 233 ++++ .../inprocess/FlattenEvaluatorFactoryTest.java | 1 - .../GroupByKeyEvaluatorFactoryTest.java | 1 - .../inprocess/InMemoryWatermarkManagerTest.java | 12 + .../InProcessEvaluationContextTest.java | 544 +++++++++ .../inprocess/InProcessPipelineRunnerTest.java | 77 ++ .../InProcessSideInputContainerTest.java | 92 +- .../KeyedPValueTrackingVisitorTest.java | 189 +++ .../ParDoMultiEvaluatorFactoryTest.java | 1 - 
.../ParDoSingleEvaluatorFactoryTest.java | 1 - .../TransformExecutorServicesTest.java | 134 +++ .../inprocess/TransformExecutorTest.java | 312 +++++ .../UnboundedReadEvaluatorFactoryTest.java | 169 ++- .../inprocess/ViewEvaluatorFactoryTest.java | 1 - .../WatermarkCallbackExecutorTest.java | 126 ++ .../dataflow/sdk/transforms/CombineFnsTest.java | 413 +++++++ .../cloud/dataflow/sdk/transforms/DoFnTest.java | 15 + .../dataflow/sdk/transforms/PTransformTest.java | 41 + .../dataflow/sdk/transforms/ParDoTest.java | 23 + .../transforms/display/DisplayDataMatchers.java | 98 ++ .../display/DisplayDataMatchersTest.java | 81 ++ .../sdk/transforms/display/DisplayDataTest.java | 633 ++++++++++ .../cloud/dataflow/sdk/util/ApiSurfaceTest.java | 3 +- .../PipelineOptionsFactoryJava8Test.java | 90 ++ .../sdk/transforms/CombineJava8Test.java | 133 --- .../sdk/transforms/FilterJava8Test.java | 118 -- .../transforms/FlatMapElementsJava8Test.java | 84 -- .../sdk/transforms/MapElementsJava8Test.java | 77 -- .../sdk/transforms/PartitionJava8Test.java | 74 -- .../transforms/RemoveDuplicatesJava8Test.java | 99 -- .../sdk/transforms/WithKeysJava8Test.java | 74 -- .../sdk/transforms/WithTimestampsJava8Test.java | 66 -- 428 files changed, 31664 insertions(+), 23375 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/911d2953/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --cc runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 0000000,e115a15..b413d7a mode 000000,100644..100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@@ -1,0 -1,631 +1,640 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.runners.flink.translation.wrappers.streaming; + + import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; + import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; + import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*; + import com.google.cloud.dataflow.sdk.coders.*; + import com.google.cloud.dataflow.sdk.options.PipelineOptions; + import com.google.cloud.dataflow.sdk.runners.PipelineRunner; + import com.google.cloud.dataflow.sdk.transforms.Aggregator; + import com.google.cloud.dataflow.sdk.transforms.Combine; + import com.google.cloud.dataflow.sdk.transforms.DoFn; + import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; + import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; + import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; + import com.google.cloud.dataflow.sdk.util.*; + import com.google.cloud.dataflow.sdk.values.*; + import 
com.google.common.base.Preconditions; + import com.google.common.collect.HashMultimap; + import com.google.common.collect.Multimap; + import org.apache.flink.api.common.accumulators.Accumulator; + import org.apache.flink.api.common.accumulators.AccumulatorHelper; + import org.apache.flink.core.memory.DataInputView; + import org.apache.flink.runtime.state.AbstractStateBackend; + import org.apache.flink.runtime.state.StateHandle; + import org.apache.flink.streaming.api.datastream.DataStream; + import org.apache.flink.streaming.api.datastream.KeyedStream; + import org.apache.flink.streaming.api.operators.*; + import org.apache.flink.streaming.api.watermark.Watermark; + import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + import org.apache.flink.streaming.runtime.tasks.StreamTaskState; + import org.joda.time.Instant; + + import java.io.IOException; + import java.util.*; + + /** + * This class is the key class implementing all the windowing/triggering logic of Apache Beam. + * To provide full compatibility and support for all the windowing/triggering combinations offered by + * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in + * ({@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}. + * <p/> + * In a nutshell, when the execution arrives to this operator, we expect to have a stream <b>already + * grouped by key</b>. Each of the elements that enter here, registers a timer + * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)} in the + * {@link FlinkGroupAlsoByWindowWrapper#activeTimers}. + * This is essentially a timestamp indicating when to trigger the computation over the window this + * element belongs to. + * <p/> + * When a watermark arrives, all the registered timers are checked to see which ones are ready to + * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). 
These are deregistered from + * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers} + * list, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn} + * for further processing. + */ + public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> + extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>> + implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> { + + private static final long serialVersionUID = 1L; + + private transient PipelineOptions options; + + private transient CoderRegistry coderRegistry; + + private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; + + private ProcessContext context; + + private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy; + + private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn; + + private final KvCoder<K, VIN> inputKvCoder; + + /** + * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a + * key whose elements are currently waiting to be processed, and its associated state. + */ + private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(); + + /** + * Timers waiting to be processed. + */ + private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + + private FlinkTimerInternals timerInternals = new FlinkTimerInternals(); + + /** + * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy. + * This method assumes that <b>elements are already grouped by key</b>. + * <p/> + * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)} + * is that this method assumes that a combiner function is provided + * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * A combiner helps increase the speed and, in most cases, reduces the per-window state. + * + * @param options the general job configuration options. 
+ * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. + * @param combiner the combiner to be used. + * @param outputKvCoder the type of the output values. + */ + public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create( + PipelineOptions options, + PCollection input, + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner, + KvCoder<K, VOUT> outputKvCoder) { + Preconditions.checkNotNull(options); + + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); + FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options, + input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner); + + Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( + outputKvCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo = + new CoderTypeInformation<>(windowedOutputElemCoder); + + DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey + .transform("GroupByWindowWithCombiner", + new CoderTypeInformation<>(outputKvCoder), + windower) + .returns(outputTypeInfo); + + return groupedByKeyAndWindow; + } + + /** + * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy. + * This method assumes that <b>elements are already grouped by key</b>. + * <p/> + * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)} + * is that this method assumes no combiner function + * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * + * @param options the general job configuration options. 
+ * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. + */ + public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable( + PipelineOptions options, + PCollection input, + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) { + Preconditions.checkNotNull(options); + + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); + + FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options, + input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null); + + Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder); + KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder); + + Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( + outputElemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo = + new CoderTypeInformation<>(windowedOutputElemCoder); + + DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey + .transform("GroupByWindow", + new CoderTypeInformation<>(windowedOutputElemCoder), + windower) + .returns(outputTypeInfo); + + return groupedByKeyAndWindow; + } + + public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper + createForTesting(PipelineOptions options, + CoderRegistry registry, + WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, + KvCoder<K, VIN> inputCoder, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { + Preconditions.checkNotNull(options); + + return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner); + } + + private 
FlinkGroupAlsoByWindowWrapper(PipelineOptions options, + CoderRegistry registry, + WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, + KvCoder<K, VIN> inputCoder, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { + Preconditions.checkNotNull(options); + + this.options = Preconditions.checkNotNull(options); + this.coderRegistry = Preconditions.checkNotNull(registry); + this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder(); + this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy(); + this.combineFn = combiner; + this.operator = createGroupAlsoByWindowOperator(); + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); + } + + /** + * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}, + * <b> if not already created</b>. + * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then + * a function with that combiner is created, so that elements are combined as they arrive. This is + * done for speed and (in most of the cases) for reduction of the per-window state. 
+ */ + private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { + if (this.operator == null) { + if (this.combineFn == null) { + // Thus VOUT == Iterable<VIN> + Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); + + this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( + (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); + } else { + Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); + + AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn + .withInputCoder(combineFn, coderRegistry, inputKvCoder); + + this.operator = GroupAlsoByWindowViaWindowSetDoFn.create( + (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); + } + } + return this.operator; + } + + private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { + context.setElement(workItem, getStateInternalsForKey(workItem.key())); + + // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. 
+ operator.startBundle(context); + operator.processElement(context); + operator.finishBundle(context); + } + + @Override + public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception { + ArrayList<WindowedValue<VIN>> elements = new ArrayList<>(); + elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), + element.getValue().getWindows(), element.getValue().getPane())); + processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + context.setCurrentInputWatermark(new Instant(mark.getTimestamp())); + + Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); + if (!timers.isEmpty()) { + for (K key : timers.keySet()) { + processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key))); + } + } + + /** + * This is to take into account the different semantics of the Watermark in Flink and + * in Dataflow. To understand the reasoning behind the Dataflow semantics and its + * watermark holding logic, see the documentation of + * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)} + * */ + long millis = Long.MAX_VALUE; + for (FlinkStateInternals state : perKeyStateInternals.values()) { + Instant watermarkHold = state.getWatermarkHold(); + if (watermarkHold != null && watermarkHold.getMillis() < millis) { + millis = watermarkHold.getMillis(); + } + } + + if (mark.getTimestamp() < millis) { + millis = mark.getTimestamp(); + } + + context.setCurrentOutputWatermark(new Instant(millis)); + + // Don't forget to re-emit the watermark for further operators down the line. + // This is critical for jobs with multiple aggregation steps. + // Imagine a job with a groupByKey() on key K1, followed by a map() that changes + // the key K1 to K2, and another groupByKey() on K2. 
In this case, if the watermark + // is not re-emitted, the second aggregation would never be triggered, and no result + // will be produced. + output.emitWatermark(new Watermark(millis)); + } + + @Override + public void close() throws Exception { + super.close(); + } + + private void registerActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey == null) { + timersForKey = new HashSet<>(); + } + timersForKey.add(timer); + activeTimers.put(key, timersForKey); + } + + private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey != null) { + timersForKey.remove(timer); + if (timersForKey.isEmpty()) { + activeTimers.remove(key); + } else { + activeTimers.put(key, timersForKey); + } + } + } + + /** + * Returns the list of timers that are ready to fire. These are the timers + * that are registered to be triggered at a time before the current watermark. + * We keep these timers in a Set, so that they are deduplicated, as the same + * timer can be registered multiple times. + */ + private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { + + // we keep the timers to return in a different list and launch them later + // because we cannot prevent a trigger from registering another trigger, + // which would lead to concurrent modification exception. 
+ Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); + + Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); + + Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); + while (timerIt.hasNext()) { + TimerInternals.TimerData timerData = timerIt.next(); + if (timerData.getTimestamp().isBefore(currentWatermark)) { + toFire.put(keyWithTimers.getKey(), timerData); + timerIt.remove(); + } + } + + if (keyWithTimers.getValue().isEmpty()) { + it.remove(); + } + } + return toFire; + } + + /** + * Gets the state associated with the specified key. + * + * @param key the key whose state we want. + * @return The {@link FlinkStateInternals} + * associated with that key. + */ + private FlinkStateInternals<K> getStateInternalsForKey(K key) { + FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key); + if (stateInternals == null) { + Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + OutputTimeFn<? 
super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn(); + stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn); + perKeyStateInternals.put(key, stateInternals); + } + return stateInternals; + } + + private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> { + @Override + public void setTimer(TimerData timerKey) { + registerActiveTimer(context.element().key(), timerKey); + } + + @Override + public void deleteTimer(TimerData timerKey) { + unregisterActiveTimer(context.element().key(), timerKey); + } + } + + private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext { + + private final FlinkTimerInternals timerInternals; + + private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector; + + private FlinkStateInternals<K> stateInternals; + + private KeyedWorkItem<K, VIN> element; + + public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, + TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector, + FlinkTimerInternals timerInternals) { + function.super(); + super.setupDelegateAggregators(); + + this.collector = Preconditions.checkNotNull(outCollector); + this.timerInternals = Preconditions.checkNotNull(timerInternals); + } + + public void setElement(KeyedWorkItem<K, VIN> element, + FlinkStateInternals<K> stateForKey) { + this.element = element; + this.stateInternals = stateForKey; + } + + public void setCurrentInputWatermark(Instant watermark) { + this.timerInternals.setCurrentInputWatermark(watermark); + } + + public void setCurrentOutputWatermark(Instant watermark) { + this.timerInternals.setCurrentOutputWatermark(watermark); + } + + @Override + public KeyedWorkItem<K, VIN> element() { + return this.element; + } + + @Override + public Instant timestamp() { + throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); + 
} + + @Override + public PipelineOptions getPipelineOptions() { + // TODO: PipelineOptions need to be available on the workers. + // Ideally they are captured as part of the pipeline. + // For now, construct empty options so that StateContexts.createFromComponents + // will yield a valid StateContext, which is needed to support the StateContext.window(). + if (options == null) { + options = new PipelineOptions() { + @Override + public <T extends PipelineOptions> T as(Class<T> kls) { + return null; + } + + @Override + public <T extends PipelineOptions> T cloneAs(Class<T> kls) { + return null; + } + + @Override + public Class<? extends PipelineRunner<?>> getRunner() { + return null; + } + + @Override + public void setRunner(Class<? extends PipelineRunner<?>> kls) { + + } + + @Override + public CheckEnabled getStableUniqueNames() { + return null; + } + + @Override + public void setStableUniqueNames(CheckEnabled enabled) { + } ++ ++ @Override ++ public String getTempLocation() { ++ return null; ++ } ++ ++ @Override ++ public void setTempLocation(String tempLocation) { ++ } + }; + } + return options; + } + + @Override + public void output(KV<K, VOUT> output) { + throw new UnsupportedOperationException( + "output() is not available when processing KeyedWorkItems."); + } + + @Override + public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) { + throw new UnsupportedOperationException( + "outputWithTimestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "window() is not available when processing KeyedWorkItems."); + } + + @Override + public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() { + return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { + + @Override + public 
com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() { + return stateInternals; + } + + @Override + public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + // TODO: No need to represent timestamp twice. + collector.setAbsoluteTimestamp(timestamp.getMillis()); + collector.collect(WindowedValue.of(output, timestamp, windows, pane)); + + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException("windows() is not available in Streaming mode."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available in Streaming mode."); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() is not available in Streaming mode."); + } + }; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new RuntimeException("sideInput() is not supported in Streaming mode."); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. 
+ throw new RuntimeException("sideOutput() is not available when grouping by window."); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + Accumulator acc = getRuntimeContext().getAccumulator(name); + if (acc != null) { + AccumulatorHelper.compareAccumulatorTypes(name, + SerializableFnAggregatorWrapper.class, acc.getClass()); + return (Aggregator<AggInputT, AggOutputT>) acc; + } + + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = + new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, accumulator); + return accumulator; + } + } + + ////////////// Checkpointing implementation //////////////// + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + StateCheckpointWriter writer = StateCheckpointWriter.create(out); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + // checkpoint the timers + StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder); + + // checkpoint the state + StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder); + + // checkpoint the timerInternals + context.timerInternals.encodeTimerInternals(context, writer, + inputKvCoder, windowingStrategy.getWindowFn().windowCoder()); + + taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { + super.restoreState(taskState, recoveryTimestamp); + + final 
ClassLoader userClassloader = getUserCodeClassloader(); + + Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + @SuppressWarnings("unchecked") + StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); + DataInputView in = inputState.getState(userClassloader); + StateCheckpointReader reader = new StateCheckpointReader(in); + + // restore the timers + this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); + + // restore the state + this.perKeyStateInternals = StateCheckpointUtils.decodeState( + reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader); + + // restore the timerInternals. + this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); + } -} ++} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/911d2953/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/911d2953/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java ----------------------------------------------------------------------
