Merge branch 'master' into gearpump-runner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9dc9be9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9dc9be9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9dc9be9e

Branch: refs/heads/gearpump-runner
Commit: 9dc9be9e6d0aa202e170d64e1a1d7b5a57828c5a
Parents: 8f4334c f2fe1ae
Author: manuzhang <[email protected]>
Authored: Wed Oct 26 11:15:24 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Oct 26 11:15:24 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |   8 +
 .travis.yml                                     |  10 +-
 examples/java/pom.xml                           | 154 ++-
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../apache/beam/examples/MinimalWordCount.java  |   3 +-
 .../apache/beam/examples/WindowedWordCount.java |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   4 +-
 .../common/ExampleBigQueryTableOptions.java     |   2 +-
 ...xamplePubsubTopicAndSubscriptionOptions.java |   2 +-
 .../common/ExamplePubsubTopicOptions.java       |   2 +-
 .../beam/examples/common/ExampleUtils.java      |   2 +-
 .../examples/common/PubsubFileInjector.java     | 153 ---
 .../beam/examples/complete/AutoComplete.java    |   2 +-
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/cookbook/BigQueryTornadoes.java    |   2 +-
 .../cookbook/CombinePerKeyExamples.java         |   2 +-
 .../examples/cookbook/DatastoreWordCount.java   | 261 -----
 .../beam/examples/cookbook/DeDupExample.java    |   4 +-
 .../beam/examples/cookbook/FilterExamples.java  |   2 +-
 .../beam/examples/cookbook/JoinExamples.java    |   2 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |   2 +-
 .../beam/examples/cookbook/TriggerExample.java  |  28 +-
 .../beam/examples/WindowedWordCountIT.java      |  75 ++
 .../org/apache/beam/examples/WordCountIT.java   |   6 -
 .../examples/cookbook/BigQueryTornadoesIT.java  |  14 +-
 examples/java8/pom.xml                          |   2 +-
 .../beam/examples/complete/game/GameStats.java  |  10 +-
 .../examples/complete/game/HourlyTeamScore.java |   8 +-
 .../examples/complete/game/LeaderBoard.java     |  12 +-
 .../beam/examples/complete/game/UserScore.java  |  10 +-
 .../complete/game/injector/Injector.java        |  10 +-
 .../examples/complete/game/LeaderBoardTest.java |   5 +-
 examples/pom.xml                                |   2 +-
 pom.xml                                         | 134 ++-
 runners/core-java/pom.xml                       |   8 +-
 .../beam/runners/core/AggregatorFactory.java    |  39 +
 .../beam/runners/core/BatchTimerInternals.java  | 140 ---
 .../apache/beam/runners/core/DoFnRunner.java    |   8 +-
 .../beam/runners/core/DoFnRunnerBase.java       | 559 -----------
 .../apache/beam/runners/core/DoFnRunners.java   | 191 +++-
 .../runners/core/ElementAndRestriction.java     |  42 +
 .../core/ElementAndRestrictionCoder.java        |  67 ++
 .../runners/core/ElementByteSizeObservable.java |   5 +-
 .../runners/core/GBKIntoKeyedWorkItems.java     |  55 +
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   5 +
 .../runners/core/GroupAlsoByWindowsDoFn.java    |  19 -
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  12 +-
 .../runners/core/ReduceFnContextFactory.java    |   2 +-
 .../beam/runners/core/ReduceFnRunner.java       |  42 +-
 .../beam/runners/core/SideInputHandler.java     |   2 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |  58 --
 .../beam/runners/core/SimpleOldDoFnRunner.java  | 521 ++++++++++
 .../beam/runners/core/SplittableParDo.java      | 469 +++++++++
 .../apache/beam/runners/core/TriggerRunner.java | 247 -----
 .../core/UnboundedReadFromBoundedSource.java    |   2 +-
 .../core/triggers/AfterAllStateMachine.java     | 109 ++
 .../AfterDelayFromFirstElementStateMachine.java | 337 +++++++
 .../core/triggers/AfterEachStateMachine.java    | 130 +++
 .../core/triggers/AfterFirstStateMachine.java   | 112 +++
 .../core/triggers/AfterPaneStateMachine.java    | 139 +++
 .../AfterProcessingTimeStateMachine.java        |  93 ++
 ...rSynchronizedProcessingTimeStateMachine.java |  63 ++
 .../triggers/AfterWatermarkStateMachine.java    | 325 ++++++
 .../triggers/DefaultTriggerStateMachine.java    |  81 ++
 .../triggers/ExecutableTriggerStateMachine.java | 160 +++
 .../runners/core/triggers/FinishedTriggers.java |  44 +
 .../core/triggers/FinishedTriggersBitSet.java   |  67 ++
 .../core/triggers/FinishedTriggersSet.java      |  72 ++
 .../core/triggers/NeverStateMachine.java        |  60 ++
 .../core/triggers/OrFinallyStateMachine.java    |  85 ++
 .../core/triggers/RepeatedlyStateMachine.java   |  88 ++
 .../triggers/ReshuffleTriggerStateMachine.java  |  50 +
 .../core/triggers/TriggerStateMachine.java      | 487 +++++++++
 .../TriggerStateMachineContextFactory.java      | 509 ++++++++++
 .../triggers/TriggerStateMachineRunner.java     | 234 +++++
 .../core/triggers/TriggerStateMachines.java     | 215 ++++
 .../runners/core/triggers/package-info.java     |  23 +
 .../runners/core/BatchTimerInternalsTest.java   | 118 ---
 .../core/ElementAndRestrictionCoderTest.java    | 127 +++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 281 +++---
 .../beam/runners/core/ReduceFnTester.java       | 405 +++-----
 .../beam/runners/core/SimpleDoFnRunnerTest.java |  88 --
 .../runners/core/SimpleOldDoFnRunnerTest.java   |  88 ++
 .../beam/runners/core/SplittableParDoTest.java  | 467 +++++++++
 .../core/triggers/AfterAllStateMachineTest.java | 140 +++
 .../triggers/AfterEachStateMachineTest.java     | 108 ++
 .../triggers/AfterFirstStateMachineTest.java    | 159 +++
 .../triggers/AfterPaneStateMachineTest.java     | 117 +++
 .../AfterProcessingTimeStateMachineTest.java    | 172 ++++
 ...chronizedProcessingTimeStateMachineTest.java | 110 ++
 .../AfterWatermarkStateMachineTest.java         | 382 +++++++
 .../DefaultTriggerStateMachineTest.java         | 165 +++
 .../ExecutableTriggerStateMachineTest.java      | 108 ++
 .../triggers/FinishedTriggersBitSetTest.java    |  55 +
 .../triggers/FinishedTriggersProperties.java    | 115 +++
 .../core/triggers/FinishedTriggersSetTest.java  |  60 ++
 .../core/triggers/NeverStateMachineTest.java    |  59 ++
 .../triggers/OrFinallyStateMachineTest.java     | 177 ++++
 .../triggers/RepeatedlyStateMachineTest.java    | 200 ++++
 .../ReshuffleTriggerStateMachineTest.java       |  68 ++
 .../core/triggers/StubTriggerStateMachine.java  |  60 ++
 .../core/triggers/TriggerStateMachineTest.java  |  98 ++
 .../triggers/TriggerStateMachineTester.java     | 431 ++++++++
 .../core/triggers/TriggerStateMachinesTest.java | 199 ++++
 runners/direct-java/pom.xml                     |  26 +-
 .../runners/direct/AggregatorContainer.java     |   9 +-
 .../direct/BoundedReadEvaluatorFactory.java     | 155 +--
 .../beam/runners/direct/BundleFactory.java      |  15 +-
 .../runners/direct/CloningBundleFactory.java    |  98 ++
 .../beam/runners/direct/CompletionCallback.java |   4 +-
 .../runners/direct/DirectExecutionContext.java  |   2 +-
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  66 ++
 .../beam/runners/direct/DirectMetrics.java      | 338 +++++++
 .../beam/runners/direct/DirectOptions.java      |  40 +-
 .../beam/runners/direct/DirectRunner.java       | 121 ++-
 .../runners/direct/DirectTimerInternals.java    |   2 +-
 .../runners/direct/DoFnLifecycleManager.java    |  86 +-
 .../beam/runners/direct/EmptyInputProvider.java |  45 +
 .../direct/EncodabilityEnforcementFactory.java  |  50 +-
 .../beam/runners/direct/EvaluationContext.java  |  27 +-
 .../direct/ExecutorServiceParallelExecutor.java | 124 ++-
 .../runners/direct/FlattenEvaluatorFactory.java |  26 +-
 .../runners/direct/ForwardingPTransform.java    |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |  14 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  21 +-
 .../ImmutabilityCheckingBundleFactory.java      |  18 +-
 .../direct/ImmutableListBundleFactory.java      |  76 +-
 .../beam/runners/direct/ParDoEvaluator.java     |  13 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |  29 +-
 .../runners/direct/ParDoOverrideFactory.java    |  55 +
 .../direct/ParDoSingleEvaluatorFactory.java     |  23 +-
 .../beam/runners/direct/PipelineExecutor.java   |   2 +-
 .../beam/runners/direct/RootInputProvider.java  |  46 +
 .../runners/direct/RootProviderRegistry.java    |  66 ++
 .../runners/direct/StepTransformResult.java     |  49 +-
 .../beam/runners/direct/StructuralKey.java      |  88 +-
 .../direct/TestStreamEvaluatorFactory.java      | 152 +--
 .../beam/runners/direct/TransformEvaluator.java |   2 +-
 .../direct/TransformEvaluatorFactory.java       |  22 +-
 .../direct/TransformEvaluatorRegistry.java      |  63 +-
 .../beam/runners/direct/TransformExecutor.java  |  53 +-
 .../beam/runners/direct/TransformResult.java    |  16 +-
 .../direct/UnboundedReadEvaluatorFactory.java   | 318 +++---
 .../direct/UncommittedBundleOutputManager.java  |   4 +-
 .../runners/direct/ViewEvaluatorFactory.java    |  16 +-
 .../beam/runners/direct/WatermarkManager.java   |  53 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  18 +-
 .../direct/WriteWithShardingFactory.java        |   6 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java | 181 ++--
 .../direct/CloningBundleFactoryTest.java        | 177 ++++
 .../runners/direct/CommittedResultTest.java     |  10 +-
 .../ConsumerTrackingPipelineVisitorTest.java    |  32 +-
 .../beam/runners/direct/DirectMetricsTest.java  | 133 +++
 .../beam/runners/direct/DirectRunnerTest.java   | 180 +++-
 ...leManagerRemovingTransformEvaluatorTest.java |  16 +-
 .../direct/DoFnLifecycleManagerTest.java        |  86 +-
 .../direct/DoFnLifecycleManagersTest.java       |  48 +-
 .../EncodabilityEnforcementFactoryTest.java     | 132 ++-
 .../runners/direct/EvaluationContextTest.java   |  31 +-
 .../direct/FlattenEvaluatorFactoryTest.java     |  36 +-
 .../direct/ForwardingPTransformTest.java        |   7 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |  27 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |  35 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |  67 +-
 .../ImmutabilityEnforcementFactoryTest.java     |  14 +-
 .../direct/ImmutableListBundleFactoryTest.java  |  52 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  |   8 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  15 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 162 +--
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 145 ++-
 .../beam/runners/direct/SplittableDoFnTest.java | 231 +++++
 .../runners/direct/StepTransformResultTest.java |   4 +-
 .../beam/runners/direct/StructuralKeyTest.java  |   9 +
 .../direct/TestStreamEvaluatorFactoryTest.java  | 223 ++---
 .../runners/direct/TransformExecutorTest.java   | 115 +--
 .../UnboundedReadEvaluatorFactoryTest.java      | 321 ++++--
 .../direct/ViewEvaluatorFactoryTest.java        |   6 +-
 .../runners/direct/WatermarkManagerTest.java    |  84 +-
 .../direct/WindowEvaluatorFactoryTest.java      |  12 +-
 runners/flink/README.md                         |  10 +-
 runners/flink/examples/pom.xml                  |   2 +-
 .../beam/runners/flink/examples/TFIDF.java      |  12 +-
 .../beam/runners/flink/examples/WordCount.java  |   4 +-
 .../flink/examples/streaming/AutoComplete.java  |   4 +-
 .../flink/examples/streaming/package-info.java  |  22 -
 runners/flink/pom.xml                           |   4 +-
 runners/flink/runner/pom.xml                    |  10 +-
 .../flink/FlinkDetachedRunnerResult.java        |  76 ++
 .../FlinkPipelineExecutionEnvironment.java      |   7 +
 .../runners/flink/FlinkPipelineOptions.java     |  19 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |  29 +-
 .../runners/flink/FlinkRunnerRegistrar.java     |   4 +-
 .../beam/runners/flink/FlinkRunnerResult.java   |  17 +-
 .../beam/runners/flink/TestFlinkRunner.java     |   9 +-
 .../FlinkBatchPipelineTranslator.java           |   2 +-
 .../FlinkStreamingPipelineTranslator.java       |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../functions/FlinkProcessContext.java          |  10 +-
 .../wrappers/streaming/DoFnOperator.java        |  31 +-
 .../wrappers/streaming/FlinkStateInternals.java |   8 +-
 .../wrappers/streaming/WindowDoFnOperator.java  | 177 +++-
 .../streaming/io/BoundedSourceWrapper.java      |   9 +-
 .../streaming/io/UnboundedSourceWrapper.java    | 187 +++-
 .../beam/runners/flink/FlinkTestPipeline.java   |   6 +-
 .../flink/streaming/DoFnOperatorTest.java       |   9 +-
 .../streaming/UnboundedSourceWrapperTest.java   | 123 +--
 .../translators/TransformTranslator.java        |  30 -
 .../translators/io/UnboundedSourceWrapper.java  |  45 -
 runners/google-cloud-dataflow-java/pom.xml      |  22 +-
 .../runners/dataflow/DataflowPipelineJob.java   |  52 +-
 .../dataflow/DataflowPipelineTranslator.java    | 137 ++-
 .../beam/runners/dataflow/DataflowRunner.java   |  57 +-
 .../dataflow/internal/AssignWindows.java        |   6 +-
 .../DataflowUnboundedReadFromBoundedSource.java |  18 +-
 .../runners/dataflow/internal/IsmFormat.java    |  13 +-
 .../dataflow/internal/ReadTranslator.java       |   2 +-
 .../options/DataflowPipelineDebugOptions.java   |   7 +-
 .../options/DataflowPipelineOptions.java        |  14 +-
 .../DataflowPipelineWorkerPoolOptions.java      |  11 +-
 .../options/DataflowProfilingOptions.java       |   2 +-
 .../options/DataflowWorkerLoggingOptions.java   |   4 +-
 .../dataflow/testing/TestDataflowRunner.java    | 144 ++-
 .../beam/runners/dataflow/util/DoFnInfo.java    |  29 +-
 .../runners/dataflow/util/MonitoringUtil.java   |   2 +-
 .../runners/dataflow/util/RandomAccessData.java |   8 +-
 .../beam/runners/dataflow/util/Stager.java      |   2 +-
 .../dataflow/DataflowPipelineJobTest.java       | 120 +++
 .../DataflowPipelineTranslatorTest.java         |  40 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  13 +-
 ...aflowUnboundedReadFromBoundedSourceTest.java |  83 ++
 .../testing/TestDataflowRunnerTest.java         | 287 +++++-
 .../dataflow/util/MonitoringUtilTest.java       |   4 +-
 runners/pom.xml                                 |   3 +-
 runners/spark/pom.xml                           |  94 +-
 .../beam/runners/spark/EvaluationResult.java    |   4 +-
 .../runners/spark/SparkPipelineOptions.java     |  48 +-
 .../apache/beam/runners/spark/SparkRunner.java  | 176 ++--
 .../beam/runners/spark/TestSparkRunner.java     |  17 +-
 .../spark/aggregators/AccumulatorSingleton.java |  53 +
 .../runners/spark/coders/WritableCoder.java     |   2 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../apache/beam/runners/spark/io/SourceRDD.java | 200 ++++
 .../spark/io/hadoop/ShardNameTemplateAware.java |   2 +-
 .../runners/spark/translation/DoFnFunction.java |  73 +-
 .../spark/translation/EvaluationContext.java    |  32 +-
 .../translation/GroupCombineFunctions.java      | 313 ++++++
 .../spark/translation/MultiDoFnFunction.java    |  77 +-
 .../translation/SparkAbstractCombineFn.java     | 134 +++
 .../spark/translation/SparkContextFactory.java  |  50 +-
 .../spark/translation/SparkGlobalCombineFn.java | 260 +++++
 .../spark/translation/SparkKeyedCombineFn.java  | 273 +++++
 .../translation/SparkPipelineEvaluator.java     |  57 --
 .../translation/SparkPipelineTranslator.java    |   5 +-
 .../spark/translation/SparkProcessContext.java  | 168 +++-
 .../spark/translation/SparkRuntimeContext.java  |  44 +-
 .../spark/translation/TransformTranslator.java  | 592 +++--------
 .../spark/translation/TranslationUtils.java     | 197 ++++
 .../SparkRunnerStreamingContextFactory.java     | 106 ++
 .../streaming/StreamingEvaluationContext.java   |  81 +-
 .../streaming/StreamingTransformTranslator.java | 561 +++++++----
 .../spark/util/SparkSideInputReader.java        |  95 ++
 .../runners/spark/ClearAggregatorsRule.java     |  33 +
 .../apache/beam/runners/spark/DeDupTest.java    |  59 --
 .../beam/runners/spark/EmptyInputTest.java      |  75 --
 .../beam/runners/spark/SimpleWordCountTest.java | 105 --
 .../apache/beam/runners/spark/TfIdfTest.java    | 260 -----
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   4 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  | 102 --
 .../spark/translation/CombinePerKeyTest.java    |  79 --
 .../spark/translation/DoFnOutputTest.java       |  67 --
 .../translation/MultiOutputWordCountTest.java   | 176 ----
 .../spark/translation/SerializationTest.java    | 201 ----
 .../spark/translation/SideEffectsTest.java      |  21 +-
 .../translation/SparkPipelineOptionsTest.java   |  42 -
 .../translation/TransformTranslatorTest.java    | 104 --
 .../translation/WindowedWordCountTest.java      | 120 ---
 .../streaming/EmptyStreamAssertionTest.java     |  80 ++
 .../streaming/FlattenStreamingTest.java         |  57 +-
 .../streaming/KafkaStreamingTest.java           |  32 +-
 .../ResumeFromCheckpointStreamingTest.java      | 182 ++++
 .../streaming/SimpleStreamingWordCountTest.java |  67 +-
 .../streaming/utils/PAssertStreaming.java       |  87 +-
 .../utils/TestOptionsForStreaming.java          |  55 +
 .../spark/src/test/resources/metrics.properties |  61 +-
 sdks/java/build-tools/pom.xml                   |   2 +-
 .../src/main/resources/beam/checkstyle.xml      |  28 +-
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../src/main/resources/beam/suppressions.xml    |  11 +-
 sdks/java/core/pom.xml                          |  43 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  12 +-
 .../org/apache/beam/sdk/PipelineResult.java     |  15 +-
 .../beam/sdk/annotations/Experimental.java      |  15 +-
 .../sdk/coders/CannotProvideCoderException.java |   2 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  |  28 +-
 .../apache/beam/sdk/coders/CoderFactory.java    |   4 +-
 .../apache/beam/sdk/coders/CoderProvider.java   |   2 +-
 .../apache/beam/sdk/coders/DelegateCoder.java   |   4 +-
 .../beam/sdk/coders/IterableLikeCoder.java      |  22 +-
 .../apache/beam/sdk/coders/NullableCoder.java   |  10 +-
 .../apache/beam/sdk/coders/package-info.java    |   2 +-
 .../beam/sdk/coders/protobuf/ProtoCoder.java    |   4 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 162 ++-
 .../java/org/apache/beam/sdk/io/AvroSource.java |  98 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  11 +-
 .../org/apache/beam/sdk/io/BoundedSource.java   |  19 +-
 .../apache/beam/sdk/io/CompressedSource.java    |  18 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 145 ++-
 .../apache/beam/sdk/io/OffsetBasedSource.java   |   6 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  24 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |   2 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   4 +-
 .../sdk/io/SerializableAvroCodecFactory.java    | 112 +++
 .../main/java/org/apache/beam/sdk/io/Sink.java  |   5 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 247 +++--
 .../main/java/org/apache/beam/sdk/io/Write.java |   8 +-
 .../java/org/apache/beam/sdk/io/XmlSource.java  |  13 +-
 .../apache/beam/sdk/io/range/ByteKeyRange.java  |   6 +-
 .../apache/beam/sdk/io/range/RangeTracker.java  |   1 +
 .../org/apache/beam/sdk/metrics/Counter.java    |  40 +
 .../apache/beam/sdk/metrics/CounterCell.java    |  76 ++
 .../org/apache/beam/sdk/metrics/DirtyState.java |  98 ++
 .../apache/beam/sdk/metrics/Distribution.java   |  30 +
 .../beam/sdk/metrics/DistributionCell.java      |  58 ++
 .../beam/sdk/metrics/DistributionData.java      |  59 ++
 .../beam/sdk/metrics/DistributionResult.java    |  42 +
 .../org/apache/beam/sdk/metrics/Metric.java     |  24 +
 .../org/apache/beam/sdk/metrics/MetricCell.java |  47 +
 .../org/apache/beam/sdk/metrics/MetricKey.java  |  40 +
 .../org/apache/beam/sdk/metrics/MetricName.java |  46 +
 .../beam/sdk/metrics/MetricNameFilter.java      |  60 ++
 .../beam/sdk/metrics/MetricQueryResults.java    |  33 +
 .../apache/beam/sdk/metrics/MetricResult.java   |  45 +
 .../apache/beam/sdk/metrics/MetricResults.java  |  34 +
 .../apache/beam/sdk/metrics/MetricUpdates.java  |  72 ++
 .../org/apache/beam/sdk/metrics/Metrics.java    | 110 ++
 .../beam/sdk/metrics/MetricsContainer.java      | 150 +++
 .../beam/sdk/metrics/MetricsEnvironment.java    |  85 ++
 .../apache/beam/sdk/metrics/MetricsFilter.java  |  86 ++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |  86 ++
 .../apache/beam/sdk/metrics/package-info.java   |  28 +
 .../options/CloudResourceManagerOptions.java    |  40 +
 .../org/apache/beam/sdk/options/GcpOptions.java |   8 +-
 .../org/apache/beam/sdk/options/GcsOptions.java |   4 +-
 .../beam/sdk/options/GoogleApiDebugOptions.java |   2 +-
 .../beam/sdk/options/PipelineOptions.java       |  52 +-
 .../sdk/options/PipelineOptionsFactory.java     | 269 +++--
 .../sdk/options/ProxyInvocationHandler.java     | 264 +++--
 .../apache/beam/sdk/options/ValueProvider.java  | 239 +++++
 .../java/org/apache/beam/sdk/package-info.java  |   2 +-
 .../sdk/runners/PipelineRunnerRegistrar.java    |   2 +-
 .../apache/beam/sdk/runners/package-info.java   |  17 +-
 .../beam/sdk/testing/BigqueryMatcher.java       | 239 +++++
 .../org/apache/beam/sdk/testing/PAssert.java    |   4 +-
 .../beam/sdk/testing/SerializableMatchers.java  |   4 +-
 .../beam/sdk/testing/SourceTestUtils.java       |   3 +-
 .../apache/beam/sdk/testing/StreamingIT.java    |  35 +
 .../org/apache/beam/sdk/testing/TestStream.java |   2 +-
 .../apache/beam/sdk/testing/package-info.java   |   4 +-
 .../apache/beam/sdk/transforms/Aggregator.java  |  11 +-
 .../sdk/transforms/ApproximateQuantiles.java    |   5 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  95 +-
 .../beam/sdk/transforms/CombineFnBase.java      |  17 +-
 .../apache/beam/sdk/transforms/CombineFns.java  |  41 +-
 .../beam/sdk/transforms/CombineWithContext.java |   7 +-
 .../sdk/transforms/DelegatingAggregator.java    | 125 +++
 .../org/apache/beam/sdk/transforms/DoFn.java    | 445 ++++++++-
 .../beam/sdk/transforms/DoFnAdapters.java       | 184 +++-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 257 +++--
 .../beam/sdk/transforms/FlatMapElements.java    |   6 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |   4 +-
 .../transforms/IntraBundleParallelization.java  | 361 -------
 .../org/apache/beam/sdk/transforms/Latest.java  |  12 +-
 .../apache/beam/sdk/transforms/MapElements.java |   8 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 318 ++++--
 .../apache/beam/sdk/transforms/PTransform.java  |  10 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   | 341 +++++--
 .../apache/beam/sdk/transforms/Partition.java   |   4 +-
 .../beam/sdk/transforms/RemoveDuplicates.java   |   5 +-
 .../sdk/transforms/SerializableFunction.java    |   2 +-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |   2 +-
 .../apache/beam/sdk/transforms/WithKeys.java    |   2 +-
 .../sdk/transforms/display/DisplayData.java     | 530 ++++++----
 .../sdk/transforms/reflect/DoFnInvoker.java     |  48 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    | 658 ++++++++----
 .../sdk/transforms/reflect/DoFnSignature.java   | 466 ++++++++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 946 +++++++++++++++---
 .../splittabledofn/RestrictionTracker.java      |  42 +
 .../transforms/splittabledofn/package-info.java |  22 +
 .../beam/sdk/transforms/windowing/AfterAll.java |  51 +-
 .../windowing/AfterDelayFromFirstElement.java   | 110 +-
 .../sdk/transforms/windowing/AfterEach.java     |  63 +-
 .../sdk/transforms/windowing/AfterFirst.java    |  52 +-
 .../sdk/transforms/windowing/AfterPane.java     |  59 +-
 .../windowing/AfterProcessingTime.java          |   7 -
 .../AfterSynchronizedProcessingTime.java        |  13 +-
 .../transforms/windowing/AfterWatermark.java    | 174 +---
 .../transforms/windowing/DefaultTrigger.java    |  37 +-
 .../beam/sdk/transforms/windowing/Never.java    |  28 +-
 .../transforms/windowing/OrFinallyTrigger.java  |  55 +-
 .../beam/sdk/transforms/windowing/PaneInfo.java |   6 +-
 .../sdk/transforms/windowing/Repeatedly.java    |  36 +-
 .../transforms/windowing/SlidingWindows.java    |   3 +-
 .../beam/sdk/transforms/windowing/Trigger.java  | 421 ++------
 .../beam/sdk/transforms/windowing/Window.java   |  20 +-
 .../beam/sdk/transforms/windowing/WindowFn.java |   2 +-
 ...AttemptAndTimeBoundedExponentialBackOff.java | 173 ++++
 .../util/AttemptBoundedExponentialBackOff.java  |  86 ++
 .../beam/sdk/util/BaseExecutionContext.java     |   4 +-
 .../apache/beam/sdk/util/CredentialFactory.java |   2 +-
 .../apache/beam/sdk/util/ExecutableTrigger.java |  40 +-
 .../sdk/util/ExposedByteArrayOutputStream.java  |   1 +
 .../apache/beam/sdk/util/FinishedTriggers.java  |  44 -
 .../beam/sdk/util/FinishedTriggersBitSet.java   |  67 --
 .../beam/sdk/util/FinishedTriggersSet.java      |  72 --
 .../apache/beam/sdk/util/GatherAllPanes.java    |  10 +-
 .../apache/beam/sdk/util/GcpProjectUtil.java    | 106 ++
 .../apache/beam/sdk/util/GcsPathValidator.java  |   2 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  94 +-
 .../apache/beam/sdk/util/IOChannelFactory.java  |   2 +-
 .../beam/sdk/util/MergingActiveWindowSet.java   |  12 +-
 .../apache/beam/sdk/util/PCollectionViews.java  |   9 +-
 .../org/apache/beam/sdk/util/PathValidator.java |   6 +-
 .../beam/sdk/util/PerKeyCombineFnRunner.java    |  26 +-
 .../beam/sdk/util/PerKeyCombineFnRunners.java   |   4 +-
 .../org/apache/beam/sdk/util/PropertyNames.java |   1 +
 .../org/apache/beam/sdk/util/PubsubClient.java  |   5 +-
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |  34 +-
 .../apache/beam/sdk/util/PubsubTestClient.java  |   2 +-
 .../sdk/util/ReifyTimestampAndWindowsDoFn.java  |  16 +-
 .../org/apache/beam/sdk/util/ReleaseInfo.java   |   6 +-
 .../apache/beam/sdk/util/ReshuffleTrigger.java  |  16 +-
 .../org/apache/beam/sdk/util/StringUtils.java   |   2 +-
 .../java/org/apache/beam/sdk/util/Timer.java    |  56 ++
 .../apache/beam/sdk/util/TimerInternals.java    |   6 +-
 .../org/apache/beam/sdk/util/TimerSpec.java     |  30 +
 .../org/apache/beam/sdk/util/TimerSpecs.java    |  41 +
 .../java/org/apache/beam/sdk/util/Timers.java   |  10 +-
 .../org/apache/beam/sdk/util/Transport.java     |  17 +
 .../beam/sdk/util/TriggerContextFactory.java    | 507 ----------
 .../apache/beam/sdk/util/ValueWithRecordId.java |   8 +-
 .../beam/sdk/util/common/ReflectHelpers.java    |  22 -
 .../apache/beam/sdk/util/common/Reiterable.java |   2 +-
 .../apache/beam/sdk/util/common/Reiterator.java |   2 +-
 .../CopyOnAccessInMemoryStateInternals.java     |   2 +-
 .../sdk/util/state/InMemoryTimerInternals.java  | 235 +++++
 .../beam/sdk/util/state/ReadableState.java      |  10 +-
 .../apache/beam/sdk/util/state/StateBinder.java |  67 ++
 .../beam/sdk/util/state/StateContext.java       |   6 +-
 .../apache/beam/sdk/util/state/StateSpec.java   |  39 +
 .../apache/beam/sdk/util/state/StateSpecs.java  | 452 +++++++++
 .../apache/beam/sdk/util/state/StateTag.java    |  82 +-
 .../apache/beam/sdk/util/state/StateTags.java   | 386 +-------
 .../util/state/TestInMemoryStateInternals.java  |  61 ++
 .../beam/sdk/util/state/TimerCallback.java      |  35 +
 .../apache/beam/sdk/values/PCollectionView.java |  15 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   8 +-
 .../org/apache/beam/sdk/values/POutput.java     |   8 +-
 .../java/org/apache/beam/sdk/values/PValue.java |   4 +-
 .../apache/beam/sdk/values/TypeDescriptors.java |  40 +-
 .../apache/beam/sdk/values/package-info.java    |   4 +-
 .../dataflow/util/GcsPathValidatorTest.java     | 103 --
 .../org/apache/beam/sdk/DataflowMatchers.java   |  64 --
 .../java/org/apache/beam/sdk/PipelineTest.java  |   2 +-
 .../apache/beam/sdk/coders/AvroCoderTest.java   |   4 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |   1 +
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  99 +-
 .../beam/sdk/coders/NullableCoderTest.java      |  60 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 +-
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  43 +
 .../io/BoundedReadFromUnboundedSourceTest.java  |  14 +-
 .../beam/sdk/io/CompressedSourceTest.java       |   4 +-
 .../sdk/io/DrunkWritableByteChannelFactory.java |  80 ++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 108 ++
 .../beam/sdk/io/OffsetBasedSourceTest.java      |  30 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |  62 +-
 .../io/SerializableAvroCodecFactoryTest.java    | 100 ++
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 601 ++++++-----
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   6 +-
 .../beam/sdk/metrics/CounterCellTest.java       |  55 +
 .../apache/beam/sdk/metrics/DirtyStateTest.java |  56 ++
 .../beam/sdk/metrics/DistributionCellTest.java  |  53 +
 .../apache/beam/sdk/metrics/MetricMatchers.java |  99 ++
 .../beam/sdk/metrics/MetricsContainerTest.java  | 129 +++
 .../sdk/metrics/MetricsEnvironmentTest.java     |  63 ++
 .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 ++
 .../apache/beam/sdk/metrics/MetricsTest.java    |  98 ++
 .../sdk/options/PipelineOptionsFactoryTest.java | 184 +++-
 .../beam/sdk/options/PipelineOptionsTest.java   |  49 +-
 .../options/PipelineOptionsValidatorTest.java   |  18 +-
 .../sdk/options/ProxyInvocationHandlerTest.java | 111 ++-
 .../beam/sdk/options/ValueProviderTest.java     | 220 ++++
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 176 ++++
 .../sdk/testing/PCollectionViewTesting.java     |   7 -
 .../beam/sdk/testing/SystemNanoTimeSleeper.java |   2 +-
 .../beam/sdk/testing/TestPipelineTest.java      |   4 +-
 .../beam/sdk/transforms/CombineFnsTest.java     |   7 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  81 +-
 .../DoFnDelegatingAggregatorTest.java           |   5 +-
 .../beam/sdk/transforms/DoFnTesterTest.java     | 458 +++++----
 .../apache/beam/sdk/transforms/FlattenTest.java |  20 +
 .../IntraBundleParallelizationTest.java         | 280 ------
 .../beam/sdk/transforms/LatestFnTest.java       | 233 +++++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 101 +-
 .../display/DisplayDataEvaluator.java           |  13 +-
 .../transforms/display/DisplayDataMatchers.java | 141 ++-
 .../display/DisplayDataMatchersTest.java        |  67 +-
 .../sdk/transforms/display/DisplayDataTest.java | 367 ++++---
 .../transforms/reflect/DoFnInvokersTest.java    | 710 +++++++------
 .../reflect/DoFnInvokersTestHelper.java         | 116 ---
 .../DoFnSignaturesProcessElementTest.java       | 213 ++++
 .../DoFnSignaturesSplittableDoFnTest.java       | 543 ++++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  | 991 ++++++++++++++-----
 .../reflect/DoFnSignaturesTestUtils.java        |  67 ++
 .../testhelper/DoFnInvokersTestHelper.java      | 124 +++
 .../sdk/transforms/windowing/AfterAllTest.java  |  98 --
 .../sdk/transforms/windowing/AfterEachTest.java |  64 --
 .../transforms/windowing/AfterFirstTest.java    | 120 ---
 .../sdk/transforms/windowing/AfterPaneTest.java |  77 --
 .../windowing/AfterProcessingTimeTest.java      |  94 --
 .../AfterSynchronizedProcessingTimeTest.java    |  75 --
 .../windowing/AfterWatermarkTest.java           | 308 ------
 .../windowing/DefaultTriggerTest.java           | 130 ---
 .../sdk/transforms/windowing/NeverTest.java     |  34 +-
 .../windowing/OrFinallyTriggerTest.java         | 136 ---
 .../transforms/windowing/RepeatedlyTest.java    | 161 +--
 .../sdk/transforms/windowing/StubTrigger.java   |  17 -
 .../sdk/transforms/windowing/TriggerTest.java   |  28 -
 .../sdk/transforms/windowing/WindowTest.java    |   4 +-
 .../apache/beam/sdk/util/ApiSurfaceTest.java    |  28 +-
 ...mptAndTimeBoundedExponentialBackOffTest.java | 213 ++++
 .../AttemptBoundedExponentialBackOffTest.java   |  85 ++
 .../beam/sdk/util/ExecutableTriggerTest.java    |  18 -
 .../sdk/util/FinishedTriggersBitSetTest.java    |  55 -
 .../sdk/util/FinishedTriggersProperties.java    | 110 --
 .../beam/sdk/util/FinishedTriggersSetTest.java  |  60 --
 .../beam/sdk/util/GcpProjectUtilTest.java       |  76 ++
 .../beam/sdk/util/GcsPathValidatorTest.java     | 100 ++
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 112 ++-
 .../beam/sdk/util/PubsubGrpcClientTest.java     | 108 +-
 .../beam/sdk/util/PubsubJsonClientTest.java     |  16 +-
 .../beam/sdk/util/ReshuffleTriggerTest.java     |  23 -
 .../org/apache/beam/sdk/util/TriggerTester.java | 592 -----------
 .../util/state/InMemoryTimerInternalsTest.java  | 116 +++
 sdks/java/extensions/join-library/pom.xml       |   2 +-
 sdks/java/extensions/pom.xml                    |   2 +-
 sdks/java/io/google-cloud-platform/pom.xml      |  16 +-
 .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java  |  79 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 239 +++--
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  15 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 106 +-
 .../gcp/bigquery/BigQueryTableRowIterator.java  | 130 ++-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |  56 +-
 .../beam/sdk/io/gcp/datastore/DatastoreIO.java  |   2 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  99 +-
 .../io/gcp/bigquery/BigQueryAvroUtilsTest.java  | 149 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 570 ++++++++++-
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 149 ++-
 .../bigquery/BigQueryTableRowIteratorTest.java  | 169 +++-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 104 +-
 .../sdk/io/gcp/datastore/SplitQueryFnIT.java    |   4 +-
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |   2 +-
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    |   2 +-
 sdks/java/io/hdfs/pom.xml                       |   2 +-
 .../beam/sdk/io/hdfs/AvroHDFSFileSource.java    |   2 +-
 .../beam/sdk/io/hdfs/AvroWrapperCoder.java      |   2 +-
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |  10 +-
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  |   2 +-
 .../SimpleAuthAvroHDFSFileSource.java           |   2 +-
 .../hdfs/simpleauth/SimpleAuthHDFSFileSink.java |   2 +-
 .../simpleauth/SimpleAuthHDFSFileSource.java    |   7 +-
 sdks/java/io/jdbc/pom.xml                       | 138 +++
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 443 +++++++++
 .../apache/beam/sdk/io/jdbc/package-info.java   |  22 +
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 271 +++++
 sdks/java/io/jms/pom.xml                        |   2 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |  11 +-
 sdks/java/io/kafka/pom.xml                      |   2 +-
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  |   8 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  90 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 166 +++-
 sdks/java/io/kinesis/pom.xml                    |   3 +-
 .../beam/sdk/io/kinesis/CustomOptional.java     |   7 +-
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |   2 +-
 .../sdk/io/kinesis/KinesisClientProvider.java   |   2 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   |  18 +-
 .../beam/sdk/io/kinesis/KinesisReader.java      |  10 +-
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |   4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |   2 +-
 .../beam/sdk/io/kinesis/KinesisSource.java      |   6 +-
 .../beam/sdk/io/kinesis/RecordFilter.java       |   6 +-
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |   2 +-
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    |   6 +-
 .../sdk/io/kinesis/ShardRecordsIterator.java    |   4 +-
 .../sdk/io/kinesis/SimplifiedKinesisClient.java |   8 +-
 .../beam/sdk/io/kinesis/StartingPoint.java      |   2 +-
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |   2 +-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |   5 +-
 sdks/java/io/mongodb/pom.xml                    |  14 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 449 +++++++++
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   | 310 +++---
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 276 ++++++
 .../beam/sdk/io/mongodb/MongoDbIOTest.java      |  19 +-
 sdks/java/io/pom.xml                            |   3 +-
 sdks/java/java8tests/pom.xml                    |   2 +-
 .../PipelineOptionsFactoryJava8Test.java        |   8 +-
 sdks/java/maven-archetypes/examples/pom.xml     |  27 +-
 .../main/resources/archetype-resources/pom.xml  |  26 +-
 .../src/main/java/DebuggingWordCount.java       |  34 +-
 .../src/main/java/MinimalWordCount.java         |  50 +-
 .../src/main/java/WindowedWordCount.java        | 139 +--
 .../src/main/java/WordCount.java                |  79 +-
 .../java/common/DataflowExampleOptions.java     |  32 -
 .../main/java/common/DataflowExampleUtils.java  | 391 --------
 .../common/ExampleBigQueryTableOptions.java     |  11 +-
 .../src/main/java/common/ExampleOptions.java    |  32 +
 ...xamplePubsubTopicAndSubscriptionOptions.java |  45 +
 .../java/common/ExamplePubsubTopicOptions.java  |  17 +-
 .../src/main/java/common/ExampleUtils.java      | 353 +++++++
 .../main/java/common/PubsubFileInjector.java    | 153 ---
 .../src/test/java/WordCountTest.java            |   9 +-
 sdks/java/maven-archetypes/pom.xml              |   2 +-
 sdks/java/maven-archetypes/starter/pom.xml      |  10 +-
 .../main/resources/archetype-resources/pom.xml  |   8 +-
 .../resources/projects/basic/reference/pom.xml  |   8 +-
 sdks/java/microbenchmarks/pom.xml               |   2 +-
 .../transforms/DoFnInvokersBenchmark.java       |   7 +
 sdks/java/pom.xml                               |   2 +-
 sdks/pom.xml                                    |   2 +-
 632 files changed, 36444 insertions(+), 16763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --cc 
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index df74ed3,dec9905..0000000
deleted file mode 100644,100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ /dev/null
@@@ -1,58 -1,586 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.beam.runners.core;
 -
 -import static com.google.common.base.Preconditions.checkNotNull;
