Revert "[BEAM-2610] This closes #3553"

This reverts commit ec494f675aa73fbdc7929f9592f33951941962b0, reversing changes made to d89d1ee1a3085269cdf44ec50e29a95c8f43757b.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b2b96a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b2b96a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b2b96a Branch: refs/heads/DSL_SQL Commit: c1b2b96a438b86a8b0023f6943dcf0a4f238ba39 Parents: 97a156c Author: mingmxu <[email protected]> Authored: Wed Jul 19 14:26:11 2017 -0700 Committer: mingmxu <[email protected]> Committed: Wed Jul 19 14:26:11 2017 -0700 ---------------------------------------------------------------------- .gitignore | 2 +- .../jenkins/common_job_properties.groovy | 9 +- .../job_beam_PerformanceTests_Python.groovy | 58 -- ..._beam_PostCommit_Java_JDKVersionsTest.groovy | 2 - ..._PostCommit_Java_MavenInstall_Windows.groovy | 3 +- .../job_beam_PreCommit_Website_Merge.groovy | 59 -- examples/java/pom.xml | 20 +- .../examples/common/WriteOneFilePerWindow.java | 52 +- .../beam/examples/WindowedWordCountIT.java | 4 +- examples/java8/pom.xml | 20 +- .../complete/game/utils/WriteToText.java | 43 +- .../examples/complete/game/LeaderBoardTest.java | 2 - examples/pom.xml | 2 +- pom.xml | 123 +--- runners/apex/pom.xml | 20 +- .../apache/beam/runners/apex/ApexRunner.java | 61 +- .../translation/ApexPipelineTranslator.java | 16 +- .../apex/translation/TranslationContext.java | 4 +- .../operators/ApexParDoOperator.java | 21 +- .../runners/apex/examples/WordCountTest.java | 8 +- .../utils/ApexStateInternalsTest.java | 411 ++++++++--- runners/core-construction-java/pom.xml | 2 +- .../CreatePCollectionViewTranslation.java | 4 +- .../construction/ElementAndRestriction.java | 42 ++ .../ElementAndRestrictionCoder.java | 88 +++ .../construction/PCollectionTranslation.java | 16 - .../core/construction/PTransformMatchers.java | 109 +-- .../construction/PTransformTranslation.java | 7 +- .../core/construction/ParDoTranslation.java | 82 +-- .../construction/RunnerPCollectionView.java | 31 +- 
.../core/construction/SplittableParDo.java | 124 +--- .../construction/TestStreamTranslation.java | 49 +- .../core/construction/TransformInputs.java | 50 -- .../WindowingStrategyTranslation.java | 27 +- .../construction/WriteFilesTranslation.java | 67 +- .../ElementAndRestrictionCoderTest.java | 126 ++++ .../PCollectionTranslationTest.java | 22 - .../construction/PTransformMatchersTest.java | 54 +- .../core/construction/ParDoTranslationTest.java | 28 +- .../core/construction/SplittableParDoTest.java | 18 +- .../core/construction/TransformInputsTest.java | 166 ----- .../WindowingStrategyTranslationTest.java | 3 - .../construction/WriteFilesTranslationTest.java | 62 +- runners/core-java/pom.xml | 2 +- .../core/LateDataDroppingDoFnRunner.java | 33 +- ...eBoundedSplittableProcessElementInvoker.java | 40 +- .../beam/runners/core/ProcessFnRunner.java | 16 +- .../beam/runners/core/ReduceFnRunner.java | 135 ++-- .../beam/runners/core/SimpleDoFnRunner.java | 20 - .../core/SplittableParDoViaKeyedWorkItems.java | 58 +- .../core/SplittableProcessElementInvoker.java | 25 +- .../beam/runners/core/SystemReduceFn.java | 6 - .../core/triggers/AfterAllStateMachine.java | 25 +- .../AfterDelayFromFirstElementStateMachine.java | 6 +- .../core/triggers/AfterFirstStateMachine.java | 20 +- .../core/triggers/AfterPaneStateMachine.java | 6 +- .../triggers/AfterWatermarkStateMachine.java | 7 +- .../triggers/ExecutableTriggerStateMachine.java | 23 +- .../core/triggers/NeverStateMachine.java | 5 +- .../core/triggers/TriggerStateMachine.java | 27 + .../core/InMemoryStateInternalsTest.java | 569 +++++++++++++-- ...ndedSplittableProcessElementInvokerTest.java | 47 +- .../beam/runners/core/ReduceFnRunnerTest.java | 374 +--------- .../beam/runners/core/ReduceFnTester.java | 48 +- .../core/SplittableParDoProcessFnTest.java | 117 +-- .../beam/runners/core/StateInternalsTest.java | 613 ---------------- .../beam/runners/core/WindowMatchers.java | 15 - .../triggers/AfterFirstStateMachineTest.java | 5 +- 
.../AfterWatermarkStateMachineTest.java | 7 +- .../core/triggers/StubTriggerStateMachine.java | 7 +- runners/direct-java/pom.xml | 7 +- .../beam/runners/direct/CommittedResult.java | 12 +- .../apache/beam/runners/direct/DirectGraph.java | 38 +- .../beam/runners/direct/DirectGraphVisitor.java | 48 +- .../beam/runners/direct/DirectGroupByKey.java | 13 +- .../direct/DirectGroupByKeyOverrideFactory.java | 14 +- .../beam/runners/direct/DirectRegistrar.java | 2 +- .../beam/runners/direct/DirectRunner.java | 64 +- .../beam/runners/direct/DirectTestOptions.java | 42 -- .../beam/runners/direct/EvaluationContext.java | 26 +- .../direct/ExecutorServiceParallelExecutor.java | 27 +- .../runners/direct/ParDoEvaluatorFactory.java | 9 +- .../direct/ParDoMultiOverrideFactory.java | 121 +--- ...littableProcessElementsEvaluatorFactory.java | 37 +- .../direct/StatefulParDoEvaluatorFactory.java | 12 +- .../direct/TestStreamEvaluatorFactory.java | 20 +- .../runners/direct/ViewEvaluatorFactory.java | 8 +- .../runners/direct/ViewOverrideFactory.java | 69 +- .../beam/runners/direct/WatermarkManager.java | 18 +- .../direct/WriteWithShardingFactory.java | 34 +- .../runners/direct/CommittedResultTest.java | 17 +- .../runners/direct/DirectGraphVisitorTest.java | 10 +- .../beam/runners/direct/DirectGraphs.java | 7 - .../runners/direct/DirectRegistrarTest.java | 2 +- .../runners/direct/EvaluationContextTest.java | 7 +- .../ImmutabilityEnforcementFactoryTest.java | 4 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 7 +- .../StatefulParDoEvaluatorFactoryTest.java | 65 +- .../runners/direct/TransformExecutorTest.java | 12 +- .../direct/ViewEvaluatorFactoryTest.java | 8 +- .../runners/direct/ViewOverrideFactoryTest.java | 37 +- .../direct/WatermarkCallbackExecutorTest.java | 1 - .../runners/direct/WatermarkManagerTest.java | 16 +- .../direct/WriteWithShardingFactoryTest.java | 44 +- runners/flink/pom.xml | 11 +- .../runners/flink/CreateStreamingFlinkView.java | 154 ---- 
.../flink/FlinkBatchTranslationContext.java | 3 +- .../FlinkPipelineExecutionEnvironment.java | 2 - .../flink/FlinkStreamingPipelineTranslator.java | 86 ++- .../FlinkStreamingTransformTranslators.java | 36 +- .../flink/FlinkStreamingTranslationContext.java | 3 +- .../flink/FlinkStreamingViewOverrides.java | 372 ++++++++++ .../runners/flink/FlinkTransformOverrides.java | 53 -- .../streaming/SplittableDoFnOperator.java | 16 +- .../streaming/state/FlinkStateInternals.java | 425 ++++++----- .../FlinkBroadcastStateInternalsTest.java | 242 +++++-- .../FlinkKeyGroupStateInternalsTest.java | 359 +++++----- .../streaming/FlinkSplitStateInternalsTest.java | 132 ++-- .../streaming/FlinkStateInternalsTest.java | 343 ++++++++- runners/google-cloud-dataflow-java/pom.xml | 10 +- .../dataflow/BatchStatefulParDoOverrides.java | 4 - .../runners/dataflow/BatchViewOverrides.java | 182 +++-- .../runners/dataflow/CreateDataflowView.java | 8 +- .../dataflow/DataflowPipelineTranslator.java | 62 +- .../beam/runners/dataflow/DataflowRunner.java | 133 +--- .../dataflow/SplittableParDoOverrides.java | 76 -- .../dataflow/StreamingViewOverrides.java | 10 +- .../runners/dataflow/TransformTranslator.java | 4 +- .../runners/dataflow/util/PropertyNames.java | 1 - .../beam/runners/dataflow/util/TimeUtil.java | 24 +- .../DataflowPipelineTranslatorTest.java | 95 +-- .../runners/dataflow/DataflowRunnerTest.java | 198 +----- .../runners/dataflow/util/TimeUtilTest.java | 6 - runners/pom.xml | 2 +- runners/spark/pom.xml | 70 +- .../spark/SparkNativePipelineVisitor.java | 3 +- .../apache/beam/runners/spark/SparkRunner.java | 9 +- .../beam/runners/spark/TestSparkRunner.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 6 +- .../spark/stateful/SparkTimerInternals.java | 18 +- .../spark/translation/EvaluationContext.java | 4 +- .../spark/translation/TransformTranslator.java | 50 +- .../spark/util/GlobalWatermarkHolder.java | 127 +--- .../spark/GlobalWatermarkHolderTest.java | 18 +- 
.../runners/spark/SparkRunnerDebuggerTest.java | 26 +- .../spark/stateful/SparkStateInternalsTest.java | 66 -- .../spark/translation/StorageLevelTest.java | 4 +- sdks/common/fn-api/pom.xml | 2 +- .../fn-api/src/main/proto/beam_fn_api.proto | 237 ++++-- sdks/common/pom.xml | 2 +- sdks/common/runner-api/pom.xml | 2 +- .../src/main/proto/beam_runner_api.proto | 26 +- sdks/java/build-tools/pom.xml | 2 +- .../src/main/resources/beam/findbugs-filter.xml | 9 - sdks/java/core/pom.xml | 2 +- .../apache/beam/sdk/coders/ShardedKeyCoder.java | 66 -- .../java/org/apache/beam/sdk/io/AvroIO.java | 220 +++--- .../java/org/apache/beam/sdk/io/AvroSink.java | 32 +- .../apache/beam/sdk/io/CompressedSource.java | 40 +- .../beam/sdk/io/DefaultFilenamePolicy.java | 274 ++----- .../beam/sdk/io/DynamicFileDestinations.java | 115 --- .../org/apache/beam/sdk/io/FileBasedSink.java | 513 ++++++------- .../apache/beam/sdk/io/OffsetBasedSource.java | 22 +- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 44 +- .../java/org/apache/beam/sdk/io/TextIO.java | 712 ++++--------------- .../java/org/apache/beam/sdk/io/TextSink.java | 22 +- .../java/org/apache/beam/sdk/io/WriteFiles.java | 647 ++++++----------- .../beam/sdk/io/range/ByteKeyRangeTracker.java | 22 +- .../apache/beam/sdk/io/range/OffsetRange.java | 101 --- .../beam/sdk/io/range/OffsetRangeTracker.java | 3 - .../sdk/options/PipelineOptionsFactory.java | 18 +- .../sdk/options/PipelineOptionsValidator.java | 34 +- .../sdk/options/ProxyInvocationHandler.java | 19 +- .../beam/sdk/runners/TransformHierarchy.java | 165 +---- .../apache/beam/sdk/testing/StaticWindows.java | 5 - .../org/apache/beam/sdk/testing/TestStream.java | 12 - .../org/apache/beam/sdk/transforms/Combine.java | 30 +- .../org/apache/beam/sdk/transforms/DoFn.java | 52 +- .../apache/beam/sdk/transforms/DoFnTester.java | 21 +- .../org/apache/beam/sdk/transforms/ParDo.java | 41 +- .../sdk/transforms/SerializableFunctions.java | 50 -- .../org/apache/beam/sdk/transforms/View.java | 38 
+- .../reflect/ByteBuddyDoFnInvokerFactory.java | 27 - .../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 +- .../sdk/transforms/reflect/DoFnInvoker.java | 17 +- .../sdk/transforms/reflect/DoFnSignature.java | 33 +- .../sdk/transforms/reflect/DoFnSignatures.java | 44 +- .../reflect/OnTimerMethodSpecifier.java | 37 - .../transforms/splittabledofn/OffsetRange.java | 77 ++ .../splittabledofn/OffsetRangeTracker.java | 11 - .../splittabledofn/RestrictionTracker.java | 11 +- .../sdk/transforms/windowing/GlobalWindows.java | 5 - .../windowing/PartitioningWindowFn.java | 5 - .../transforms/windowing/SlidingWindows.java | 5 - .../beam/sdk/transforms/windowing/Window.java | 32 - .../beam/sdk/transforms/windowing/WindowFn.java | 11 - .../apache/beam/sdk/util/IdentityWindowFn.java | 5 - .../org/apache/beam/sdk/values/PCollection.java | 12 - .../beam/sdk/values/PCollectionViews.java | 38 - .../org/apache/beam/sdk/values/PValueBase.java | 12 + .../org/apache/beam/sdk/values/ShardedKey.java | 65 -- .../beam/sdk/values/WindowingStrategy.java | 46 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 85 +-- .../beam/sdk/io/DefaultFilenamePolicyTest.java | 135 ++-- .../sdk/io/DrunkWritableByteChannelFactory.java | 2 +- .../apache/beam/sdk/io/FileBasedSinkTest.java | 93 +-- .../java/org/apache/beam/sdk/io/SimpleSink.java | 56 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 326 +-------- .../org/apache/beam/sdk/io/WriteFilesTest.java | 366 ++-------- .../options/PipelineOptionsValidatorTest.java | 44 -- .../sdk/options/ProxyInvocationHandlerTest.java | 19 - .../sdk/runners/TransformHierarchyTest.java | 197 ----- .../sdk/testing/PCollectionViewTesting.java | 8 - .../apache/beam/sdk/transforms/CombineTest.java | 365 ++++------ .../beam/sdk/transforms/DoFnTesterTest.java | 32 - .../beam/sdk/transforms/GroupByKeyTest.java | 39 - .../apache/beam/sdk/transforms/ParDoTest.java | 165 ----- .../beam/sdk/transforms/SplittableDoFnTest.java | 155 +--- 
.../transforms/reflect/DoFnInvokersTest.java | 93 +-- .../DoFnSignaturesProcessElementTest.java | 2 +- .../DoFnSignaturesSplittableDoFnTest.java | 83 +-- .../transforms/reflect/DoFnSignaturesTest.java | 14 - .../splittabledofn/OffsetRangeTrackerTest.java | 1 - .../windowing/SlidingWindowsTest.java | 30 +- .../google-cloud-platform-core/pom.xml | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 2 +- .../sdk/util/RetryHttpRequestInitializer.java | 147 ++-- .../extensions/gcp/GcpCoreApiSurfaceTest.java | 48 +- .../util/RetryHttpRequestInitializerTest.java | 31 +- sdks/java/extensions/jackson/pom.xml | 2 +- sdks/java/extensions/join-library/pom.xml | 2 +- sdks/java/extensions/pom.xml | 2 +- sdks/java/extensions/protobuf/pom.xml | 2 +- sdks/java/extensions/sorter/pom.xml | 8 +- sdks/java/harness/pom.xml | 18 +- .../harness/control/ProcessBundleHandler.java | 295 ++++++-- .../fn/harness/control/RegisterHandler.java | 2 +- .../beam/runners/core/BeamFnDataReadRunner.java | 70 +- .../runners/core/BeamFnDataWriteRunner.java | 67 +- .../beam/runners/core/BoundedSourceRunner.java | 74 +- .../beam/runners/core/FnApiDoFnRunner.java | 547 -------------- .../runners/core/PTransformRunnerFactory.java | 81 --- .../control/ProcessBundleHandlerTest.java | 521 ++++++++++++-- .../fn/harness/control/RegisterHandlerTest.java | 8 +- .../runners/core/BeamFnDataReadRunnerTest.java | 112 +-- .../runners/core/BeamFnDataWriteRunnerTest.java | 120 +--- .../runners/core/BoundedSourceRunnerTest.java | 124 +--- .../beam/runners/core/FnApiDoFnRunnerTest.java | 210 ------ sdks/java/io/amqp/pom.xml | 100 --- .../org/apache/beam/sdk/io/amqp/AmqpIO.java | 399 ----------- .../beam/sdk/io/amqp/AmqpMessageCoder.java | 79 -- .../amqp/AmqpMessageCoderProviderRegistrar.java | 44 -- .../apache/beam/sdk/io/amqp/package-info.java | 22 - .../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 148 ---- .../beam/sdk/io/amqp/AmqpMessageCoderTest.java | 89 --- sdks/java/io/cassandra/pom.xml | 2 +- 
.../beam/sdk/io/cassandra/CassandraIO.java | 2 +- sdks/java/io/common/pom.xml | 2 +- .../sdk/io/common/IOTestPipelineOptions.java | 6 +- sdks/java/io/elasticsearch/pom.xml | 10 +- .../sdk/io/elasticsearch/ElasticsearchIO.java | 17 +- .../elasticsearch/ElasticSearchIOTestUtils.java | 81 +-- .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 14 +- .../io/elasticsearch/ElasticsearchIOTest.java | 36 +- .../elasticsearch/ElasticsearchTestDataSet.java | 37 +- sdks/java/io/google-cloud-platform/pom.xml | 14 +- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 - .../io/gcp/bigquery/DynamicDestinations.java | 29 +- .../io/gcp/bigquery/GenerateShardedTable.java | 1 - .../beam/sdk/io/gcp/bigquery/ShardedKey.java | 67 ++ .../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 74 ++ .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 - .../io/gcp/bigquery/StreamingWriteTables.java | 2 - .../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 1 - .../io/gcp/bigquery/WriteBundlesToFiles.java | 2 - .../bigquery/WriteGroupedRecordsToFiles.java | 1 - .../sdk/io/gcp/bigquery/WritePartition.java | 1 - .../beam/sdk/io/gcp/bigquery/WriteTables.java | 1 - .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 8 +- .../io/gcp/bigtable/BigtableServiceImpl.java | 9 +- .../sdk/io/gcp/datastore/AdaptiveThrottler.java | 103 --- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 149 +--- .../sdk/io/gcp/datastore/MovingAverage.java | 50 -- .../sdk/io/gcp/spanner/AbstractSpannerFn.java | 58 -- .../sdk/io/gcp/spanner/CreateTransactionFn.java | 51 -- .../beam/sdk/io/gcp/spanner/MutationGroup.java | 67 -- .../io/gcp/spanner/MutationSizeEstimator.java | 9 - .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 65 -- .../beam/sdk/io/gcp/spanner/SpannerConfig.java | 137 ---- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 616 +++++----------- .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 125 ---- .../beam/sdk/io/gcp/spanner/Transaction.java | 33 - .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 10 - 
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 - .../sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +- .../io/gcp/bigtable/BigtableTestOptions.java | 5 + .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +- .../io/gcp/datastore/AdaptiveThrottlerTest.java | 111 --- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 92 +-- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 2 +- .../sdk/io/gcp/spanner/FakeServiceFactory.java | 82 --- .../gcp/spanner/MutationSizeEstimatorTest.java | 12 - .../beam/sdk/io/gcp/spanner/RandomUtils.java | 41 -- .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 281 -------- .../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 244 +++++++ .../sdk/io/gcp/spanner/SpannerIOWriteTest.java | 258 ------- .../beam/sdk/io/gcp/spanner/SpannerReadIT.java | 166 ----- .../beam/sdk/io/gcp/spanner/SpannerWriteIT.java | 26 +- sdks/java/io/hadoop-common/pom.xml | 2 +- sdks/java/io/hadoop-file-system/pom.xml | 33 +- sdks/java/io/hadoop/input-format/pom.xml | 2 +- .../hadoop/inputformat/HadoopInputFormatIO.java | 2 +- sdks/java/io/hadoop/jdk1.8-tests/pom.xml | 4 +- .../inputformat/HIFIOWithElasticTest.java | 11 +- sdks/java/io/hadoop/pom.xml | 2 +- sdks/java/io/hbase/pom.xml | 26 +- .../io/hbase/HBaseCoderProviderRegistrar.java | 40 -- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 48 +- .../beam/sdk/io/hbase/HBaseMutationCoder.java | 42 -- .../hbase/HBaseCoderProviderRegistrarTest.java | 45 -- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 49 +- sdks/java/io/hcatalog/pom.xml | 175 ----- .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 492 ------------- .../beam/sdk/io/hcatalog/package-info.java | 22 - .../io/hcatalog/EmbeddedMetastoreService.java | 87 --- .../beam/sdk/io/hcatalog/HCatalogIOTest.java | 277 -------- .../sdk/io/hcatalog/HCatalogIOTestUtils.java | 108 --- .../hcatalog/src/test/resources/hive-site.xml | 301 -------- sdks/java/io/jdbc/pom.xml | 4 +- .../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 2 +- sdks/java/io/jms/pom.xml | 2 +- 
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +- sdks/java/io/kafka/pom.xml | 2 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 132 ++-- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 30 - sdks/java/io/kinesis/pom.xml | 2 +- .../sdk/io/kinesis/CheckpointGenerator.java | 6 +- .../beam/sdk/io/kinesis/CustomOptional.java | 111 ++- .../io/kinesis/DynamicCheckpointGenerator.java | 52 +- .../sdk/io/kinesis/GetKinesisRecordsResult.java | 49 +- .../sdk/io/kinesis/KinesisClientProvider.java | 4 +- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 281 ++++---- .../beam/sdk/io/kinesis/KinesisReader.java | 206 +++--- .../sdk/io/kinesis/KinesisReaderCheckpoint.java | 97 ++- .../beam/sdk/io/kinesis/KinesisRecord.java | 177 +++-- .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 68 +- .../beam/sdk/io/kinesis/KinesisSource.java | 147 ++-- .../beam/sdk/io/kinesis/RecordFilter.java | 18 +- .../apache/beam/sdk/io/kinesis/RoundRobin.java | 37 +- .../beam/sdk/io/kinesis/ShardCheckpoint.java | 241 ++++--- .../sdk/io/kinesis/ShardRecordsIterator.java | 106 ++- .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 +++--- .../beam/sdk/io/kinesis/StartingPoint.java | 84 ++- .../io/kinesis/StaticCheckpointGenerator.java | 27 +- .../io/kinesis/TransientKinesisException.java | 7 +- .../beam/sdk/io/kinesis/AmazonKinesisMock.java | 539 +++++++------- .../beam/sdk/io/kinesis/CustomOptionalTest.java | 27 +- .../kinesis/DynamicCheckpointGeneratorTest.java | 33 +- .../sdk/io/kinesis/KinesisMockReadTest.java | 97 ++- .../io/kinesis/KinesisReaderCheckpointTest.java | 52 +- .../beam/sdk/io/kinesis/KinesisReaderIT.java | 127 ++-- .../beam/sdk/io/kinesis/KinesisReaderTest.java | 166 +++-- .../sdk/io/kinesis/KinesisRecordCoderTest.java | 34 +- .../beam/sdk/io/kinesis/KinesisTestOptions.java | 43 +- .../beam/sdk/io/kinesis/KinesisUploader.java | 70 +- .../beam/sdk/io/kinesis/RecordFilterTest.java | 52 +- .../beam/sdk/io/kinesis/RoundRobinTest.java | 42 +- .../sdk/io/kinesis/ShardCheckpointTest.java 
| 203 +++--- .../io/kinesis/ShardRecordsIteratorTest.java | 216 +++--- .../io/kinesis/SimplifiedKinesisClientTest.java | 351 +++++---- sdks/java/io/mongodb/pom.xml | 2 +- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 2 +- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 317 ++------- .../beam/sdk/io/mongodb/MongoDbIOTest.java | 37 - sdks/java/io/mqtt/pom.xml | 2 +- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +- sdks/java/io/pom.xml | 35 +- sdks/java/io/xml/pom.xml | 2 +- .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +- .../org/apache/beam/sdk/io/xml/XmlSink.java | 21 +- .../org/apache/beam/sdk/io/xml/XmlSinkTest.java | 4 +- sdks/java/java8tests/pom.xml | 2 +- sdks/java/javadoc/pom.xml | 19 +- .../maven-archetypes/examples-java8/pom.xml | 2 +- .../main/resources/archetype-resources/pom.xml | 1 + sdks/java/maven-archetypes/examples/pom.xml | 2 +- .../main/resources/archetype-resources/pom.xml | 1 + sdks/java/maven-archetypes/pom.xml | 2 +- sdks/java/maven-archetypes/starter/pom.xml | 2 +- .../resources/projects/basic/reference/pom.xml | 2 +- sdks/java/pom.xml | 2 +- sdks/pom.xml | 2 +- sdks/python/apache_beam/coders/coder_impl.py | 4 - sdks/python/apache_beam/coders/coders.py | 7 +- .../apache_beam/coders/coders_test_common.py | 8 - .../examples/snippets/snippets_test.py | 16 - .../apache_beam/examples/streaming_wordcount.py | 25 +- .../apache_beam/examples/windowed_wordcount.py | 93 --- sdks/python/apache_beam/io/filesystem.py | 22 +- sdks/python/apache_beam/io/gcp/gcsio.py | 10 +- sdks/python/apache_beam/io/gcp/pubsub.py | 180 ++--- sdks/python/apache_beam/io/gcp/pubsub_test.py | 101 +-- .../io/gcp/tests/bigquery_matcher.py | 6 +- .../io/gcp/tests/bigquery_matcher_test.py | 2 +- sdks/python/apache_beam/io/range_trackers.py | 130 ++++ .../apache_beam/io/range_trackers_test.py | 186 +++++ .../apache_beam/options/pipeline_options.py | 35 +- .../options/pipeline_options_test.py | 39 +- .../apache_beam/options/value_provider_test.py | 93 ++- 
sdks/python/apache_beam/pipeline.py | 230 +----- sdks/python/apache_beam/pipeline_test.py | 53 -- sdks/python/apache_beam/portability/__init__.py | 18 - .../apache_beam/portability/api/__init__.py | 21 - sdks/python/apache_beam/pvalue.py | 2 +- sdks/python/apache_beam/runners/api/__init__.py | 21 + .../runners/dataflow/dataflow_runner.py | 91 +-- .../runners/dataflow/dataflow_runner_test.py | 24 +- .../runners/dataflow/internal/apiclient.py | 35 +- .../runners/dataflow/internal/apiclient_test.py | 29 +- .../runners/dataflow/internal/dependency.py | 69 +- .../runners/dataflow/native_io/iobase_test.py | 39 +- .../dataflow/native_io/streaming_create.py | 72 -- .../runners/dataflow/ptransform_overrides.py | 52 -- .../runners/direct/bundle_factory.py | 2 +- .../apache_beam/runners/direct/direct_runner.py | 108 --- .../runners/direct/evaluation_context.py | 73 +- .../apache_beam/runners/direct/executor.py | 135 ++-- .../runners/direct/transform_evaluator.py | 447 +----------- .../runners/direct/transform_result.py | 41 ++ sdks/python/apache_beam/runners/direct/util.py | 67 -- .../runners/direct/watermark_manager.py | 100 +-- .../apache_beam/runners/pipeline_context.py | 19 +- .../runners/portability/fn_api_runner.py | 306 ++++---- .../runners/portability/fn_api_runner_test.py | 31 +- .../runners/worker/bundle_processor.py | 426 ----------- .../apache_beam/runners/worker/data_plane.py | 28 +- .../runners/worker/data_plane_test.py | 2 +- .../apache_beam/runners/worker/log_handler.py | 2 +- .../runners/worker/log_handler_test.py | 2 +- .../runners/worker/operation_specs.py | 9 +- .../apache_beam/runners/worker/operations.py | 1 - .../apache_beam/runners/worker/sdk_worker.py | 370 +++++++++- .../runners/worker/sdk_worker_main.py | 2 +- .../runners/worker/sdk_worker_test.py | 95 ++- sdks/python/apache_beam/testing/test_stream.py | 5 - .../apache_beam/testing/test_stream_test.py | 68 -- sdks/python/apache_beam/transforms/combiners.py | 8 - 
.../apache_beam/transforms/combiners_test.py | 7 +- sdks/python/apache_beam/transforms/core.py | 102 ++- .../python/apache_beam/transforms/ptransform.py | 43 +- sdks/python/apache_beam/transforms/trigger.py | 28 +- sdks/python/apache_beam/transforms/window.py | 4 +- .../apache_beam/typehints/trivial_inference.py | 3 +- .../typehints/trivial_inference_test.py | 7 - sdks/python/apache_beam/utils/plugin.py | 42 -- sdks/python/apache_beam/utils/timestamp.py | 5 - sdks/python/apache_beam/utils/urns.py | 2 +- sdks/python/apache_beam/version.py | 2 +- sdks/python/gen_protos.py | 2 +- sdks/python/pom.xml | 2 +- sdks/python/run_pylint.sh | 2 +- sdks/python/setup.py | 5 +- 462 files changed, 10754 insertions(+), 21718 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 36c5cc8..bd419a7 100644 --- a/.gitignore +++ b/.gitignore @@ -25,7 +25,7 @@ sdks/python/**/*.egg sdks/python/LICENSE sdks/python/NOTICE sdks/python/README.md -sdks/python/apache_beam/portability/api/*pb2*.* +sdks/python/apache_beam/runners/api/*pb2*.* # Ignore IntelliJ files. .idea/ http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/common_job_properties.groovy ---------------------------------------------------------------------- diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy index 70534c6..6d4d68b 100644 --- a/.test-infra/jenkins/common_job_properties.groovy +++ b/.test-infra/jenkins/common_job_properties.groovy @@ -23,12 +23,11 @@ class common_job_properties { // Sets common top-level job properties for website repository jobs. 
- static void setTopLevelWebsiteJobProperties(context, - String branch = 'asf-site') { + static void setTopLevelWebsiteJobProperties(context) { setTopLevelJobProperties( context, 'beam-site', - branch, + 'asf-site', 'beam', 30) } @@ -265,10 +264,8 @@ class common_job_properties { shell('rm -rf PerfKitBenchmarker') // Clone appropriate perfkit branch shell('git clone https://github.com/GoogleCloudPlatform/PerfKitBenchmarker.git') - // Install Perfkit benchmark requirements. + // Install job requirements. shell('pip install --user -r PerfKitBenchmarker/requirements.txt') - // Install job requirements for Python SDK. - shell('pip install --user -e sdks/python/[gcp,test]') // Launch performance test. shell("python PerfKitBenchmarker/pkb.py $pkbArgs") } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy ---------------------------------------------------------------------- diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy deleted file mode 100644 index 6a71bda..0000000 --- a/.test-infra/jenkins/job_beam_PerformanceTests_Python.groovy +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import common_job_properties - -// This job runs the Beam Python performance tests on PerfKit Benchmarker. -job('beam_PerformanceTests_Python'){ - // Set default Beam job properties. - common_job_properties.setTopLevelMainJobProperties(delegate) - - // Run job in postcommit every 6 hours, don't trigger every push. - common_job_properties.setPostCommit( - delegate, - '0 */6 * * *', - false, - '[email protected]') - - // Allows triggering this build against pull requests. - common_job_properties.enablePhraseTriggeringFromPullRequest( - delegate, - 'Python SDK Performance Test', - 'Run Python Performance Test') - - def pipelineArgs = [ - project: 'apache-beam-testing', - staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it', - temp_location: 'gs://temp-storage-for-end-to-end-tests/temp-it', - output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output' - ] - def pipelineArgList = [] - pipelineArgs.each({ - key, value -> pipelineArgList.add("--$key=$value") - }) - def pipelineArgsJoined = pipelineArgList.join(',') - - def argMap = [ - beam_sdk : 'python', - benchmarks: 'beam_integration_benchmark', - beam_it_args: pipelineArgsJoined - ] - - common_job_properties.buildPerformanceTest(delegate, argMap) -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy ---------------------------------------------------------------------- diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy index df0a2c7..f23e741 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_JDKVersionsTest.groovy @@ -37,8 +37,6 @@ matrixJob('beam_PostCommit_Java_JDK_Versions_Test') { common_job_properties.setPostCommit( 
delegate, '0 */6 * * *', - false, - '', // TODO: Remove last two args once test is stable again. false) // Allows triggering this build against pull requests. http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy ---------------------------------------------------------------------- diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy index 6ef272c..f781b4e 100644 --- a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall_Windows.groovy @@ -32,8 +32,7 @@ mavenJob('beam_PostCommit_Java_MavenInstall_Windows') { common_job_properties.setMavenConfig(delegate, 'Maven 3.3.3 (Windows)') // Sets that this is a PostCommit job. - // TODO(BEAM-1042, BEAM-1045, BEAM-2269, BEAM-2299) Turn notifications back on once fixed. - common_job_properties.setPostCommit(delegate, '0 */6 * * *', false, '', false) + common_job_properties.setPostCommit(delegate, '0 */6 * * *', false) // Allows triggering this build against pull requests. common_job_properties.enablePhraseTriggeringFromPullRequest( http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy ---------------------------------------------------------------------- diff --git a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy b/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy deleted file mode 100644 index 0e2ae3f..0000000 --- a/.test-infra/jenkins/job_beam_PreCommit_Website_Merge.groovy +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import common_job_properties - -// Defines a job. -job('beam_PreCommit_Website_Merge') { - description('Runs website tests for mergebot.') - - // Set common parameters. - common_job_properties.setTopLevelWebsiteJobProperties(delegate, 'mergebot') - - triggers { - githubPush() - } - - steps { - // Run the following shell script as a build step. - shell ''' - # Install RVM per instructions at https://rvm.io/rvm/install. - RVM_GPG_KEY=409B6B1796C275462A1703113804BB82D39DC0E3 - gpg --keyserver hkp://keys.gnupg.net --recv-keys $RVM_GPG_KEY - - \\curl -sSL https://get.rvm.io | bash - source /home/jenkins/.rvm/scripts/rvm - - # Install Ruby. - RUBY_VERSION_NUM=2.3.0 - rvm install ruby $RUBY_VERSION_NUM --autolibs=read-only - - # Install Bundler gem - PATH=~/.gem/ruby/$RUBY_VERSION_NUM/bin:$PATH - GEM_PATH=~/.gem/ruby/$RUBY_VERSION_NUM/:$GEM_PATH - gem install bundler --user-install - - # Install all needed gems. - bundle install --path ~/.gem/ - - # Build the new site and test it. 
- rm -fr ./content/ - bundle exec rake test - '''.stripIndent().trim() - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index ae64a79..701e4fe 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-examples-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -34,6 +34,10 @@ <packaging>jar</packaging> + <properties> + <spark.version>1.6.2</spark.version> + </properties> + <profiles> <!-- @@ -62,12 +66,6 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-runners-apex</artifactId> <scope>runtime</scope> - <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> </dependency> <!-- Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from @@ -97,12 +95,6 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-runners-flink_2.10</artifactId> <scope>runtime</scope> - <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> @@ -124,11 +116,13 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java ---------------------------------------------------------------------- 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index 49865ba..5e6df9c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -17,12 +17,11 @@ */ package org.apache.beam.examples.common; -import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Verify.verifyNotNull; import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; @@ -54,12 +53,22 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone @Override public PDone expand(PCollection<String> input) { + // filenamePrefix may contain a directory and a filename component. Pull out only the filename + // component from that path for the PerWindowFiles. 
+ String prefix = ""; ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix); - TextIO.Write write = - TextIO.write() - .to(new PerWindowFiles(resource)) - .withTempDirectory(resource.getCurrentDirectory()) - .withWindowedWrites(); + if (!resource.isDirectory()) { + prefix = verifyNotNull( + resource.getFilename(), + "A non-directory resource should have a non-null filename: %s", + resource); + } + + + TextIO.Write write = TextIO.write() + .to(resource.getCurrentDirectory()) + .withFilenamePolicy(new PerWindowFiles(prefix)) + .withWindowedWrites(); if (numShards != null) { write = write.withNumShards(numShards); } @@ -74,36 +83,31 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone */ public static class PerWindowFiles extends FilenamePolicy { - private final ResourceId baseFilename; + private final String prefix; - public PerWindowFiles(ResourceId baseFilename) { - this.baseFilename = baseFilename; + public PerWindowFiles(String prefix) { + this.prefix = prefix; } public String filenamePrefixForWindow(IntervalWindow window) { - String prefix = - baseFilename.isDirectory() ? 
"" : firstNonNull(baseFilename.getFilename(), ""); return String.format("%s-%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext context, String extension) { IntervalWindow window = (IntervalWindow) context.getWindow(); - String filename = - String.format( - "%s-%s-of-%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), - outputFileHints.getSuggestedFilenameSuffix()); - return baseFilename - .getCurrentDirectory() - .resolve(filename, StandardResolveOptions.RESOLVE_FILE); + String filename = String.format( + "%s-%s-of-%s%s", + filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), + extension); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + ResourceId outputDirectory, Context context, String extension) { throw new UnsupportedOperationException("Unsupported."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index bec7952..eb7e4c4 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -32,7 +32,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.examples.common.ExampleUtils; import 
org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -150,8 +149,7 @@ public class WindowedWordCountIT { String outputPrefix = options.getOutput(); - PerWindowFiles filenamePolicy = - new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix)); + PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix); List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 6fd29a4..56295a4 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-examples-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -35,6 +35,10 @@ <packaging>jar</packaging> + <properties> + <spark.version>1.6.2</spark.version> + </properties> + <profiles> <!-- The direct runner is available by default. 
@@ -62,12 +66,6 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-runners-apex</artifactId> <scope>runtime</scope> - <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> </dependency> <!-- Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from @@ -97,12 +95,6 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-runners-flink_2.10</artifactId> <scope>runtime</scope> - <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> @@ -124,11 +116,13 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java index 1d60198..e6c8ddb 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java @@ -18,6 +18,7 @@ package org.apache.beam.examples.complete.game.utils; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verifyNotNull; import java.io.Serializable; import java.util.ArrayList; @@ -27,7 +28,6 @@ import java.util.TimeZone; import 
java.util.stream.Collectors; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; @@ -111,12 +111,21 @@ public class WriteToText<InputT> checkArgument( input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder()); + // filenamePrefix may contain a directory and a filename component. Pull out only the filename + // component from that path for the PerWindowFiles. + String prefix = ""; ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix); + if (!resource.isDirectory()) { + prefix = verifyNotNull( + resource.getFilename(), + "A non-directory resource should have a non-null filename: %s", + resource); + } return input.apply( TextIO.write() - .to(new PerWindowFiles(resource)) - .withTempDirectory(resource.getCurrentDirectory()) + .to(resource.getCurrentDirectory()) + .withFilenamePolicy(new PerWindowFiles(prefix)) .withWindowedWrites() .withNumShards(3)); } @@ -130,33 +139,31 @@ public class WriteToText<InputT> */ protected static class PerWindowFiles extends FilenamePolicy { - private final ResourceId prefix; + private final String prefix; - public PerWindowFiles(ResourceId prefix) { + public PerWindowFiles(String prefix) { this.prefix = prefix; } public String filenamePrefixForWindow(IntervalWindow window) { - String filePrefix = prefix.isDirectory() ? 
"" : prefix.getFilename(); - return String.format( - "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end())); + return String.format("%s-%s-%s", + prefix, formatter.print(window.start()), formatter.print(window.end())); } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext context, String extension) { IntervalWindow window = (IntervalWindow) context.getWindow(); - String filename = - String.format( - "%s-%s-of-%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), - outputFileHints.getSuggestedFilenameSuffix()); - return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE); + String filename = String.format( + "%s-%s-of-%s%s", + filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), + extension); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + ResourceId outputDirectory, Context context, String extension) { throw new UnsupportedOperationException("Unsupported."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java index 611e2b3..745c210 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java @@ -276,8 +276,6 @@ public class 
LeaderBoardTest implements Serializable { .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)), event(TestUser.BLUE_TWO, 3, Duration.ZERO), event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3))) - // Move the watermark to the end of the window to output on time - .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION)) // Move the watermark past the end of the allowed lateness plus the end of the window .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS) .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1))) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 51f4c35..a7e61dd 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 987760f..1d8d4b0 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ <url>http://beam.apache.org/</url> <inceptionYear>2016</inceptionYear> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <licenses> <license> @@ -101,14 +101,13 @@ <beamSurefireArgline /> <!-- If updating dependencies, please update any relevant javadoc offlineLinks --> - <apache.commons.compress.version>1.14</apache.commons.compress.version> - <apache.commons.lang.version>3.6</apache.commons.lang.version> - <apache.commons.text.version>1.1</apache.commons.text.version> + <apache.commons.lang.version>3.5</apache.commons.lang.version> + <apache.commons.compress.version>1.9</apache.commons.compress.version> <apex.kryo.version>2.24.0</apex.kryo.version> 
<api-common.version>1.0.0-rc2</api-common.version> <avro.version>1.8.2</avro.version> <bigquery.version>v2-rev295-1.22.0</bigquery.version> - <bigtable.version>0.9.7.1</bigtable.version> + <bigtable.version>0.9.6.2</bigtable.version> <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version> <pubsubgrpc.version>0.1.0</pubsubgrpc.version> <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version> @@ -127,14 +126,8 @@ <guava.version>20.0</guava.version> <grpc.version>1.2.0</grpc.version> <grpc-google-common-protos.version>0.1.9</grpc-google-common-protos.version> - <!-- - This is the version of Hadoop used to compile the module that depend on Hadoop. - This dependency is defined with a provided scope. - Users must supply their own Hadoop version at runtime. - --> - <hadoop.version>2.7.3</hadoop.version> <hamcrest.version>1.3</hamcrest.version> - <jackson.version>2.8.9</jackson.version> + <jackson.version>2.8.8</jackson.version> <findbugs.version>3.0.1</findbugs.version> <joda.version>2.4</joda.version> <junit.version>4.12</junit.version> @@ -144,8 +137,8 @@ <protobuf.version>3.2.0</protobuf.version> <pubsub.version>v1-rev10-1.22.0</pubsub.version> <slf4j.version>1.7.14</slf4j.version> - <spanner.version>0.20.0-beta</spanner.version> - <spark.version>1.6.3</spark.version> + <spanner.version>0.16.0-beta</spanner.version> + <spark.version>1.6.2</spark.version> <spring.version>4.3.5.RELEASE</spring.version> <stax2.version>3.1.4</stax2.version> <storage.version>v1-rev71-1.22.0</storage.version> @@ -159,7 +152,7 @@ <failsafe-plugin.version>2.20</failsafe-plugin.version> <maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version> <maven-dependency-plugin.version>3.0.1</maven-dependency-plugin.version> - <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version> + <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version> <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version> 
<maven-resources-plugin.version>3.0.2</maven-resources-plugin.version> <maven-shade-plugin.version>3.0.0</maven-shade-plugin.version> @@ -167,7 +160,6 @@ <compiler.error.flag>-Werror</compiler.error.flag> <compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag> <compiler.default.exclude>nothing</compiler.default.exclude> - <gax-grpc.version>0.20.0</gax-grpc.version> </properties> <packaging>pom</packaging> @@ -429,18 +421,6 @@ <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-amqp</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-cassandra</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-elasticsearch</artifactId> <version>${project.version}</version> </dependency> @@ -478,12 +458,6 @@ <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-hcatalog</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-jdbc</artifactId> <version>${project.version}</version> </dependency> @@ -538,13 +512,6 @@ <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-java</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${project.version}</version> </dependency> @@ -598,12 +565,6 @@ </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-text</artifactId> - <version>${apache.commons.text.version}</version> - </dependency> - - <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-all</artifactId> <version>${grpc.version}</version> @@ -664,12 +625,6 @@ </dependency> <dependency> - 
<groupId>com.google.api</groupId> - <artifactId>gax-grpc</artifactId> - <version>${gax-grpc.version}</version> - </dependency> - - <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> <version>${google-clients.version}</version> @@ -884,11 +839,6 @@ </dependency> <dependency> - <groupId>com.google.cloud</groupId> - <artifactId>google-cloud-core-grpc</artifactId> - <version>${grpc.version}</version> - </dependency> - <dependency> <groupId>com.google.cloud.bigtable</groupId> <artifactId>bigtable-protos</artifactId> <version>${bigtable.version}</version> @@ -1100,42 +1050,6 @@ <version>${snappy-java.version}</version> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> - <version>${spark.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> - <version>${spark.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-network-common_2.10</artifactId> - <version>${spark.version}</version> - </dependency> - <!-- Testing --> <dependency> @@ -1205,27 +1119,6 @@ <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - 
<version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index fd5aafb..4a36bec 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -75,13 +75,6 @@ <artifactId>apex-engine</artifactId> <version>${apex.core.version}</version> <scope>runtime</scope> - <exclusions> - <!-- Fix build on JDK-9 --> - <exclusion> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - </exclusions> </dependency> <!-- Beam --> @@ -191,13 +184,6 @@ <type>test-jar</type> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-java</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> </dependencies> <build> @@ -263,12 +249,12 @@ <configuration> <ignoredUsedUndeclaredDependencies> <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:${apex.core.version}</ignoredUsedUndeclaredDependency> - <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::${apache.commons.lang.version}</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.4</ignoredUsedUndeclaredDependency> 
<ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::${apex.kryo.version}</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency> - <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:20.0</ignoredUsedUndeclaredDependency> </ignoredUsedUndeclaredDependencies> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index fd0a1c9..c595b3f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -62,6 +62,8 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.View.AsIterable; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -212,7 +214,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input */ public 
static class CreateApexPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { private static final long serialVersionUID = 1L; private PCollectionView<ViewT> view; @@ -226,13 +228,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } @Override - public PCollection<ElemT> expand(PCollection<ElemT> input) { - return PCollection.<ElemT>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(input.getCoder()); - } - - public PCollectionView<ViewT> getView() { + public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { return view; } } @@ -245,7 +241,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } private static class StreamingWrapSingletonInList<T> - extends PTransform<PCollection<T>, PCollection<T>> { + extends PTransform<PCollection<T>, PCollectionView<T>> { private static final long serialVersionUID = 1L; CreatePCollectionView<T, T> transform; @@ -258,11 +254,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } @Override - public PCollection<T> expand(PCollection<T> input) { - input + public PCollectionView<T> expand(PCollection<T> input) { + return input .apply(ParDo.of(new WrapAsList<T>())) - .apply(CreateApexPCollectionView.<List<T>, T>of(transform.getView())); - return input; + .apply(CreateApexPCollectionView.<T, T>of(transform.getView())); } @Override @@ -272,12 +267,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { static class Factory<T> extends SingleInputOutputOverrideFactory< - PCollection<T>, PCollection<T>, + PCollection<T>, PCollectionView<T>, CreatePCollectionView<T, T>> { @Override - public PTransformReplacement<PCollection<T>, PCollection<T>> getReplacementTransform( - AppliedPTransform<PCollection<T>, PCollection<T>, CreatePCollectionView<T, T>> - transform) { + public 
PTransformReplacement<PCollection<T>, PCollectionView<T>> + getReplacementTransform( + AppliedPTransform< + PCollection<T>, PCollectionView<T>, + CreatePCollectionView<T, T>> + transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), new StreamingWrapSingletonInList<>(transform.getTransform())); @@ -286,19 +284,18 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } private static class StreamingViewAsIterable<T> - extends PTransform<PCollection<T>, PCollection<T>> { + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { private static final long serialVersionUID = 1L; - private final PCollectionView<Iterable<T>> view; - private StreamingViewAsIterable(PCollectionView<Iterable<T>> view) { - this.view = view; - } + private StreamingViewAsIterable() {} @Override - public PCollection<T> expand(PCollection<T> input) { - return ((PCollection<T>) - input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())) - .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view)); + public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view)); } @Override @@ -308,17 +305,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { static class Factory<T> extends SingleInputOutputOverrideFactory< - PCollection<T>, PCollection<T>, CreatePCollectionView<T, Iterable<T>>> { + PCollection<T>, PCollectionView<Iterable<T>>, View.AsIterable<T>> { @Override - public PTransformReplacement<PCollection<T>, PCollection<T>> + public PTransformReplacement<PCollection<T>, PCollectionView<Iterable<T>>> getReplacementTransform( - AppliedPTransform< - PCollection<T>, PCollection<T>, - CreatePCollectionView<T, Iterable<T>>> + 
AppliedPTransform<PCollection<T>, PCollectionView<Iterable<T>>, AsIterable<T>> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new StreamingViewAsIterable<T>(transform.getTransform().getView())); + new StreamingViewAsIterable<T>()); } } } @@ -381,7 +376,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> transform) { return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), - SplittableParDo.forJavaParDo(transform.getTransform())); + new SplittableParDo<>(transform.getTransform())); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 02f53ec..bda074b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,6 +154,7 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { unboundedSource, true, context.getPipelineOptions()); context.addOperator(operator, operator.output); } + } private static class 
CreateApexPCollectionViewTranslator<ElemT, ViewT> @@ -160,10 +162,11 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { private static final long serialVersionUID = 1L; @Override - public void translate( - CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) { - context.addView(transform.getView()); - LOG.debug("view {}", transform.getView().getName()); + public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, + TranslationContext context) { + PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput(); + context.addView(view); + LOG.debug("view {}", view.getName()); } } @@ -174,8 +177,9 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { @Override public void translate( CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { - context.addView(transform.getView()); - LOG.debug("view {}", transform.getView().getName()); + PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput(); + context.addView(view); + LOG.debug("view {}", view.getName()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 94d13e1..aff3863 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -34,7 +34,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; import 
org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -94,8 +93,7 @@ class TranslationContext { } public <InputT extends PValue> InputT getInput() { - return (InputT) - Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform())); + return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values()); } public Map<TupleTag<?>, PValue> getOutputs() { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index c3cbab2..809ca2a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -359,7 +359,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements } } if (sideInputs.isEmpty()) { - outputWatermark(mark); + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", mark); + } + output.emit(mark); return; } @@ -367,20 +370,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements Math.min(pushedBackWatermark.get(), currentInputWatermark); if (potentialOutputWatermark > currentOutputWatermark) { currentOutputWatermark = potentialOutputWatermark; - outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); - 
} - } - - private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { - if (traceTuples) { - LOG.debug("\nemitting {}\n", mark); - } - output.emit(mark); - if (!additionalOutputPortMapping.isEmpty()) { - for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput : - additionalOutputPortMapping.values()) { - additionalOutput.emit(mark); + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", currentOutputWatermark); } + output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index ba75746..e76096e 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -123,15 +123,11 @@ public class WordCountTest { options.setInputFile(new File(inputFile).getAbsolutePath()); String outputFilePrefix = "target/wordcountresult.txt"; options.setOutput(outputFilePrefix); + WordCountTest.main(TestPipeline.convertToArgs(options)); File outFile1 = new File(outputFilePrefix + "-00000-of-00002"); File outFile2 = new File(outputFilePrefix + "-00001-of-00002"); - Assert.assertTrue(!outFile1.exists() || outFile1.delete()); - Assert.assertTrue(!outFile2.exists() || outFile2.delete()); - - WordCountTest.main(TestPipeline.convertToArgs(options)); - - Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists()); + Assert.assertTrue(outFile1.exists() && outFile2.exists()); HashSet<String> results = new HashSet<>(); results.addAll(FileUtils.readLines(outFile1)); results.addAll(FileUtils.readLines(outFile2));
