Merge branch 'master' into gearpump-runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9dc9be9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9dc9be9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9dc9be9e Branch: refs/heads/gearpump-runner Commit: 9dc9be9e6d0aa202e170d64e1a1d7b5a57828c5a Parents: 8f4334c f2fe1ae Author: manuzhang <[email protected]> Authored: Wed Oct 26 11:15:24 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Oct 26 11:15:24 2016 +0800 ---------------------------------------------------------------------- .gitignore | 8 + .travis.yml | 10 +- examples/java/pom.xml | 154 ++- .../beam/examples/DebuggingWordCount.java | 2 +- .../apache/beam/examples/MinimalWordCount.java | 3 +- .../apache/beam/examples/WindowedWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 4 +- .../common/ExampleBigQueryTableOptions.java | 2 +- ...xamplePubsubTopicAndSubscriptionOptions.java | 2 +- .../common/ExamplePubsubTopicOptions.java | 2 +- .../beam/examples/common/ExampleUtils.java | 2 +- .../examples/common/PubsubFileInjector.java | 153 --- .../beam/examples/complete/AutoComplete.java | 2 +- .../apache/beam/examples/complete/TfIdf.java | 2 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java | 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../examples/cookbook/DatastoreWordCount.java | 261 ----- .../beam/examples/cookbook/DeDupExample.java | 4 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java | 2 +- .../examples/cookbook/MaxPerKeyExamples.java | 2 +- .../beam/examples/cookbook/TriggerExample.java | 28 +- .../beam/examples/WindowedWordCountIT.java | 75 ++ .../org/apache/beam/examples/WordCountIT.java | 6 - .../examples/cookbook/BigQueryTornadoesIT.java | 14 +- examples/java8/pom.xml | 2 +- .../beam/examples/complete/game/GameStats.java | 10 +- .../examples/complete/game/HourlyTeamScore.java | 8 +- .../examples/complete/game/LeaderBoard.java | 12 +- .../beam/examples/complete/game/UserScore.java | 10 +- .../complete/game/injector/Injector.java | 10 +- .../examples/complete/game/LeaderBoardTest.java | 5 +- examples/pom.xml | 2 +- pom.xml | 134 ++- runners/core-java/pom.xml | 8 +- .../beam/runners/core/AggregatorFactory.java | 39 + .../beam/runners/core/BatchTimerInternals.java | 140 --- .../apache/beam/runners/core/DoFnRunner.java | 8 +- .../beam/runners/core/DoFnRunnerBase.java | 559 ----------- .../apache/beam/runners/core/DoFnRunners.java | 191 +++- .../runners/core/ElementAndRestriction.java | 42 + .../core/ElementAndRestrictionCoder.java | 67 ++ .../runners/core/ElementByteSizeObservable.java | 5 +- .../runners/core/GBKIntoKeyedWorkItems.java | 55 + .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 + .../runners/core/GroupAlsoByWindowsDoFn.java | 19 - .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 12 +- .../runners/core/ReduceFnContextFactory.java | 2 +- .../beam/runners/core/ReduceFnRunner.java | 42 +- .../beam/runners/core/SideInputHandler.java | 2 +- .../beam/runners/core/SimpleDoFnRunner.java | 58 -- .../beam/runners/core/SimpleOldDoFnRunner.java | 521 ++++++++++ .../beam/runners/core/SplittableParDo.java | 469 +++++++++ .../apache/beam/runners/core/TriggerRunner.java | 247 ----- .../core/UnboundedReadFromBoundedSource.java | 2 +- .../core/triggers/AfterAllStateMachine.java | 109 ++ .../AfterDelayFromFirstElementStateMachine.java | 337 +++++++ .../core/triggers/AfterEachStateMachine.java | 130 +++ .../core/triggers/AfterFirstStateMachine.java | 112 +++ .../core/triggers/AfterPaneStateMachine.java | 139 +++ .../AfterProcessingTimeStateMachine.java | 93 ++ ...rSynchronizedProcessingTimeStateMachine.java | 63 ++ .../triggers/AfterWatermarkStateMachine.java | 325 ++++++ .../triggers/DefaultTriggerStateMachine.java | 81 ++ .../triggers/ExecutableTriggerStateMachine.java | 160 +++ .../runners/core/triggers/FinishedTriggers.java | 44 + .../core/triggers/FinishedTriggersBitSet.java | 67 ++ .../core/triggers/FinishedTriggersSet.java | 72 ++ .../core/triggers/NeverStateMachine.java | 60 ++ .../core/triggers/OrFinallyStateMachine.java | 85 ++ .../core/triggers/RepeatedlyStateMachine.java | 88 ++ .../triggers/ReshuffleTriggerStateMachine.java | 50 + .../core/triggers/TriggerStateMachine.java | 487 +++++++++ .../TriggerStateMachineContextFactory.java | 509 ++++++++++ .../triggers/TriggerStateMachineRunner.java | 234 +++++ .../core/triggers/TriggerStateMachines.java | 215 ++++ .../runners/core/triggers/package-info.java | 23 + .../runners/core/BatchTimerInternalsTest.java | 118 --- .../core/ElementAndRestrictionCoderTest.java | 127 +++ .../beam/runners/core/ReduceFnRunnerTest.java | 281 +++--- .../beam/runners/core/ReduceFnTester.java | 405 +++----- .../beam/runners/core/SimpleDoFnRunnerTest.java | 88 -- .../runners/core/SimpleOldDoFnRunnerTest.java | 88 ++ .../beam/runners/core/SplittableParDoTest.java | 467 +++++++++ .../core/triggers/AfterAllStateMachineTest.java | 140 +++ .../triggers/AfterEachStateMachineTest.java | 108 ++ .../triggers/AfterFirstStateMachineTest.java | 159 +++ .../triggers/AfterPaneStateMachineTest.java | 117 +++ .../AfterProcessingTimeStateMachineTest.java | 172 ++++ ...chronizedProcessingTimeStateMachineTest.java | 110 ++ .../AfterWatermarkStateMachineTest.java | 382 +++++++ .../DefaultTriggerStateMachineTest.java | 165 +++ .../ExecutableTriggerStateMachineTest.java | 108 ++ .../triggers/FinishedTriggersBitSetTest.java | 55 + .../triggers/FinishedTriggersProperties.java | 115 +++ .../core/triggers/FinishedTriggersSetTest.java | 60 ++ .../core/triggers/NeverStateMachineTest.java | 59 ++ .../triggers/OrFinallyStateMachineTest.java | 177 ++++ .../triggers/RepeatedlyStateMachineTest.java | 200 ++++ .../ReshuffleTriggerStateMachineTest.java | 68 ++ .../core/triggers/StubTriggerStateMachine.java | 60 ++ .../core/triggers/TriggerStateMachineTest.java | 98 ++ .../triggers/TriggerStateMachineTester.java | 431 ++++++++ .../core/triggers/TriggerStateMachinesTest.java | 199 ++++ runners/direct-java/pom.xml | 26 +- .../runners/direct/AggregatorContainer.java | 9 +- .../direct/BoundedReadEvaluatorFactory.java | 155 +-- .../beam/runners/direct/BundleFactory.java | 15 +- .../runners/direct/CloningBundleFactory.java | 98 ++ .../beam/runners/direct/CompletionCallback.java | 4 +- .../runners/direct/DirectExecutionContext.java | 2 +- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 66 ++ .../beam/runners/direct/DirectMetrics.java | 338 +++++++ .../beam/runners/direct/DirectOptions.java | 40 +- .../beam/runners/direct/DirectRunner.java | 121 ++- .../runners/direct/DirectTimerInternals.java | 2 +- .../runners/direct/DoFnLifecycleManager.java | 86 +- .../beam/runners/direct/EmptyInputProvider.java | 45 + .../direct/EncodabilityEnforcementFactory.java | 50 +- .../beam/runners/direct/EvaluationContext.java | 27 +- .../direct/ExecutorServiceParallelExecutor.java | 124 ++- .../runners/direct/FlattenEvaluatorFactory.java | 26 +- .../runners/direct/ForwardingPTransform.java | 2 +- .../GroupAlsoByWindowEvaluatorFactory.java | 14 +- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 21 +- .../ImmutabilityCheckingBundleFactory.java | 18 +- .../direct/ImmutableListBundleFactory.java | 76 +- .../beam/runners/direct/ParDoEvaluator.java | 13 +- .../direct/ParDoMultiEvaluatorFactory.java | 29 +- .../runners/direct/ParDoOverrideFactory.java | 55 + .../direct/ParDoSingleEvaluatorFactory.java | 23 +- .../beam/runners/direct/PipelineExecutor.java | 2 +- .../beam/runners/direct/RootInputProvider.java | 46 + .../runners/direct/RootProviderRegistry.java | 66 ++ .../runners/direct/StepTransformResult.java | 49 +- .../beam/runners/direct/StructuralKey.java | 88 +- .../direct/TestStreamEvaluatorFactory.java | 152 +-- .../beam/runners/direct/TransformEvaluator.java | 2 +- .../direct/TransformEvaluatorFactory.java | 22 +- .../direct/TransformEvaluatorRegistry.java | 63 +- .../beam/runners/direct/TransformExecutor.java | 53 +- .../beam/runners/direct/TransformResult.java | 16 +- .../direct/UnboundedReadEvaluatorFactory.java | 318 +++--- .../direct/UncommittedBundleOutputManager.java | 4 +- .../runners/direct/ViewEvaluatorFactory.java | 16 +- .../beam/runners/direct/WatermarkManager.java | 53 +- .../runners/direct/WindowEvaluatorFactory.java | 18 +- .../direct/WriteWithShardingFactory.java | 6 +- .../direct/BoundedReadEvaluatorFactoryTest.java | 181 ++-- .../direct/CloningBundleFactoryTest.java | 177 ++++ .../runners/direct/CommittedResultTest.java | 10 +- .../ConsumerTrackingPipelineVisitorTest.java | 32 +- .../beam/runners/direct/DirectMetricsTest.java | 133 +++ .../beam/runners/direct/DirectRunnerTest.java | 180 +++- ...leManagerRemovingTransformEvaluatorTest.java | 16 +- .../direct/DoFnLifecycleManagerTest.java | 86 +- .../direct/DoFnLifecycleManagersTest.java | 48 +- .../EncodabilityEnforcementFactoryTest.java | 132 ++- .../runners/direct/EvaluationContextTest.java | 31 +- .../direct/FlattenEvaluatorFactoryTest.java | 36 +- .../direct/ForwardingPTransformTest.java | 7 +- .../direct/GroupByKeyEvaluatorFactoryTest.java | 27 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 35 +- .../ImmutabilityCheckingBundleFactoryTest.java | 67 +- .../ImmutabilityEnforcementFactoryTest.java | 14 +- .../direct/ImmutableListBundleFactoryTest.java | 52 +- .../direct/KeyedPValueTrackingVisitorTest.java | 8 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 15 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 162 +-- .../direct/ParDoSingleEvaluatorFactoryTest.java | 145 ++- .../beam/runners/direct/SplittableDoFnTest.java | 231 +++++ .../runners/direct/StepTransformResultTest.java | 4 +- .../beam/runners/direct/StructuralKeyTest.java | 9 + .../direct/TestStreamEvaluatorFactoryTest.java | 223 ++--- .../runners/direct/TransformExecutorTest.java | 115 +-- .../UnboundedReadEvaluatorFactoryTest.java | 321 ++++-- .../direct/ViewEvaluatorFactoryTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java | 84 +- .../direct/WindowEvaluatorFactoryTest.java | 12 +- runners/flink/README.md | 10 +- runners/flink/examples/pom.xml | 2 +- .../beam/runners/flink/examples/TFIDF.java | 12 +- .../beam/runners/flink/examples/WordCount.java | 4 +- .../flink/examples/streaming/AutoComplete.java | 4 +- .../flink/examples/streaming/package-info.java | 22 - runners/flink/pom.xml | 4 +- runners/flink/runner/pom.xml | 10 +- .../flink/FlinkDetachedRunnerResult.java | 76 ++ .../FlinkPipelineExecutionEnvironment.java | 7 + .../runners/flink/FlinkPipelineOptions.java | 19 +- .../apache/beam/runners/flink/FlinkRunner.java | 29 +- .../runners/flink/FlinkRunnerRegistrar.java | 4 +- .../beam/runners/flink/FlinkRunnerResult.java | 17 +- .../beam/runners/flink/TestFlinkRunner.java | 9 +- .../FlinkBatchPipelineTranslator.java | 2 +- .../FlinkStreamingPipelineTranslator.java | 2 +- .../functions/FlinkDoFnFunction.java | 2 +- .../functions/FlinkMultiOutputDoFnFunction.java | 2 +- .../functions/FlinkProcessContext.java | 10 +- .../wrappers/streaming/DoFnOperator.java | 31 +- .../wrappers/streaming/FlinkStateInternals.java | 8 +- .../wrappers/streaming/WindowDoFnOperator.java | 177 +++- .../streaming/io/BoundedSourceWrapper.java | 9 +- .../streaming/io/UnboundedSourceWrapper.java | 187 +++- .../beam/runners/flink/FlinkTestPipeline.java | 6 +- .../flink/streaming/DoFnOperatorTest.java | 9 +- .../streaming/UnboundedSourceWrapperTest.java | 123 +-- .../translators/TransformTranslator.java | 30 - .../translators/io/UnboundedSourceWrapper.java | 45 - runners/google-cloud-dataflow-java/pom.xml | 22 +- .../runners/dataflow/DataflowPipelineJob.java | 52 +- .../dataflow/DataflowPipelineTranslator.java | 137 ++- .../beam/runners/dataflow/DataflowRunner.java | 57 +- .../dataflow/internal/AssignWindows.java | 6 +- .../DataflowUnboundedReadFromBoundedSource.java | 18 +- .../runners/dataflow/internal/IsmFormat.java | 13 +- .../dataflow/internal/ReadTranslator.java | 2 +- .../options/DataflowPipelineDebugOptions.java | 7 +- .../options/DataflowPipelineOptions.java | 14 +- .../DataflowPipelineWorkerPoolOptions.java | 11 +- .../options/DataflowProfilingOptions.java | 2 +- .../options/DataflowWorkerLoggingOptions.java | 4 +- .../dataflow/testing/TestDataflowRunner.java | 144 ++- .../beam/runners/dataflow/util/DoFnInfo.java | 29 +- .../runners/dataflow/util/MonitoringUtil.java | 2 +- .../runners/dataflow/util/RandomAccessData.java | 8 +- .../beam/runners/dataflow/util/Stager.java | 2 +- .../dataflow/DataflowPipelineJobTest.java | 120 +++ .../DataflowPipelineTranslatorTest.java | 40 +- .../runners/dataflow/DataflowRunnerTest.java | 13 +- ...aflowUnboundedReadFromBoundedSourceTest.java | 83 ++ .../testing/TestDataflowRunnerTest.java | 287 +++++- .../dataflow/util/MonitoringUtilTest.java | 4 +- runners/pom.xml | 3 +- runners/spark/pom.xml | 94 +- .../beam/runners/spark/EvaluationResult.java | 4 +- .../runners/spark/SparkPipelineOptions.java | 48 +- .../apache/beam/runners/spark/SparkRunner.java | 176 ++-- .../beam/runners/spark/TestSparkRunner.java | 17 +- .../spark/aggregators/AccumulatorSingleton.java | 53 + .../runners/spark/coders/WritableCoder.java | 2 +- .../beam/runners/spark/examples/WordCount.java | 2 +- .../apache/beam/runners/spark/io/SourceRDD.java | 200 ++++ .../spark/io/hadoop/ShardNameTemplateAware.java | 2 +- .../runners/spark/translation/DoFnFunction.java | 73 +- .../spark/translation/EvaluationContext.java | 32 +- .../translation/GroupCombineFunctions.java | 313 ++++++ .../spark/translation/MultiDoFnFunction.java | 77 +- .../translation/SparkAbstractCombineFn.java | 134 +++ .../spark/translation/SparkContextFactory.java | 50 +- .../spark/translation/SparkGlobalCombineFn.java | 260 +++++ .../spark/translation/SparkKeyedCombineFn.java | 273 +++++ .../translation/SparkPipelineEvaluator.java | 57 -- .../translation/SparkPipelineTranslator.java | 5 +- .../spark/translation/SparkProcessContext.java | 168 +++- .../spark/translation/SparkRuntimeContext.java | 44 +- .../spark/translation/TransformTranslator.java | 592 +++-------- .../spark/translation/TranslationUtils.java | 197 ++++ .../SparkRunnerStreamingContextFactory.java | 106 ++ .../streaming/StreamingEvaluationContext.java | 81 +- .../streaming/StreamingTransformTranslator.java | 561 +++++++---- .../spark/util/SparkSideInputReader.java | 95 ++ .../runners/spark/ClearAggregatorsRule.java | 33 + .../apache/beam/runners/spark/DeDupTest.java | 59 -- .../beam/runners/spark/EmptyInputTest.java | 75 -- .../beam/runners/spark/SimpleWordCountTest.java | 105 -- .../apache/beam/runners/spark/TfIdfTest.java | 260 ----- .../beam/runners/spark/io/AvroPipelineTest.java | 4 +- .../beam/runners/spark/io/NumShardsTest.java | 4 +- .../io/hadoop/HadoopFileFormatPipelineTest.java | 4 +- .../spark/translation/CombineGloballyTest.java | 102 -- .../spark/translation/CombinePerKeyTest.java | 79 -- .../spark/translation/DoFnOutputTest.java | 67 -- .../translation/MultiOutputWordCountTest.java | 176 ---- .../spark/translation/SerializationTest.java | 201 ---- .../spark/translation/SideEffectsTest.java | 21 +- .../translation/SparkPipelineOptionsTest.java | 42 - .../translation/TransformTranslatorTest.java | 104 -- .../translation/WindowedWordCountTest.java | 120 --- .../streaming/EmptyStreamAssertionTest.java | 80 ++ .../streaming/FlattenStreamingTest.java | 57 +- .../streaming/KafkaStreamingTest.java | 32 +- .../ResumeFromCheckpointStreamingTest.java | 182 ++++ .../streaming/SimpleStreamingWordCountTest.java | 67 +- .../streaming/utils/PAssertStreaming.java | 87 +- .../utils/TestOptionsForStreaming.java | 55 + .../spark/src/test/resources/metrics.properties | 61 +- sdks/java/build-tools/pom.xml | 2 +- .../src/main/resources/beam/checkstyle.xml | 28 +- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../src/main/resources/beam/suppressions.xml | 11 +- sdks/java/core/pom.xml | 43 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 12 +- .../org/apache/beam/sdk/PipelineResult.java | 15 +- .../beam/sdk/annotations/Experimental.java | 15 +- .../sdk/coders/CannotProvideCoderException.java | 2 +- .../java/org/apache/beam/sdk/coders/Coder.java | 28 +- .../apache/beam/sdk/coders/CoderFactory.java | 4 +- .../apache/beam/sdk/coders/CoderProvider.java | 2 +- .../apache/beam/sdk/coders/DelegateCoder.java | 4 +- .../beam/sdk/coders/IterableLikeCoder.java | 22 +- .../apache/beam/sdk/coders/NullableCoder.java | 10 +- .../apache/beam/sdk/coders/package-info.java | 2 +- .../beam/sdk/coders/protobuf/ProtoCoder.java | 4 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 162 ++- .../java/org/apache/beam/sdk/io/AvroSource.java | 98 +- .../sdk/io/BoundedReadFromUnboundedSource.java | 11 +- .../org/apache/beam/sdk/io/BoundedSource.java | 19 +- .../apache/beam/sdk/io/CompressedSource.java | 18 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 145 ++- .../apache/beam/sdk/io/OffsetBasedSource.java | 6 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 24 +- .../beam/sdk/io/PubsubUnboundedSource.java | 2 +- .../main/java/org/apache/beam/sdk/io/Read.java | 4 +- .../sdk/io/SerializableAvroCodecFactory.java | 112 +++ .../main/java/org/apache/beam/sdk/io/Sink.java | 5 +- .../java/org/apache/beam/sdk/io/TextIO.java | 247 +++-- .../main/java/org/apache/beam/sdk/io/Write.java | 8 +- .../java/org/apache/beam/sdk/io/XmlSource.java | 13 +- .../apache/beam/sdk/io/range/ByteKeyRange.java | 6 +- .../apache/beam/sdk/io/range/RangeTracker.java | 1 + .../org/apache/beam/sdk/metrics/Counter.java | 40 + .../apache/beam/sdk/metrics/CounterCell.java | 76 ++ .../org/apache/beam/sdk/metrics/DirtyState.java | 98 ++ .../apache/beam/sdk/metrics/Distribution.java | 30 + .../beam/sdk/metrics/DistributionCell.java | 58 ++ .../beam/sdk/metrics/DistributionData.java | 59 ++ .../beam/sdk/metrics/DistributionResult.java | 42 + .../org/apache/beam/sdk/metrics/Metric.java | 24 + .../org/apache/beam/sdk/metrics/MetricCell.java | 47 + .../org/apache/beam/sdk/metrics/MetricKey.java | 40 + .../org/apache/beam/sdk/metrics/MetricName.java | 46 + .../beam/sdk/metrics/MetricNameFilter.java | 60 ++ .../beam/sdk/metrics/MetricQueryResults.java | 33 + .../apache/beam/sdk/metrics/MetricResult.java | 45 + .../apache/beam/sdk/metrics/MetricResults.java | 34 + .../apache/beam/sdk/metrics/MetricUpdates.java | 72 ++ .../org/apache/beam/sdk/metrics/Metrics.java | 110 ++ .../beam/sdk/metrics/MetricsContainer.java | 150 +++ .../beam/sdk/metrics/MetricsEnvironment.java | 85 ++ .../apache/beam/sdk/metrics/MetricsFilter.java | 86 ++ .../org/apache/beam/sdk/metrics/MetricsMap.java | 86 ++ .../apache/beam/sdk/metrics/package-info.java | 28 + .../options/CloudResourceManagerOptions.java | 40 + .../org/apache/beam/sdk/options/GcpOptions.java | 8 +- .../org/apache/beam/sdk/options/GcsOptions.java | 4 +- .../beam/sdk/options/GoogleApiDebugOptions.java | 2 +- .../beam/sdk/options/PipelineOptions.java | 52 +- .../sdk/options/PipelineOptionsFactory.java | 269 +++-- .../sdk/options/ProxyInvocationHandler.java | 264 +++-- .../apache/beam/sdk/options/ValueProvider.java | 239 +++++ .../java/org/apache/beam/sdk/package-info.java | 2 +- .../sdk/runners/PipelineRunnerRegistrar.java | 2 +- .../apache/beam/sdk/runners/package-info.java | 17 +- .../beam/sdk/testing/BigqueryMatcher.java | 239 +++++ .../org/apache/beam/sdk/testing/PAssert.java | 4 +- .../beam/sdk/testing/SerializableMatchers.java | 4 +- .../beam/sdk/testing/SourceTestUtils.java | 3 +- .../apache/beam/sdk/testing/StreamingIT.java | 35 + .../org/apache/beam/sdk/testing/TestStream.java | 2 +- .../apache/beam/sdk/testing/package-info.java | 4 +- .../apache/beam/sdk/transforms/Aggregator.java | 11 +- .../sdk/transforms/ApproximateQuantiles.java | 5 +- .../org/apache/beam/sdk/transforms/Combine.java | 95 +- .../beam/sdk/transforms/CombineFnBase.java | 17 +- .../apache/beam/sdk/transforms/CombineFns.java | 41 +- .../beam/sdk/transforms/CombineWithContext.java | 7 +- .../sdk/transforms/DelegatingAggregator.java | 125 +++ .../org/apache/beam/sdk/transforms/DoFn.java | 445 ++++++++- .../beam/sdk/transforms/DoFnAdapters.java | 184 +++- .../apache/beam/sdk/transforms/DoFnTester.java | 257 +++-- .../beam/sdk/transforms/FlatMapElements.java | 6 +- .../apache/beam/sdk/transforms/GroupByKey.java | 4 +- .../transforms/IntraBundleParallelization.java | 361 ------- .../org/apache/beam/sdk/transforms/Latest.java | 12 +- .../apache/beam/sdk/transforms/MapElements.java | 8 +- .../org/apache/beam/sdk/transforms/OldDoFn.java | 318 ++++-- .../apache/beam/sdk/transforms/PTransform.java | 10 +- .../org/apache/beam/sdk/transforms/ParDo.java | 341 +++++-- .../apache/beam/sdk/transforms/Partition.java | 4 +- .../beam/sdk/transforms/RemoveDuplicates.java | 5 +- .../sdk/transforms/SerializableFunction.java | 2 +- .../org/apache/beam/sdk/transforms/ViewFn.java | 2 +- .../apache/beam/sdk/transforms/WithKeys.java | 2 +- .../sdk/transforms/display/DisplayData.java | 530 ++++++---- .../sdk/transforms/reflect/DoFnInvoker.java | 48 +- .../sdk/transforms/reflect/DoFnInvokers.java | 658 ++++++++---- .../sdk/transforms/reflect/DoFnSignature.java | 466 ++++++++- .../sdk/transforms/reflect/DoFnSignatures.java | 946 +++++++++++++++--- .../splittabledofn/RestrictionTracker.java | 42 + .../transforms/splittabledofn/package-info.java | 22 + .../beam/sdk/transforms/windowing/AfterAll.java | 51 +- .../windowing/AfterDelayFromFirstElement.java | 110 +- .../sdk/transforms/windowing/AfterEach.java | 63 +- .../sdk/transforms/windowing/AfterFirst.java | 52 +- .../sdk/transforms/windowing/AfterPane.java | 59 +- .../windowing/AfterProcessingTime.java | 7 - .../AfterSynchronizedProcessingTime.java | 13 +- .../transforms/windowing/AfterWatermark.java | 174 +--- .../transforms/windowing/DefaultTrigger.java | 37 +- .../beam/sdk/transforms/windowing/Never.java | 28 +- .../transforms/windowing/OrFinallyTrigger.java | 55 +- .../beam/sdk/transforms/windowing/PaneInfo.java | 6 +- .../sdk/transforms/windowing/Repeatedly.java | 36 +- .../transforms/windowing/SlidingWindows.java | 3 +- .../beam/sdk/transforms/windowing/Trigger.java | 421 ++------ .../beam/sdk/transforms/windowing/Window.java | 20 +- .../beam/sdk/transforms/windowing/WindowFn.java | 2 +- ...AttemptAndTimeBoundedExponentialBackOff.java | 173 ++++ .../util/AttemptBoundedExponentialBackOff.java | 86 ++ .../beam/sdk/util/BaseExecutionContext.java | 4 +- .../apache/beam/sdk/util/CredentialFactory.java | 2 +- .../apache/beam/sdk/util/ExecutableTrigger.java | 40 +- .../sdk/util/ExposedByteArrayOutputStream.java | 1 + .../apache/beam/sdk/util/FinishedTriggers.java | 44 - .../beam/sdk/util/FinishedTriggersBitSet.java | 67 -- .../beam/sdk/util/FinishedTriggersSet.java | 72 -- .../apache/beam/sdk/util/GatherAllPanes.java | 10 +- .../apache/beam/sdk/util/GcpProjectUtil.java | 106 ++ .../apache/beam/sdk/util/GcsPathValidator.java | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 94 +- .../apache/beam/sdk/util/IOChannelFactory.java | 2 +- .../beam/sdk/util/MergingActiveWindowSet.java | 12 +- .../apache/beam/sdk/util/PCollectionViews.java | 9 +- .../org/apache/beam/sdk/util/PathValidator.java | 6 +- .../beam/sdk/util/PerKeyCombineFnRunner.java | 26 +- .../beam/sdk/util/PerKeyCombineFnRunners.java | 4 +- .../org/apache/beam/sdk/util/PropertyNames.java | 1 + .../org/apache/beam/sdk/util/PubsubClient.java | 5 +- .../apache/beam/sdk/util/PubsubGrpcClient.java | 34 +- .../apache/beam/sdk/util/PubsubTestClient.java | 2 +- .../sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +- .../org/apache/beam/sdk/util/ReleaseInfo.java | 6 +- .../apache/beam/sdk/util/ReshuffleTrigger.java | 16 +- .../org/apache/beam/sdk/util/StringUtils.java | 2 +- .../java/org/apache/beam/sdk/util/Timer.java | 56 ++ .../apache/beam/sdk/util/TimerInternals.java | 6 +- .../org/apache/beam/sdk/util/TimerSpec.java | 30 + .../org/apache/beam/sdk/util/TimerSpecs.java | 41 + .../java/org/apache/beam/sdk/util/Timers.java | 10 +- .../org/apache/beam/sdk/util/Transport.java | 17 + .../beam/sdk/util/TriggerContextFactory.java | 507 ---------- .../apache/beam/sdk/util/ValueWithRecordId.java | 8 +- .../beam/sdk/util/common/ReflectHelpers.java | 22 - .../apache/beam/sdk/util/common/Reiterable.java | 2 +- .../apache/beam/sdk/util/common/Reiterator.java | 2 +- .../CopyOnAccessInMemoryStateInternals.java | 2 +- .../sdk/util/state/InMemoryTimerInternals.java | 235 +++++ .../beam/sdk/util/state/ReadableState.java | 10 +- .../apache/beam/sdk/util/state/StateBinder.java | 67 ++ .../beam/sdk/util/state/StateContext.java | 6 +- .../apache/beam/sdk/util/state/StateSpec.java | 39 + .../apache/beam/sdk/util/state/StateSpecs.java | 452 +++++++++ .../apache/beam/sdk/util/state/StateTag.java | 82 +- .../apache/beam/sdk/util/state/StateTags.java | 386 +------- .../util/state/TestInMemoryStateInternals.java | 61 ++ .../beam/sdk/util/state/TimerCallback.java | 35 + .../apache/beam/sdk/values/PCollectionView.java | 15 +- .../java/org/apache/beam/sdk/values/PInput.java | 8 +- .../org/apache/beam/sdk/values/POutput.java | 8 +- .../java/org/apache/beam/sdk/values/PValue.java | 4 +- .../apache/beam/sdk/values/TypeDescriptors.java | 40 +- .../apache/beam/sdk/values/package-info.java | 4 +- .../dataflow/util/GcsPathValidatorTest.java | 103 -- .../org/apache/beam/sdk/DataflowMatchers.java | 64 -- .../java/org/apache/beam/sdk/PipelineTest.java | 2 +- .../apache/beam/sdk/coders/AvroCoderTest.java | 4 +- .../beam/sdk/coders/CoderRegistryTest.java | 1 + .../org/apache/beam/sdk/coders/KvCoderTest.java | 99 +- .../beam/sdk/coders/NullableCoderTest.java | 60 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 +- .../org/apache/beam/sdk/io/AvroSourceTest.java | 43 + .../io/BoundedReadFromUnboundedSourceTest.java | 14 +- .../beam/sdk/io/CompressedSourceTest.java | 4 +- .../sdk/io/DrunkWritableByteChannelFactory.java | 80 ++ .../apache/beam/sdk/io/FileBasedSinkTest.java | 108 ++ .../beam/sdk/io/OffsetBasedSourceTest.java | 30 +- .../java/org/apache/beam/sdk/io/ReadTest.java | 62 +- .../io/SerializableAvroCodecFactoryTest.java | 100 ++ .../java/org/apache/beam/sdk/io/TextIOTest.java | 601 ++++++----- .../java/org/apache/beam/sdk/io/WriteTest.java | 6 +- .../beam/sdk/metrics/CounterCellTest.java | 55 + .../apache/beam/sdk/metrics/DirtyStateTest.java | 56 ++ .../beam/sdk/metrics/DistributionCellTest.java | 53 + .../apache/beam/sdk/metrics/MetricMatchers.java | 99 ++ .../beam/sdk/metrics/MetricsContainerTest.java | 129 +++ .../sdk/metrics/MetricsEnvironmentTest.java | 63 ++ .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 ++ .../apache/beam/sdk/metrics/MetricsTest.java | 98 ++ .../sdk/options/PipelineOptionsFactoryTest.java | 184 +++- .../beam/sdk/options/PipelineOptionsTest.java | 49 +- .../options/PipelineOptionsValidatorTest.java | 18 +- .../sdk/options/ProxyInvocationHandlerTest.java | 111 ++- .../beam/sdk/options/ValueProviderTest.java | 220 ++++ .../beam/sdk/testing/BigqueryMatcherTest.java | 176 ++++ .../sdk/testing/PCollectionViewTesting.java | 7 - .../beam/sdk/testing/SystemNanoTimeSleeper.java | 2 +- .../beam/sdk/testing/TestPipelineTest.java | 4 +- .../beam/sdk/transforms/CombineFnsTest.java | 7 +- .../apache/beam/sdk/transforms/CombineTest.java | 81 +- .../DoFnDelegatingAggregatorTest.java | 5 +- .../beam/sdk/transforms/DoFnTesterTest.java | 458 +++++---- .../apache/beam/sdk/transforms/FlattenTest.java | 20 + .../IntraBundleParallelizationTest.java | 280 ------ .../beam/sdk/transforms/LatestFnTest.java | 233 +++++ .../apache/beam/sdk/transforms/ParDoTest.java | 101 +- .../display/DisplayDataEvaluator.java | 13 +- .../transforms/display/DisplayDataMatchers.java | 141 ++- .../display/DisplayDataMatchersTest.java | 67 +- .../sdk/transforms/display/DisplayDataTest.java | 367 ++++--- .../transforms/reflect/DoFnInvokersTest.java | 710 +++++++------ .../reflect/DoFnInvokersTestHelper.java | 116 --- .../DoFnSignaturesProcessElementTest.java | 213 ++++ .../DoFnSignaturesSplittableDoFnTest.java | 543 ++++++++++ .../transforms/reflect/DoFnSignaturesTest.java | 991 ++++++++++++++----- .../reflect/DoFnSignaturesTestUtils.java | 67 ++ .../testhelper/DoFnInvokersTestHelper.java | 124 +++ .../sdk/transforms/windowing/AfterAllTest.java | 98 -- .../sdk/transforms/windowing/AfterEachTest.java | 64 -- .../transforms/windowing/AfterFirstTest.java | 120 --- .../sdk/transforms/windowing/AfterPaneTest.java | 77 -- .../windowing/AfterProcessingTimeTest.java | 94 -- .../AfterSynchronizedProcessingTimeTest.java | 75 -- .../windowing/AfterWatermarkTest.java | 308 ------ .../windowing/DefaultTriggerTest.java | 130 --- .../sdk/transforms/windowing/NeverTest.java | 34 +- .../windowing/OrFinallyTriggerTest.java | 136 --- .../transforms/windowing/RepeatedlyTest.java | 161 +-- .../sdk/transforms/windowing/StubTrigger.java | 17 - .../sdk/transforms/windowing/TriggerTest.java | 28 - .../sdk/transforms/windowing/WindowTest.java | 4 +- .../apache/beam/sdk/util/ApiSurfaceTest.java | 28 +- ...mptAndTimeBoundedExponentialBackOffTest.java | 213 ++++ .../AttemptBoundedExponentialBackOffTest.java | 85 ++ .../beam/sdk/util/ExecutableTriggerTest.java | 18 - .../sdk/util/FinishedTriggersBitSetTest.java | 55 - .../sdk/util/FinishedTriggersProperties.java | 110 -- .../beam/sdk/util/FinishedTriggersSetTest.java | 60 -- .../beam/sdk/util/GcpProjectUtilTest.java | 76 ++ .../beam/sdk/util/GcsPathValidatorTest.java | 100 ++ .../org/apache/beam/sdk/util/GcsUtilTest.java | 112 ++- .../beam/sdk/util/PubsubGrpcClientTest.java | 108 +- .../beam/sdk/util/PubsubJsonClientTest.java | 16 +- .../beam/sdk/util/ReshuffleTriggerTest.java | 23 - .../org/apache/beam/sdk/util/TriggerTester.java | 592 ----------- .../util/state/InMemoryTimerInternalsTest.java | 116 +++ sdks/java/extensions/join-library/pom.xml | 2 +- sdks/java/extensions/pom.xml | 2 +- sdks/java/io/google-cloud-platform/pom.xml | 16 +- .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 79 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 239 +++-- .../sdk/io/gcp/bigquery/BigQueryServices.java | 15 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 106 +- .../gcp/bigquery/BigQueryTableRowIterator.java | 130 ++- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 56 +- .../beam/sdk/io/gcp/datastore/DatastoreIO.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 99 +- .../io/gcp/bigquery/BigQueryAvroUtilsTest.java | 149 ++- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 570 ++++++++++- .../gcp/bigquery/BigQueryServicesImplTest.java | 149 ++- .../bigquery/BigQueryTableRowIteratorTest.java | 169 +++- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 104 +- .../sdk/io/gcp/datastore/SplitQueryFnIT.java | 4 +- .../beam/sdk/io/gcp/datastore/V1ReadIT.java | 2 +- .../beam/sdk/io/gcp/datastore/V1WriteIT.java | 2 +- sdks/java/io/hdfs/pom.xml | 2 +- .../beam/sdk/io/hdfs/AvroHDFSFileSource.java | 2 +- .../beam/sdk/io/hdfs/AvroWrapperCoder.java | 2 +- .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 10 +- .../apache/beam/sdk/io/hdfs/WritableCoder.java | 2 +- .../SimpleAuthAvroHDFSFileSource.java | 2 +- .../hdfs/simpleauth/SimpleAuthHDFSFileSink.java | 2 +- .../simpleauth/SimpleAuthHDFSFileSource.java | 7 +- sdks/java/io/jdbc/pom.xml | 138 +++ .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 443 +++++++++ .../apache/beam/sdk/io/jdbc/package-info.java | 22 + .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 271 +++++ sdks/java/io/jms/pom.xml | 2 +- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 11 +- sdks/java/io/kafka/pom.xml | 2 +- .../beam/sdk/io/kafka/KafkaCheckpointMark.java | 8 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 90 +- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 166 +++- sdks/java/io/kinesis/pom.xml | 3 +- .../beam/sdk/io/kinesis/CustomOptional.java | 7 +- .../sdk/io/kinesis/GetKinesisRecordsResult.java | 2 +- .../sdk/io/kinesis/KinesisClientProvider.java | 2 +- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 18 +- .../beam/sdk/io/kinesis/KinesisReader.java | 10 +- .../sdk/io/kinesis/KinesisReaderCheckpoint.java | 4 +- .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 2 +- .../beam/sdk/io/kinesis/KinesisSource.java | 6 +- .../beam/sdk/io/kinesis/RecordFilter.java | 6 +- .../apache/beam/sdk/io/kinesis/RoundRobin.java | 2 +- .../beam/sdk/io/kinesis/ShardCheckpoint.java | 6 +- .../sdk/io/kinesis/ShardRecordsIterator.java | 4 +- .../sdk/io/kinesis/SimplifiedKinesisClient.java | 8 +- .../beam/sdk/io/kinesis/StartingPoint.java | 2 +- .../beam/sdk/io/kinesis/KinesisTestOptions.java | 2 +- .../beam/sdk/io/kinesis/KinesisUploader.java | 5 +- sdks/java/io/mongodb/pom.xml | 14 +- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 449 +++++++++ .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 310 +++--- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 276 ++++++ .../beam/sdk/io/mongodb/MongoDbIOTest.java | 19 +- sdks/java/io/pom.xml | 3 +- sdks/java/java8tests/pom.xml | 2 +- .../PipelineOptionsFactoryJava8Test.java | 8 +- sdks/java/maven-archetypes/examples/pom.xml | 27 +- .../main/resources/archetype-resources/pom.xml | 26 +- .../src/main/java/DebuggingWordCount.java | 34 +- .../src/main/java/MinimalWordCount.java | 50 +- .../src/main/java/WindowedWordCount.java | 139 +-- .../src/main/java/WordCount.java | 79 +- .../java/common/DataflowExampleOptions.java | 32 - .../main/java/common/DataflowExampleUtils.java | 391 -------- .../common/ExampleBigQueryTableOptions.java | 11 +- .../src/main/java/common/ExampleOptions.java | 32 + ...xamplePubsubTopicAndSubscriptionOptions.java | 45 + .../java/common/ExamplePubsubTopicOptions.java | 17 +- .../src/main/java/common/ExampleUtils.java | 353 +++++++ .../main/java/common/PubsubFileInjector.java | 153 --- .../src/test/java/WordCountTest.java | 9 +- sdks/java/maven-archetypes/pom.xml | 2 +- sdks/java/maven-archetypes/starter/pom.xml | 10 +- .../main/resources/archetype-resources/pom.xml | 8 +- .../resources/projects/basic/reference/pom.xml | 8 +- sdks/java/microbenchmarks/pom.xml | 2 +- .../transforms/DoFnInvokersBenchmark.java | 7 + sdks/java/pom.xml | 2 +- sdks/pom.xml | 2 +- 632 files changed, 36444 insertions(+), 16763 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --cc runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index df74ed3,dec9905..0000000 deleted file mode 100644,100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ /dev/null @@@ -1,58 -1,586 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkNotNull; -- -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; --import java.util.List; -import java.util.Set; --import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; --import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; --import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; - import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.InputProvider; -import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; --import org.apache.beam.sdk.util.ExecutionContext.StepContext; --import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.UserCodeException; --import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; --import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.PCollectionView; --import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; -- --/** - * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in. - * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in. -- * - * @param <InputT> the type of the {@link OldDoFn} (main) input elements - * @param <OutputT> the type of the {@link OldDoFn} (main) output elements - * @param <InputT> the type of the {@link DoFn} (main) input elements - * @param <OutputT> the type of the {@link DoFn} (main) output elements -- */ - public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT> { -public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { -- - protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn, - /** The {@link DoFn} being run. */ - private final DoFn<InputT, OutputT> fn; - - /** The {@link DoFnInvoker} being run. */ - private final DoFnInvoker<InputT, OutputT> invoker; - - /** The context used for running the {@link DoFn}. */ - private final DoFnContext<InputT, OutputT> context; - - private final OutputManager outputManager; - - private final TupleTag<OutputT> mainOutputTag; - - private final boolean observesWindow; - - public SimpleDoFnRunner( - PipelineOptions options, - DoFn<InputT, OutputT> fn, -- SideInputReader sideInputReader, -- OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { - super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, - aggregatorFactory, windowingStrategy); - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - this.fn = fn; - this.observesWindow = - DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().observesWindow(); - this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.context = - new DoFnContext<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy == null ? null : windowingStrategy.getWindowFn()); -- } -- -- @Override - protected void invokeProcessElement(WindowedValue<InputT> elem) { - final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); - public void startBundle() { -- // This can contain user code. Wrap it in case it throws an exception. -- try { - fn.processElement(processContext); - invoker.invokeStartBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - @Override - public void processElement(WindowedValue<InputT> compressedElem) { - if (observesWindow) { - for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) { - invokeProcessElement(elem); - } - } else { - invokeProcessElement(compressedElem); - } - } - - private void invokeProcessElement(WindowedValue<InputT> elem) { - final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); - - // Note that if the element must be exploded into all its windows, that has to be done outside - // of this runner. - final DoFn.ExtraContextFactory<InputT, OutputT> extraContextFactory = - createExtraContextFactory(elem); - - // This can contain user code. Wrap it in case it throws an exception. - try { - invoker.invokeProcessElement(processContext, extraContextFactory); -- } catch (Exception ex) { -- throw wrapUserCodeException(ex); - } - } - - @Override - public void finishBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - invoker.invokeFinishBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - /** Returns a new {@link DoFn.ProcessContext} for the given element. */ - private DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) { - return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); - } - - private DoFn.ExtraContextFactory<InputT, OutputT> createExtraContextFactory( - WindowedValue<InputT> elem) { - return new DoFnExtraContextFactory<InputT, OutputT>(elem.getWindows(), elem.getPane()); - } - - private RuntimeException wrapUserCodeException(Throwable t) { - throw UserCodeException.wrapIf(!isSystemDoFn(), t); - } - - private boolean isSystemDoFn() { - return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class); - } - - /** - * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}. - * - * @param <InputT> the type of the {@link DoFn} (main) input elements - * @param <OutputT> the type of the {@link DoFn} (main) output elements - */ - private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context { - private static final int MAX_SIDE_OUTPUTS = 1000; - - final PipelineOptions options; - final DoFn<InputT, OutputT> fn; - final SideInputReader sideInputReader; - final OutputManager outputManager; - final TupleTag<OutputT> mainOutputTag; - final StepContext stepContext; - final AggregatorFactory aggregatorFactory; - final WindowFn<?, ?> windowFn; - - /** - * The set of known output tags, some of which may be undeclared, so we can throw an exception - * when it exceeds {@link #MAX_SIDE_OUTPUTS}. - */ - private Set<TupleTag<?>> outputTags; - - public DoFnContext( - PipelineOptions options, - DoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowFn<?, ?> windowFn) { - fn.super(); - this.options = options; - this.fn = fn; - this.sideInputReader = sideInputReader; - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.outputTags = Sets.newHashSet(); - - outputTags.add(mainOutputTag); - for (TupleTag<?> sideOutputTag : sideOutputTags) { - outputTags.add(sideOutputTag); - } - - this.stepContext = stepContext; - this.aggregatorFactory = aggregatorFactory; - this.windowFn = windowFn; - super.setupDelegateAggregators(); - } - - ////////////////////////////////////////////////////////////////////////////// - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<W> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - // The windowFn can never succeed at accessing the element, so its type does not - // matter here - @SuppressWarnings("unchecked") - WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn; - windows = - objectWindowFn.assignWindows( - objectWindowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public W window() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); - return sideInputReader.get(view, sideInputWindow); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); - } - - void outputWindowedValue(WindowedValue<OutputT> windowedElem) { - outputManager.output(mainOutputTag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(windowedElem); - } - } - - private <T> void sideOutputWindowedValue( - TupleTag<T> tag, - T output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); - } - - private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { - if (!outputTags.contains(tag)) { - // This tag wasn't declared nor was it seen before during this execution. - // Thus, this must be a new, undeclared and unconsumed output. - // To prevent likely user errors, enforce the limit on the number of side - // outputs. - if (outputTags.size() >= MAX_SIDE_OUTPUTS) { - throw new IllegalArgumentException( - "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); - } - outputTags.add(tag); - } - - outputManager.output(tag, windowedElem); - if (stepContext != null) { - stepContext.noteSideOutput(tag, windowedElem); - } - } - - // Following implementations of output, outputWithTimestamp, and sideOutput - // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by - // ProcessContext's versions in DoFn.processElement. - @Override - public void output(OutputT output) { - outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); - sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); - sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( - String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - checkNotNull(combiner, "Combiner passed to createAggregator cannot be null"); - return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); - } - } - - /** - * A concrete implementation of {@link DoFn.ProcessContext} used for running a {@link DoFn} over a - * single element. - * - * @param <InputT> the type of the {@link DoFn} (main) input elements - * @param <OutputT> the type of the {@link DoFn} (main) output elements - */ - private static class DoFnProcessContext<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext { - - final DoFn<InputT, OutputT> fn; - final DoFnContext<InputT, OutputT> context; - final WindowedValue<InputT> windowedValue; - - public DoFnProcessContext( - DoFn<InputT, OutputT> fn, - DoFnContext<InputT, OutputT> context, - WindowedValue<InputT> windowedValue) { - fn.super(); - this.fn = fn; - this.context = context; - this.windowedValue = windowedValue; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public InputT element() { - return windowedValue.getValue(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - checkNotNull(view, "View passed to sideInput cannot be null"); - Iterator<? extends BoundedWindow> windowIter = windows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - if (context.windowFn instanceof GlobalWindows) { - // TODO: Remove this once GroupByKeyOnly no longer outputs elements - // without windows - window = GlobalWindow.INSTANCE; - } else { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - return context.sideInput(view, window); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public void output(OutputT output) { - context.outputWindowedValue(windowedValue.withValue(output)); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(timestamp); - context.outputWindowedValue( - output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - checkNotNull(tag, "Tag passed to sideOutput cannot be null"); - context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); - checkTimestamp(timestamp); - context.sideOutputWindowedValue( - tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); - } - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - private void checkTimestamp(Instant timestamp) { - if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { - throw new IllegalArgumentException( - String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed " - + "skew.", - timestamp, - windowedValue.getTimestamp(), - PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); - } - } - - @Override - protected <AggregatorInputT, AggregatorOutputT> - Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator( - String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) { - return context.createAggregator(name, combiner); - } - } - - private class DoFnExtraContextFactory<InputT, OutputT> - implements DoFn.ExtraContextFactory<InputT, OutputT> { - - /** The windows of the current element. */ - private final Collection<? extends BoundedWindow> windows; - - /** The pane of the current element. */ - private final PaneInfo pane; - - public DoFnExtraContextFactory(Collection<? extends BoundedWindow> windows, PaneInfo pane) { - this.windows = windows; - this.pane = pane; - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(windows); - } - - @Override - public InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("InputProvider parameters are not supported."); - } - - @Override - public OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("OutputReceiver parameters are not supported."); - } - - @Override - public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { - throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return new WindowingInternals<InputT, OutputT>() { - @Override - public Collection<? extends BoundedWindow> windows() { - return windows; - } - - @Override - public PaneInfo pane() { - return pane; - } - - @Override - public TimerInternals timerInternals() { - return context.stepContext.timerInternals(); - } - - @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) - throws IOException { - @SuppressWarnings("unchecked") - Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder(); - - context.stepContext.writePCollectionViewData( - tag, - data, - IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), - window(), - windowCoder); - } - - @Override - public StateInternals<?> stateInternals() { - return context.stepContext.stateInternals(); - } - - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) {} - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - return context.sideInput(view, mainInputWindow); - } - }; -- } -- } --} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --cc runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index b2d61c3,e02c8a6..f87f1c1 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@@ -401,31 -450,19 +450,41 @@@ public class DirectRunne } @Override - public State waitUntilFinish(Duration duration) throws IOException { - throw new UnsupportedOperationException( - "DirectPipelineResult does not support waitUntilFinish."); + public ExecutorService get() { + return Executors.newFixedThreadPool(options.getTargetParallelism()); + } + } + + + /** + * A {@link Supplier} that creates a {@link NanosOffsetClock}. + */ + private static class NanosOffsetClockSupplier implements Supplier<Clock> { + @Override + public Clock get() { + return NanosOffsetClock.create(); } } + + /** + * A {@link Supplier} that creates a {@link ExecutorService} based on + * {@link Executors#newFixedThreadPool(int)}. + */ + private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> { + @Override + public ExecutorService get() { + return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + } + } + + + /** + * A {@link Supplier} that creates a {@link NanosOffsetClock}. + */ + private static class NanosOffsetClockSupplier implements Supplier<Clock> { + @Override + public Clock get() { + return NanosOffsetClock.create(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --cc runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 9edc50f,3dd44a7..6485714 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@@ -84,44 -82,38 +82,75 @@@ class TransformEvaluatorRegistry implem throws Exception { checkState( !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry"); - TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass()); - return factory.forApplication(application, inputBundle, evaluationContext); + Class<? extends PTransform> transformClass = application.getTransform().getClass(); + TransformEvaluatorFactory factory = + checkNotNull( + factories.get(transformClass), "No evaluator for PTransform type %s", transformClass); + return factory.forApplication(application, inputBundle); + } + + @Override + public void cleanup() throws Exception { + Collection<Exception> thrownInCleanup = new ArrayList<>(); + for (TransformEvaluatorFactory factory : factories.values()) { + try { + factory.cleanup(); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + thrownInCleanup.add(e); + } + } + finished.set(true); + if (!thrownInCleanup.isEmpty()) { + LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup); + Exception toThrow = null; + for (Exception e : thrownInCleanup) { + if (toThrow == null) { + toThrow = e; + } else { + toThrow.addSuppressed(e); + } + } + throw toThrow; + } } + + @Override + public void cleanup() throws Exception { + Collection<Exception> thrownInCleanup = new ArrayList<>(); + for (TransformEvaluatorFactory factory : factories.values()) { + try { + factory.cleanup(); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + thrownInCleanup.add(e); + } + } + finished.set(true); + if (!thrownInCleanup.isEmpty()) { + LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup); + Exception toThrow = null; + for (Exception e : thrownInCleanup) { + if (toThrow == null) { + toThrow = e; + } else { + toThrow.addSuppressed(e); + } + } + throw toThrow; + } + } + + /** + * A factory to create Transform Evaluator Registries. + */ + public static class Factory { + public TransformEvaluatorRegistry create() { + return TransformEvaluatorRegistry.defaultRegistry(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java ---------------------------------------------------------------------- diff --cc runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java index 58f41b6,58f41b6..0000000 deleted file mode 100644,100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java +++ /dev/null @@@ -1,22 -1,22 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --/** -- * Flink Beam runner exemple. -- */ --package org.apache.beam.runners.flink.examples.streaming; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --cc runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 949c381,b211c04..b84a1a8 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@@ -65,5 -68,12 +68,13 @@@ public class DoFnInfo<InputT, OutputT> public Coder<InputT> getInputCoder() { return inputCoder; } + + public long getMainOutput() { + return mainOutput; + } + + public Map<Long, TupleTag<?>> getOutputMap() { + return outputMap; + } } + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --cc runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index 454b760,4dfbee6..69c450e --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@@ -37,22 -42,26 +42,28 @@@ import org.apache.spark.api.java.functi * @param <OutputT> Output element type. */ public class DoFnFunction<InputT, OutputT> - implements FlatMapFunction<Iterator<WindowedValue<InputT>>, - WindowedValue<OutputT>> { + implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> { + private final Accumulator<NamedAggregators> accum; private final OldDoFn<InputT, OutputT> mFunction; + private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class); + private final SparkRuntimeContext mRuntimeContext; - private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; + private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs; + private final WindowFn<Object, ?> windowFn; /** - * @param fn DoFunction to be wrapped. - * @param runtime Runtime to apply function in. - * @param sideInputs Side inputs used in DoFunction. + * @param accum The Spark Accumulator that handles the Beam Aggregators. + * @param fn DoFunction to be wrapped. + * @param runtime Runtime to apply function in. + * @param sideInputs Side inputs used in DoFunction. + * @param windowFn Input {@link WindowFn}. */ - public DoFnFunction(OldDoFn<InputT, OutputT> fn, - SparkRuntimeContext runtime, - Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { + public DoFnFunction(Accumulator<NamedAggregators> accum, + OldDoFn<InputT, OutputT> fn, + SparkRuntimeContext runtime, + Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs, + WindowFn<Object, ?> windowFn) { + this.accum = accum; this.mFunction = fn; this.mRuntimeContext = runtime; this.mSideInputs = sideInputs; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --cc sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index c0761b1,2dbcda7..78ea988 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@@ -32,21 -34,24 +33,36 @@@ import java.nio.channels.WritableByteCh import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import java.util.regex.Pattern; - import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + import org.apache.beam.sdk.coders.Coder; + import org.apache.beam.sdk.coders.Coder.Context; + import org.apache.beam.sdk.coders.StringUtf8Coder; + import org.apache.beam.sdk.coders.VoidCoder; + import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; + import org.apache.beam.sdk.io.Read.Bounded; + import org.apache.beam.sdk.options.PipelineOptions; + import org.apache.beam.sdk.transforms.PTransform; + import org.apache.beam.sdk.transforms.display.DisplayData; + import org.apache.beam.sdk.util.IOChannelUtils; + import org.apache.beam.sdk.util.MimeTypes; + import org.apache.beam.sdk.values.PBegin; + import org.apache.beam.sdk.values.PCollection; + import org.apache.beam.sdk.values.PDone; + /** * {@link PTransform}s for reading and writing text files. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --cc sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 59c8323,018877f..f2fa87c --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@@ -339,21 -423,135 +423,149 @@@ public abstract class DoFn<InputT, Outp ///////////////////////////////////////////////////////////////////////////// + + /** + * Annotation for the method to use to prepare an instance for processing bundles of elements. The + * method annotated with this must satisfy the following constraints + * <ul> + * <li>It must have zero arguments. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface Setup { + } + /** + * Annotation for declaring and dereferencing state cells. + * + * <p><i>Not currently supported by any runner. When ready, the feature will work as described + * here.</i> + * + * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link + * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State} + * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and + * annotate it with {@link StateId}. See the following code for an example: + * + * <pre>{@code + * new DoFn<KV<Key, Foo>, Baz>() { + * @StateId("my-state-id") + * private final StateSpec<K, ValueState<MyState>> myStateSpec = + * StateSpecs.value(new MyStateCoder()); + * + * @ProcessElement + * public void processElement( + * ProcessContext c, + * @StateId("my-state-id") ValueState<MyState> myState) { + * myState.read(); + * myState.write(...); + * } + * } + * }</pre> + * + * <p>State is subject to the following validity conditions: + * + * <ul> + * <li>Each state ID must be declared at most once. + * <li>Any state referenced in a parameter must be declared with the same state type. + * <li>State declarations must be final. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.FIELD, ElementType.PARAMETER}) + @Experimental(Kind.STATE) + public @interface StateId { + /** The state ID. */ + String value(); + } + + /** + * Annotation for declaring and dereferencing timers. + * + * <p><i>Not currently supported by any runner. When ready, the feature will work as described + * here.</i> + * + * <p>To declare a timer, create a field of type {@link TimerSpec} annotated with a {@link + * TimerId}. To use the cell during processing, add a parameter of the type {@link Timer} to your + * {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and annotate it with + * {@link TimerId}. See the following code for an example: + * + * <pre>{@code + * new DoFn<KV<Key, Foo>, Baz>() { + * @TimerId("my-timer-id") + * private final TimerSpec myTimer = TimerSpecs.timerForDomain(TimeDomain.EVENT_TIME); + * + * @ProcessElement + * public void processElement( + * ProcessContext c, + * @TimerId("my-timer-id") Timer myTimer) { + * myTimer.setForNowPlus(Duration.standardSeconds(...)); + * } + * + * @OnTimer("my-timer-id") + * public void onMyTimer() { + * ... + * } + * } + * }</pre> + * + * <p>Timers are subject to the following validity conditions: + * + * <ul> + * <li>Each timer must have a distinct id. + * <li>Any timer referenced in a parameter must be declared. + * <li>Timer declarations must be final. + * <li>All declared timers must have a corresponding callback annotated with {@link + * OnTimer @OnTimer}. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.FIELD, ElementType.PARAMETER}) + @Experimental(Kind.TIMERS) + public @interface TimerId { + /** The timer ID. */ + String value(); + } + + /** + * Annotation for registering a callback for a timer. + * + * <p><i>Not currently supported by any runner. When ready, the feature will work as described + * here.</i> + * + * <p>See the javadoc for {@link TimerId} for use in a full example. + * + * <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic + * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses, + * and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and + * {@link TimerId} respectively. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @Experimental(Kind.TIMERS) + public @interface OnTimer { + /** The timer ID. */ + String value(); + } + + /** + * Annotation for the method to use to prepare an instance for processing bundles of elements. The + * method annotated with this must satisfy the following constraints + * <ul> + * <li>It must have zero arguments. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface Setup { + } + + /** * Annotation for the method to use to prepare an instance for processing a batch of elements. * The method annotated with this must satisfy the following constraints: * <ul> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ----------------------------------------------------------------------