--
 -import com.google.common.collect.Iterables;
 -import com.google.common.collect.Sets;
 -import java.io.IOException;
 -import java.util.Collection;
 -import java.util.Iterator;
--import java.util.List;
 -import java.util.Set;
--import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 -import org.apache.beam.sdk.coders.Coder;
 -import org.apache.beam.sdk.coders.IterableCoder;
--import org.apache.beam.sdk.options.PipelineOptions;
 -import org.apache.beam.sdk.transforms.Aggregator;
--import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
- import org.apache.beam.sdk.transforms.OldDoFn;
 -import org.apache.beam.sdk.transforms.Combine.CombineFn;
 -import org.apache.beam.sdk.transforms.DoFn;
 -import org.apache.beam.sdk.transforms.DoFn.InputProvider;
 -import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 -import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 -import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 -import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 -import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 -import org.apache.beam.sdk.transforms.windowing.WindowFn;
--import org.apache.beam.sdk.util.ExecutionContext.StepContext;
--import org.apache.beam.sdk.util.SideInputReader;
 -import org.apache.beam.sdk.util.SystemDoFnInternal;
 -import org.apache.beam.sdk.util.TimerInternals;
 -import org.apache.beam.sdk.util.UserCodeException;
--import org.apache.beam.sdk.util.WindowedValue;
 -import org.apache.beam.sdk.util.WindowingInternals;
--import org.apache.beam.sdk.util.WindowingStrategy;
 -import org.apache.beam.sdk.util.state.StateInternals;
 -import org.apache.beam.sdk.values.PCollectionView;
--import org.apache.beam.sdk.values.TupleTag;
 -import org.joda.time.Instant;
 -import org.joda.time.format.PeriodFormat;
--
--/**
-  * Runs a {@link OldDoFn} by constructing the appropriate contexts and 
passing them in.
 - * Runs a {@link DoFn} by constructing the appropriate contexts and passing 
them in.
-- *
-  * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-  * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
 - * @param <InputT> the type of the {@link DoFn} (main) input elements
 - * @param <OutputT> the type of the {@link DoFn} (main) output elements
-- */
- public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, 
OutputT> {
 -public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, 
OutputT> {
--
-   protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, 
OutputT> fn,
 -  /** The {@link DoFn} being run. */
 -  private final DoFn<InputT, OutputT> fn;
 -
 -  /** The {@link DoFnInvoker} being run. */
 -  private final DoFnInvoker<InputT, OutputT> invoker;
 -
 -  /** The context used for running the {@link DoFn}. */
 -  private final DoFnContext<InputT, OutputT> context;
 -
 -  private final OutputManager outputManager;
 -
 -  private final TupleTag<OutputT> mainOutputTag;
 -
 -  private final boolean observesWindow;
 -
 -  public SimpleDoFnRunner(
 -      PipelineOptions options,
 -      DoFn<InputT, OutputT> fn,
--      SideInputReader sideInputReader,
--      OutputManager outputManager,
-       TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, 
StepContext stepContext,
-       AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> 
windowingStrategy) {
-     super(options, fn, sideInputReader, outputManager, mainOutputTag, 
sideOutputTags, stepContext,
-         aggregatorFactory, windowingStrategy);
 -      TupleTag<OutputT> mainOutputTag,
 -      List<TupleTag<?>> sideOutputTags,
 -      StepContext stepContext,
 -      AggregatorFactory aggregatorFactory,
 -      WindowingStrategy<?, ?> windowingStrategy) {
 -    this.fn = fn;
 -    this.observesWindow =
 -        
DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().observesWindow();
 -    this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
 -    this.outputManager = outputManager;
 -    this.mainOutputTag = mainOutputTag;
 -    this.context =
 -        new DoFnContext<>(
 -            options,
 -            fn,
 -            sideInputReader,
 -            outputManager,
 -            mainOutputTag,
 -            sideOutputTags,
 -            stepContext,
 -            aggregatorFactory,
 -            windowingStrategy == null ? null : 
windowingStrategy.getWindowFn());
--  }
--
--  @Override
-   protected void invokeProcessElement(WindowedValue<InputT> elem) {
-     final OldDoFn<InputT, OutputT>.ProcessContext processContext = 
createProcessContext(elem);
 -  public void startBundle() {
--    // This can contain user code. Wrap it in case it throws an exception.
--    try {
-       fn.processElement(processContext);
 -      invoker.invokeStartBundle(context);
 -    } catch (Throwable t) {
 -      // Exception in user code.
 -      throw wrapUserCodeException(t);
 -    }
 -  }
 -
 -  @Override
 -  public void processElement(WindowedValue<InputT> compressedElem) {
 -    if (observesWindow) {
 -      for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
 -        invokeProcessElement(elem);
 -      }
 -    } else {
 -      invokeProcessElement(compressedElem);
 -    }
 -  }
 -
 -  private void invokeProcessElement(WindowedValue<InputT> elem) {
 -    final DoFn<InputT, OutputT>.ProcessContext processContext = 
createProcessContext(elem);
 -
 -    // Note that if the element must be exploded into all its windows, that 
has to be done outside
 -    // of this runner.
 -    final DoFn.ExtraContextFactory<InputT, OutputT> extraContextFactory =
 -        createExtraContextFactory(elem);
 -
 -    // This can contain user code. Wrap it in case it throws an exception.
 -    try {
 -      invoker.invokeProcessElement(processContext, extraContextFactory);
--    } catch (Exception ex) {
--      throw wrapUserCodeException(ex);
 -    }
 -  }
 -
 -  @Override
 -  public void finishBundle() {
 -    // This can contain user code. Wrap it in case it throws an exception.
 -    try {
 -      invoker.invokeFinishBundle(context);
 -    } catch (Throwable t) {
 -      // Exception in user code.
 -      throw wrapUserCodeException(t);
 -    }
 -  }
 -
 -  /** Returns a new {@link DoFn.ProcessContext} for the given element. */
 -  private DoFn<InputT, OutputT>.ProcessContext 
createProcessContext(WindowedValue<InputT> elem) {
 -    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
 -  }
 -
 -  private DoFn.ExtraContextFactory<InputT, OutputT> createExtraContextFactory(
 -      WindowedValue<InputT> elem) {
 -    return new DoFnExtraContextFactory<InputT, OutputT>(elem.getWindows(), 
elem.getPane());
 -  }
 -
 -  private RuntimeException wrapUserCodeException(Throwable t) {
 -    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
 -  }
 -
 -  private boolean isSystemDoFn() {
 -    return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class);
 -  }
 -
 -  /**
 -   * A concrete implementation of {@code DoFn.Context} used for running a 
{@link DoFn}.
 -   *
 -   * @param <InputT> the type of the {@link DoFn} (main) input elements
 -   * @param <OutputT> the type of the {@link DoFn} (main) output elements
 -   */
 -  private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, 
OutputT>.Context {
 -    private static final int MAX_SIDE_OUTPUTS = 1000;
 -
 -    final PipelineOptions options;
 -    final DoFn<InputT, OutputT> fn;
 -    final SideInputReader sideInputReader;
 -    final OutputManager outputManager;
 -    final TupleTag<OutputT> mainOutputTag;
 -    final StepContext stepContext;
 -    final AggregatorFactory aggregatorFactory;
 -    final WindowFn<?, ?> windowFn;
 -
 -    /**
 -     * The set of known output tags, some of which may be undeclared, so we 
can throw an exception
 -     * when it exceeds {@link #MAX_SIDE_OUTPUTS}.
 -     */
 -    private Set<TupleTag<?>> outputTags;
 -
 -    public DoFnContext(
 -        PipelineOptions options,
 -        DoFn<InputT, OutputT> fn,
 -        SideInputReader sideInputReader,
 -        OutputManager outputManager,
 -        TupleTag<OutputT> mainOutputTag,
 -        List<TupleTag<?>> sideOutputTags,
 -        StepContext stepContext,
 -        AggregatorFactory aggregatorFactory,
 -        WindowFn<?, ?> windowFn) {
 -      fn.super();
 -      this.options = options;
 -      this.fn = fn;
 -      this.sideInputReader = sideInputReader;
 -      this.outputManager = outputManager;
 -      this.mainOutputTag = mainOutputTag;
 -      this.outputTags = Sets.newHashSet();
 -
 -      outputTags.add(mainOutputTag);
 -      for (TupleTag<?> sideOutputTag : sideOutputTags) {
 -        outputTags.add(sideOutputTag);
 -      }
 -
 -      this.stepContext = stepContext;
 -      this.aggregatorFactory = aggregatorFactory;
 -      this.windowFn = windowFn;
 -      super.setupDelegateAggregators();
 -    }
 -
 -    
//////////////////////////////////////////////////////////////////////////////
 -
 -    @Override
 -    public PipelineOptions getPipelineOptions() {
 -      return options;
 -    }
 -
 -    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
 -        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
 -      final Instant inputTimestamp = timestamp;
 -
 -      if (timestamp == null) {
 -        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
 -      }
 -
 -      if (windows == null) {
 -        try {
 -          // The windowFn can never succeed at accessing the element, so its 
type does not
 -          // matter here
 -          @SuppressWarnings("unchecked")
 -          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
 -          windows =
 -              objectWindowFn.assignWindows(
 -                  objectWindowFn.new AssignContext() {
 -                    @Override
 -                    public Object element() {
 -                      throw new UnsupportedOperationException(
 -                          "WindowFn attempted to access input element when 
none was available");
 -                    }
 -
 -                    @Override
 -                    public Instant timestamp() {
 -                      if (inputTimestamp == null) {
 -                        throw new UnsupportedOperationException(
 -                            "WindowFn attempted to access input timestamp 
when none was available");
 -                      }
 -                      return inputTimestamp;
 -                    }
 -
 -                    @Override
 -                    public W window() {
 -                      throw new UnsupportedOperationException(
 -                          "WindowFn attempted to access input windows when 
none were available");
 -                    }
 -                  });
 -        } catch (Exception e) {
 -          throw UserCodeException.wrap(e);
 -        }
 -      }
 -
 -      return WindowedValue.of(output, timestamp, windows, pane);
 -    }
 -
 -    public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
 -      if (!sideInputReader.contains(view)) {
 -        throw new IllegalArgumentException("calling sideInput() with unknown 
view");
 -      }
 -      BoundedWindow sideInputWindow =
 -          
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
 -      return sideInputReader.get(view, sideInputWindow);
 -    }
 -
 -    void outputWindowedValue(
 -        OutputT output,
 -        Instant timestamp,
 -        Collection<? extends BoundedWindow> windows,
 -        PaneInfo pane) {
 -      outputWindowedValue(makeWindowedValue(output, timestamp, windows, 
pane));
 -    }
 -
 -    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
 -      outputManager.output(mainOutputTag, windowedElem);
 -      if (stepContext != null) {
 -        stepContext.noteOutput(windowedElem);
 -      }
 -    }
 -
 -    private <T> void sideOutputWindowedValue(
 -        TupleTag<T> tag,
 -        T output,
 -        Instant timestamp,
 -        Collection<? extends BoundedWindow> windows,
 -        PaneInfo pane) {
 -      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, 
windows, pane));
 -    }
 -
 -    private <T> void sideOutputWindowedValue(TupleTag<T> tag, 
WindowedValue<T> windowedElem) {
 -      if (!outputTags.contains(tag)) {
 -        // This tag wasn't declared nor was it seen before during this 
execution.
 -        // Thus, this must be a new, undeclared and unconsumed output.
 -        // To prevent likely user errors, enforce the limit on the number of 
side
 -        // outputs.
 -        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
 -          throw new IllegalArgumentException(
 -              "the number of side outputs has exceeded a limit of " + 
MAX_SIDE_OUTPUTS);
 -        }
 -        outputTags.add(tag);
 -      }
 -
 -      outputManager.output(tag, windowedElem);
 -      if (stepContext != null) {
 -        stepContext.noteSideOutput(tag, windowedElem);
 -      }
 -    }
 -
 -    // Following implementations of output, outputWithTimestamp, and 
sideOutput
 -    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and 
will be shadowed by
 -    // ProcessContext's versions in DoFn.processElement.
 -    @Override
 -    public void output(OutputT output) {
 -      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
 -    }
 -
 -    @Override
 -    public void outputWithTimestamp(OutputT output, Instant timestamp) {
 -      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
 -    }
 -
 -    @Override
 -    public <T> void sideOutput(TupleTag<T> tag, T output) {
 -      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
 -      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
 -    }
 -
 -    @Override
 -    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
 -      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be 
null");
 -      sideOutputWindowedValue(tag, output, timestamp, null, 
PaneInfo.NO_FIRING);
 -    }
 -
 -    @Override
 -    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
 -        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
 -      checkNotNull(combiner, "Combiner passed to createAggregator cannot be 
null");
 -      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), 
stepContext, name, combiner);
 -    }
 -  }
 -
 -  /**
 -   * A concrete implementation of {@link DoFn.ProcessContext} used for 
running a {@link DoFn} over a
 -   * single element.
 -   *
 -   * @param <InputT> the type of the {@link DoFn} (main) input elements
 -   * @param <OutputT> the type of the {@link DoFn} (main) output elements
 -   */
 -  private static class DoFnProcessContext<InputT, OutputT>
 -      extends DoFn<InputT, OutputT>.ProcessContext {
 -
 -    final DoFn<InputT, OutputT> fn;
 -    final DoFnContext<InputT, OutputT> context;
 -    final WindowedValue<InputT> windowedValue;
 -
 -    public DoFnProcessContext(
 -        DoFn<InputT, OutputT> fn,
 -        DoFnContext<InputT, OutputT> context,
 -        WindowedValue<InputT> windowedValue) {
 -      fn.super();
 -      this.fn = fn;
 -      this.context = context;
 -      this.windowedValue = windowedValue;
 -    }
 -
 -    @Override
 -    public PipelineOptions getPipelineOptions() {
 -      return context.getPipelineOptions();
 -    }
 -
 -    @Override
 -    public InputT element() {
 -      return windowedValue.getValue();
 -    }
 -
 -    @Override
 -    public <T> T sideInput(PCollectionView<T> view) {
 -      checkNotNull(view, "View passed to sideInput cannot be null");
 -      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
 -      BoundedWindow window;
 -      if (!windowIter.hasNext()) {
 -        if (context.windowFn instanceof GlobalWindows) {
 -          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
 -          // without windows
 -          window = GlobalWindow.INSTANCE;
 -        } else {
 -          throw new IllegalStateException(
 -              "sideInput called when main input element is not in any 
windows");
 -        }
 -      } else {
 -        window = windowIter.next();
 -        if (windowIter.hasNext()) {
 -          throw new IllegalStateException(
 -              "sideInput called when main input element is in multiple 
windows");
 -        }
 -      }
 -      return context.sideInput(view, window);
 -    }
 -
 -    @Override
 -    public PaneInfo pane() {
 -      return windowedValue.getPane();
 -    }
 -
 -    @Override
 -    public void output(OutputT output) {
 -      context.outputWindowedValue(windowedValue.withValue(output));
 -    }
 -
 -    @Override
 -    public void outputWithTimestamp(OutputT output, Instant timestamp) {
 -      checkTimestamp(timestamp);
 -      context.outputWindowedValue(
 -          output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
 -    }
 -
 -    void outputWindowedValue(
 -        OutputT output,
 -        Instant timestamp,
 -        Collection<? extends BoundedWindow> windows,
 -        PaneInfo pane) {
 -      context.outputWindowedValue(output, timestamp, windows, pane);
 -    }
 -
 -    @Override
 -    public <T> void sideOutput(TupleTag<T> tag, T output) {
 -      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
 -      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
 -    }
 -
 -    @Override
 -    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
 -      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be 
null");
 -      checkTimestamp(timestamp);
 -      context.sideOutputWindowedValue(
 -          tag, output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
 -    }
 -
 -    @Override
 -    public Instant timestamp() {
 -      return windowedValue.getTimestamp();
 -    }
 -
 -    public Collection<? extends BoundedWindow> windows() {
 -      return windowedValue.getWindows();
 -    }
 -
 -    private void checkTimestamp(Instant timestamp) {
 -      if 
(timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew())))
 {
 -        throw new IllegalArgumentException(
 -            String.format(
 -                "Cannot output with timestamp %s. Output timestamps must be 
no earlier than the "
 -                    + "timestamp of the current input (%s) minus the allowed 
skew (%s). See the "
 -                    + "DoFn#getAllowedTimestampSkew() Javadoc for details on 
changing the allowed "
 -                    + "skew.",
 -                timestamp,
 -                windowedValue.getTimestamp(),
 -                
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
 -      }
 -    }
 -
 -    @Override
 -    protected <AggregatorInputT, AggregatorOutputT>
 -        Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator(
 -            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> 
combiner) {
 -      return context.createAggregator(name, combiner);
 -    }
 -  }
 -
 -  private class DoFnExtraContextFactory<InputT, OutputT>
 -      implements DoFn.ExtraContextFactory<InputT, OutputT> {
 -
 -    /** The windows of the current element. */
 -    private final Collection<? extends BoundedWindow> windows;
 -
 -    /** The pane of the current element. */
 -    private final PaneInfo pane;
 -
 -    public DoFnExtraContextFactory(Collection<? extends BoundedWindow> 
windows, PaneInfo pane) {
 -      this.windows = windows;
 -      this.pane = pane;
 -    }
 -
 -    @Override
 -    public BoundedWindow window() {
 -      return Iterables.getOnlyElement(windows);
 -    }
 -
 -    @Override
 -    public InputProvider<InputT> inputProvider() {
 -      throw new UnsupportedOperationException("InputProvider parameters are 
not supported.");
 -    }
 -
 -    @Override
 -    public OutputReceiver<OutputT> outputReceiver() {
 -      throw new UnsupportedOperationException("OutputReceiver parameters are 
not supported.");
 -    }
 -
 -    @Override
 -    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
 -      throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
 -    }
 -
 -    @Override
 -    public WindowingInternals<InputT, OutputT> windowingInternals() {
 -      return new WindowingInternals<InputT, OutputT>() {
 -        @Override
 -        public Collection<? extends BoundedWindow> windows() {
 -          return windows;
 -        }
 -
 -        @Override
 -        public PaneInfo pane() {
 -          return pane;
 -        }
 -
 -        @Override
 -        public TimerInternals timerInternals() {
 -          return context.stepContext.timerInternals();
 -        }
 -
 -        @Override
 -        public <T> void writePCollectionViewData(
 -            TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> 
elemCoder)
 -            throws IOException {
 -          @SuppressWarnings("unchecked")
 -          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) 
context.windowFn.windowCoder();
 -
 -          context.stepContext.writePCollectionViewData(
 -              tag,
 -              data,
 -              IterableCoder.of(WindowedValue.getFullCoder(elemCoder, 
windowCoder)),
 -              window(),
 -              windowCoder);
 -        }
 -
 -        @Override
 -        public StateInternals<?> stateInternals() {
 -          return context.stepContext.stateInternals();
 -        }
 -
 -        @Override
 -        public void outputWindowedValue(
 -            OutputT output,
 -            Instant timestamp,
 -            Collection<? extends BoundedWindow> windows,
 -            PaneInfo pane) {}
 -
 -        @Override
 -        public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
 -          return context.sideInput(view, mainInputWindow);
 -        }
 -      };
--    }
--  }
--}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --cc 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b2d61c3,e02c8a6..f87f1c1
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@@ -401,31 -450,19 +450,41 @@@ public class DirectRunne
      }
  
      @Override
-     public State waitUntilFinish(Duration duration) throws IOException {
-       throw new UnsupportedOperationException(
-           "DirectPipelineResult does not support waitUntilFinish.");
+     public ExecutorService get() {
+       return Executors.newFixedThreadPool(options.getTargetParallelism());
+     }
+   }
+ 
+ 
+   /**
+    * A {@link Supplier} that creates a {@link NanosOffsetClock}.
+    */
+   private static class NanosOffsetClockSupplier implements Supplier<Clock> {
+     @Override
+     public Clock get() {
+       return NanosOffsetClock.create();
      }
    }
 +
 +  /**
 +   * A {@link Supplier} that creates a {@link ExecutorService} based on
 +   * {@link Executors#newFixedThreadPool(int)}.
 +   */
 +  private static class FixedThreadPoolSupplier implements 
Supplier<ExecutorService> {
 +    @Override
 +    public ExecutorService get() {
 +      return 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 +    }
 +  }
 +
 +
 +  /**
 +   * A {@link Supplier} that creates a {@link NanosOffsetClock}.
 +   */
 +  private static class NanosOffsetClockSupplier implements Supplier<Clock> {
 +    @Override
 +    public Clock get() {
 +      return NanosOffsetClock.create();
 +    }
 +  }
  }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --cc 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9edc50f,3dd44a7..6485714
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@@ -84,44 -82,38 +82,75 @@@ class TransformEvaluatorRegistry implem
        throws Exception {
      checkState(
          !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry");
-     TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
-     return factory.forApplication(application, inputBundle, evaluationContext);
+     Class<? extends PTransform> transformClass = application.getTransform().getClass();
+     TransformEvaluatorFactory factory =
+         checkNotNull(
+             factories.get(transformClass), "No evaluator for PTransform type %s", transformClass);
+     return factory.forApplication(application, inputBundle);
+   }
+ 
+   @Override
+   public void cleanup() throws Exception {
+     Collection<Exception> thrownInCleanup = new ArrayList<>();
+     for (TransformEvaluatorFactory factory : factories.values()) {
+       try {
+         factory.cleanup();
+       } catch (Exception e) {
+         if (e instanceof InterruptedException) {
+           Thread.currentThread().interrupt();
+         }
+         thrownInCleanup.add(e);
+       }
+     }
+     finished.set(true);
+     if (!thrownInCleanup.isEmpty()) {
+       LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup);
+       Exception toThrow = null;
+       for (Exception e : thrownInCleanup) {
+         if (toThrow == null) {
+           toThrow = e;
+         } else {
+           toThrow.addSuppressed(e);
+         }
+       }
+       throw toThrow;
+     }
    }
 +
 +  @Override
 +  public void cleanup() throws Exception {
 +    Collection<Exception> thrownInCleanup = new ArrayList<>();
 +    for (TransformEvaluatorFactory factory : factories.values()) {
 +      try {
 +        factory.cleanup();
 +      } catch (Exception e) {
 +        if (e instanceof InterruptedException) {
 +          Thread.currentThread().interrupt();
 +        }
 +        thrownInCleanup.add(e);
 +      }
 +    }
 +    finished.set(true);
 +    if (!thrownInCleanup.isEmpty()) {
 +      LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup);
 +      Exception toThrow = null;
 +      for (Exception e : thrownInCleanup) {
 +        if (toThrow == null) {
 +          toThrow = e;
 +        } else {
 +          toThrow.addSuppressed(e);
 +        }
 +      }
 +      throw toThrow;
 +    }
 +  }
 +
 +  /**
 +   * A factory to create Transform Evaluator Registries.
 +   */
 +  public static class Factory {
 +    public TransformEvaluatorRegistry create() {
 +      return TransformEvaluatorRegistry.defaultRegistry();
 +    }
 +  }
  }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --cc runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
index 58f41b6,58f41b6..0000000
deleted file mode 100644,100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
+++ /dev/null
@@@ -1,22 -1,22 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--/**
-- * Flink Beam runner exemple.
-- */
--package org.apache.beam.runners.flink.examples.streaming;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --cc runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 949c381,b211c04..b84a1a8
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@@ -65,5 -68,12 +68,13 @@@ public class DoFnInfo<InputT, OutputT> 
    public Coder<InputT> getInputCoder() {
      return inputCoder;
    }
+ 
+   public long getMainOutput() {
+     return mainOutput;
+   }
+ 
+   public Map<Long, TupleTag<?>> getOutputMap() {
+     return outputMap;
+   }
  }
 +

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --cc runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 454b760,4dfbee6..69c450e
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@@ -37,22 -42,26 +42,28 @@@ import org.apache.spark.api.java.functi
   * @param <OutputT> Output element type.
   */
  public class DoFnFunction<InputT, OutputT>
-     implements FlatMapFunction<Iterator<WindowedValue<InputT>>,
-     WindowedValue<OutputT>> {
+     implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
+   private final Accumulator<NamedAggregators> accum;
    private final OldDoFn<InputT, OutputT> mFunction;
 +  private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class);
 +
    private final SparkRuntimeContext mRuntimeContext;
-   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
+   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs;
+   private final WindowFn<Object, ?> windowFn;
  
    /**
-    * @param fn         DoFunction to be wrapped.
-    * @param runtime    Runtime to apply function in.
-    * @param sideInputs Side inputs used in DoFunction.
+    * @param accum             The Spark Accumulator that handles the Beam Aggregators.
+    * @param fn                DoFunction to be wrapped.
+    * @param runtime           Runtime to apply function in.
+    * @param sideInputs        Side inputs used in DoFunction.
+    * @param windowFn          Input {@link WindowFn}.
     */
-   public DoFnFunction(OldDoFn<InputT, OutputT> fn,
-                SparkRuntimeContext runtime,
-                Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+   public DoFnFunction(Accumulator<NamedAggregators> accum,
+                       OldDoFn<InputT, OutputT> fn,
+                       SparkRuntimeContext runtime,
+                       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+                       WindowFn<Object, ?> windowFn) {
+     this.accum = accum;
      this.mFunction = fn;
      this.mRuntimeContext = runtime;
      this.mSideInputs = sideInputs;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --cc sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index c0761b1,2dbcda7..78ea988
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@@ -32,21 -34,24 +33,36 @@@ import java.nio.channels.WritableByteCh
  import java.nio.charset.StandardCharsets;
  import java.util.NoSuchElementException;
  import java.util.regex.Pattern;
 -
  import javax.annotation.Nullable;
 +import org.apache.beam.sdk.coders.Coder;
 +import org.apache.beam.sdk.coders.Coder.Context;
 +import org.apache.beam.sdk.coders.StringUtf8Coder;
 +import org.apache.beam.sdk.coders.VoidCoder;
 +import org.apache.beam.sdk.io.Read.Bounded;
 +import org.apache.beam.sdk.options.PipelineOptions;
 +import org.apache.beam.sdk.transforms.PTransform;
 +import org.apache.beam.sdk.transforms.display.DisplayData;
 +import org.apache.beam.sdk.util.IOChannelUtils;
 +import org.apache.beam.sdk.util.MimeTypes;
 +import org.apache.beam.sdk.values.PBegin;
 +import org.apache.beam.sdk.values.PCollection;
 +import org.apache.beam.sdk.values.PDone;
  
+ import org.apache.beam.sdk.coders.Coder;
+ import org.apache.beam.sdk.coders.Coder.Context;
+ import org.apache.beam.sdk.coders.StringUtf8Coder;
+ import org.apache.beam.sdk.coders.VoidCoder;
+ import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+ import org.apache.beam.sdk.io.Read.Bounded;
+ import org.apache.beam.sdk.options.PipelineOptions;
+ import org.apache.beam.sdk.transforms.PTransform;
+ import org.apache.beam.sdk.transforms.display.DisplayData;
+ import org.apache.beam.sdk.util.IOChannelUtils;
+ import org.apache.beam.sdk.util.MimeTypes;
+ import org.apache.beam.sdk.values.PBegin;
+ import org.apache.beam.sdk.values.PCollection;
+ import org.apache.beam.sdk.values.PDone;
+ 
  /**
   * {@link PTransform}s for reading and writing text files.
   *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --cc sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 59c8323,018877f..f2fa87c
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@@ -339,21 -423,135 +423,149 @@@ public abstract class DoFn<InputT, Outp
  
    
/////////////////////////////////////////////////////////////////////////////
  
 +
 +  /**
 +   * Annotation for the method to use to prepare an instance for processing bundles of elements. The
 +   * method annotated with this must satisfy the following constraints
 +   * <ul>
 +   *   <li>It must have zero arguments.
 +   * </ul>
 +   */
 +  @Documented
 +  @Retention(RetentionPolicy.RUNTIME)
 +  @Target(ElementType.METHOD)
 +  public @interface Setup {
 +  }
 +
    /**
+    * Annotation for declaring and dereferencing state cells.
+    *
+    * <p><i>Not currently supported by any runner. When ready, the feature will work as described
+    * here.</i>
+    *
+    * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link
+    * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State}
+    * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and
+    * annotate it with {@link StateId}. See the following code for an example:
+    *
+    * <pre>{@code
+    * new DoFn<KV<Key, Foo>, Baz>() {
+    *   @StateId("my-state-id")
+    *   private final StateSpec<K, ValueState<MyState>> myStateSpec =
+    *       StateSpecs.value(new MyStateCoder());
+    *
+    *   @ProcessElement
+    *   public void processElement(
+    *       ProcessContext c,
+    *       @StateId("my-state-id") ValueState<MyState> myState) {
+    *     myState.read();
+    *     myState.write(...);
+    *   }
+    * }
+    * }</pre>
+    *
+    * <p>State is subject to the following validity conditions:
+    *
+    * <ul>
+    * <li>Each state ID must be declared at most once.
+    * <li>Any state referenced in a parameter must be declared with the same state type.
+    * <li>State declarations must be final.
+    * </ul>
+    */
+   @Documented
+   @Retention(RetentionPolicy.RUNTIME)
+   @Target({ElementType.FIELD, ElementType.PARAMETER})
+   @Experimental(Kind.STATE)
+   public @interface StateId {
+     /** The state ID. */
+     String value();
+   }
+ 
+   /**
+    * Annotation for declaring and dereferencing timers.
+    *
+    * <p><i>Not currently supported by any runner. When ready, the feature will work as described
+    * here.</i>
+    *
+    * <p>To declare a timer, create a field of type {@link TimerSpec} annotated with a {@link
+    * TimerId}. To use the cell during processing, add a parameter of the type {@link Timer} to your
+    * {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and annotate it with
+    * {@link TimerId}. See the following code for an example:
+    *
+    * <pre>{@code
+    * new DoFn<KV<Key, Foo>, Baz>() {
+    *   @TimerId("my-timer-id")
+    *   private final TimerSpec myTimer = TimerSpecs.timerForDomain(TimeDomain.EVENT_TIME);
+    *
+    *   @ProcessElement
+    *   public void processElement(
+    *       ProcessContext c,
+    *       @TimerId("my-timer-id") Timer myTimer) {
+    *     myTimer.setForNowPlus(Duration.standardSeconds(...));
+    *   }
+    *
+    *   @OnTimer("my-timer-id")
+    *   public void onMyTimer() {
+    *     ...
+    *   }
+    * }
+    * }</pre>
+    *
+    * <p>Timers are subject to the following validity conditions:
+    *
+    * <ul>
+    * <li>Each timer must have a distinct id.
+    * <li>Any timer referenced in a parameter must be declared.
+    * <li>Timer declarations must be final.
+    * <li>All declared timers must have a corresponding callback annotated with {@link
+    *     OnTimer @OnTimer}.
+    * </ul>
+    */
+   @Documented
+   @Retention(RetentionPolicy.RUNTIME)
+   @Target({ElementType.FIELD, ElementType.PARAMETER})
+   @Experimental(Kind.TIMERS)
+   public @interface TimerId {
+     /** The timer ID. */
+     String value();
+   }
+ 
+   /**
+    * Annotation for registering a callback for a timer.
+    *
+    * <p><i>Not currently supported by any runner. When ready, the feature will work as described
+    * here.</i>
+    *
+    * <p>See the javadoc for {@link TimerId} for use in a full example.
+    *
+    * <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic
+    * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses,
+    * and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and
+    * {@link TimerId} respectively.
+    */
+   @Documented
+   @Retention(RetentionPolicy.RUNTIME)
+   @Target(ElementType.METHOD)
+   @Experimental(Kind.TIMERS)
+   public @interface OnTimer {
+     /** The timer ID. */
+     String value();
+   }
+ 
+   /**
+    * Annotation for the method to use to prepare an instance for processing bundles of elements. The
+    * method annotated with this must satisfy the following constraints
+    * <ul>
+    *   <li>It must have zero arguments.
+    * </ul>
+    */
+   @Documented
+   @Retention(RetentionPolicy.RUNTIME)
+   @Target(ElementType.METHOD)
+   public @interface Setup {
+   }
+ 
+   /**
     * Annotation for the method to use to prepare an instance for processing a batch of elements.
     * The method annotated with this must satisfy the following constraints:
     * <ul>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9dc9be9e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------

Reply via email to