Repository: beam Updated Branches: refs/heads/master 70e53e7dc -> 1b363ae9f
Eliminate Pipeline.getOptions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55351dce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55351dce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55351dce Branch: refs/heads/master Commit: 55351dcebec8ba9e166c4f90555edca6b90b1b14 Parents: 70e53e7 Author: Kenneth Knowles <[email protected]> Authored: Wed May 3 03:06:36 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu May 4 13:00:15 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/complete/game/GameStats.java | 7 +++- .../examples/complete/game/HourlyTeamScore.java | 6 ++- .../examples/complete/game/LeaderBoard.java | 26 ++++++++---- .../beam/examples/complete/game/UserScore.java | 20 +++++---- .../complete/game/utils/WriteToBigQuery.java | 32 +++++++------- .../game/utils/WriteWindowedToBigQuery.java | 8 ++-- .../beam/runners/direct/DirectRunner.java | 1 + .../runners/direct/DisplayDataValidator.java | 6 +-- .../dataflow/testing/TestDataflowRunner.java | 3 +- .../testing/TestDataflowRunnerTest.java | 18 +++----- .../apache/beam/runners/spark/SparkRunner.java | 4 +- .../beam/runners/spark/SparkRunnerDebugger.java | 4 +- .../beam/runners/spark/TestSparkRunner.java | 5 +-- .../spark/translation/EvaluationContext.java | 16 +++++-- .../spark/translation/SparkRuntimeContext.java | 4 +- .../SparkRunnerStreamingContextFactory.java | 2 +- .../apache/beam/runners/spark/CacheTest.java | 2 +- .../streaming/TrackStreamingSourcesTest.java | 2 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 14 +------ .../apache/beam/sdk/testing/TestPipeline.java | 26 +++++------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 44 ++++++++++---------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 + 22 files changed, 131 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index a46d3c5..abbb13b 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -24,6 +24,7 @@ import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -300,6 +301,8 @@ public class GameStats extends LeaderBoard { // Write the result to BigQuery .apply("WriteTeamSums", new WriteWindowedToBigQuery<KV<String, Integer>>( + options.as(GcpOptions.class).getProject(), + options.getDataset(), options.getGameStatsTablePrefix() + "_team", configureWindowedWrite())); @@ -327,7 +330,9 @@ public class GameStats extends LeaderBoard { // Write this info to a BigQuery table. .apply("WriteAvgSessionLength", new WriteWindowedToBigQuery<Double>( - options.getGameStatsTablePrefix() + "_sessions", configureSessionWindowWrite())); + options.as(GcpOptions.class).getProject(), + options.getDataset(), + options.getGameStatsTablePrefix() + "_sessions", configureSessionWindowWrite())); // [END DocInclude_Rewindow] http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index 3f1ffb0..2928882 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.TimeZone; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -185,7 +186,10 @@ public class HourlyTeamScore extends UserScore { // Extract and sum teamname/score pairs from the event data. .apply("ExtractTeamScore", new ExtractAndSumScore("team")) .apply("WriteTeamScoreSums", - new WriteWindowedToBigQuery<KV<String, Integer>>(options.getHourlyTeamScoreTableName(), + new WriteWindowedToBigQuery<KV<String, Integer>>( + options.as(GcpOptions.class).getProject(), + options.getDataset(), + options.getHourlyTeamScoreTableName(), configureWindowedTableWrite())); http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 9af34c5..bfad9f6 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -27,6 +27,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -194,14 +195,20 @@ public class LeaderBoard extends HourlyTeamScore { .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic())) .apply("ParseGameEvent", ParDo.of(new ParseEventFn())); - gameEvents.apply("CalculateTeamScores", - new CalculateTeamScores( - Duration.standardMinutes(options.getTeamWindowDuration()), - Duration.standardMinutes(options.getAllowedLateness()))) + gameEvents + .apply( + "CalculateTeamScores", + new CalculateTeamScores( + Duration.standardMinutes(options.getTeamWindowDuration()), + Duration.standardMinutes(options.getAllowedLateness()))) // Write the results to BigQuery. - .apply("WriteTeamScoreSums", - new WriteWindowedToBigQuery<KV<String, Integer>>( - options.getLeaderBoardTableName() + "_team", configureWindowedTableWrite())); + .apply( + "WriteTeamScoreSums", + new WriteWindowedToBigQuery<KV<String, Integer>>( + options.as(GcpOptions.class).getProject(), + options.getDataset(), + options.getLeaderBoardTableName() + "_team", + configureWindowedTableWrite())); gameEvents .apply( "CalculateUserScores", @@ -210,7 +217,10 @@ public class LeaderBoard extends HourlyTeamScore { .apply( "WriteUserScoreSums", new WriteToBigQuery<KV<String, Integer>>( - options.getLeaderBoardTableName() + "_user", configureGlobalWindowBigQueryWrite())); + options.as(GcpOptions.class).getProject(), + options.getDataset(), + options.getLeaderBoardTableName() + "_user", + configureGlobalWindowBigQueryWrite())); // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the // command line. http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index c136c2e..8110146 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -24,6 +24,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -226,13 +227,18 @@ public class UserScore { Pipeline pipeline = Pipeline.create(options); // Read events from a text file and parse them. - pipeline.apply(TextIO.read().from(options.getInput())) - .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) - // Extract and sum username/score pairs from the event data. - .apply("ExtractUserScore", new ExtractAndSumScore("user")) - .apply("WriteUserScoreSums", - new WriteToBigQuery<KV<String, Integer>>(options.getUserScoreTableName(), - configureBigQueryWrite())); + pipeline + .apply(TextIO.read().from(options.getInput())) + .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) + // Extract and sum username/score pairs from the event data. + .apply("ExtractUserScore", new ExtractAndSumScore("user")) + .apply( + "WriteUserScoreSums", + new WriteToBigQuery<KV<String, Integer>>( + options.as(GcpOptions.class).getProject(), + options.getDataset(), + options.getUserScoreTableName(), + configureBigQueryWrite())); // Run the batch pipeline. pipeline.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index f767d21..2ec4e5c 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -25,13 +25,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.beam.examples.complete.game.UserScore; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -47,14 +43,21 @@ import org.apache.beam.sdk.values.PDone; public class WriteToBigQuery<InputT> extends PTransform<PCollection<InputT>, PDone> { + protected String projectId; + protected String datasetId; protected String tableName; protected Map<String, FieldInfo<InputT>> fieldInfo; public WriteToBigQuery() { } - public WriteToBigQuery(String tableName, + public WriteToBigQuery( + String projectId, + String datasetId, + String tableName, Map<String, FieldInfo<InputT>> fieldInfo) { + this.projectId = projectId; + this.datasetId = datasetId; this.tableName = tableName; this.fieldInfo = fieldInfo; } @@ -120,20 +123,21 @@ public class WriteToBigQuery<InputT> @Override public PDone expand(PCollection<InputT> teamAndScore) { teamAndScore - .apply("ConvertToRow", ParDo.of(new BuildRowFn())) - .apply(BigQueryIO.writeTableRows().to(getTable(teamAndScore.getPipeline(), tableName)) - .withSchema(getSchema()) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); + .apply("ConvertToRow", ParDo.of(new BuildRowFn())) + .apply( + BigQueryIO.writeTableRows() + .to(getTable(projectId, datasetId, tableName)) + .withSchema(getSchema()) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_APPEND)); return PDone.in(teamAndScore.getPipeline()); } /** Utility to construct an output table reference. */ - static TableReference getTable(Pipeline pipeline, String tableName) { - PipelineOptions options = pipeline.getOptions(); + static TableReference getTable(String projectId, String datasetId, String tableName) { TableReference table = new TableReference(); - table.setDatasetId(options.as(UserScore.Options.class).getDataset()); - table.setProjectId(options.as(GcpOptions.class).getProject()); + table.setDatasetId(datasetId); + table.setProjectId(projectId); table.setTableId(tableName); return table; } http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index e602258..deb9db2 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -37,9 +37,9 @@ import org.apache.beam.sdk.values.PDone; public class WriteWindowedToBigQuery<T> extends WriteToBigQuery<T> { - public WriteWindowedToBigQuery(String tableName, - Map<String, FieldInfo<T>> fieldInfo) { - super(tableName, fieldInfo); + public WriteWindowedToBigQuery( + String projectId, String datasetId, String tableName, Map<String, FieldInfo<T>> fieldInfo) { + super(projectId, datasetId, tableName, fieldInfo); } /** Convert each key/score pair into a BigQuery TableRow. */ @@ -62,7 +62,7 @@ public class WriteWindowedToBigQuery<T> teamAndScore .apply("ConvertToRow", ParDo.of(new BuildRowFn())) .apply(BigQueryIO.writeTableRows() - .to(getTable(teamAndScore.getPipeline(), tableName)) + .to(getTable(projectId, datasetId, tableName)) .withSchema(getSchema()) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(WriteDisposition.WRITE_APPEND)); http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index c6168b3e..984598a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -163,6 +163,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { pipeline.traverseTopologically(keyedPValueVisitor); DisplayDataValidator.validatePipeline(pipeline); + DisplayDataValidator.validateOptions(getPipelineOptions()); DirectGraph graph = graphVisitor.getGraph(); EvaluationContext context = http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java index c77cb48..209c801 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; @@ -31,12 +32,11 @@ class DisplayDataValidator { private DisplayDataValidator() {} static void validatePipeline(Pipeline pipeline) { - validateOptions(pipeline); validateTransforms(pipeline); } - private static void validateOptions(Pipeline pipeline) { - evaluateDisplayData(pipeline.getOptions()); + static void validateOptions(PipelineOptions options) { + evaluateDisplayData(options); } private static void validateTransforms(Pipeline pipeline) { http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index ba9d971..c238d80 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -98,7 +98,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { updatePAssertCount(pipeline); - TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); + TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class); final DataflowPipelineJob job; job = runner.run(pipeline); @@ -188,7 +188,6 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { @VisibleForTesting void updatePAssertCount(Pipeline pipeline) { - DataflowPipelineOptions options = pipeline.getOptions().as(DataflowPipelineOptions.class); if (DataflowRunner.hasExperiment(options, "beam_fn_api")) { // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions. expectedNumberOfAssertions = 0; http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 54eb88d..eb068e6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -464,8 +464,7 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - p.getOptions().as(TestPipelineOptions.class) - .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); + options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); when(mockClient.getJobMetrics(anyString())) .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); @@ -488,8 +487,7 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - p.getOptions().as(TestPipelineOptions.class) - .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); + options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); @@ -515,8 +513,7 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); when(mockClient.getJobMetrics(anyString())) .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); @@ -544,8 +541,7 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); @@ -570,8 +566,7 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestFailureMatcher()); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher()); when(mockClient.getJobMetrics(anyString())) .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); @@ -606,8 +601,7 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestFailureMatcher()); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher()); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.FAILED); http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 97487f3..1a0c042 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -193,7 +193,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } else { // create the evaluation context final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions); - final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline); + final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, mOptions); translator = new TransformTranslator.Translator(); // update the cache candidates @@ -383,7 +383,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { LOG.info( "Deferring combine transformation {} for job {}", transform, - ctxt.getPipeline().getOptions().getJobName()); + ctxt.getOptions().getJobName()); return true; } // default. http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java index 7f7aefc..8d47e1a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java @@ -89,10 +89,10 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul && ((TestSparkPipelineOptions) options).isForceStreaming()) { SparkPipelineTranslator streamingTranslator = new StreamingTransformTranslator.Translator(translator); - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc); visitor = new SparkNativePipelineVisitor(streamingTranslator, ctxt); } else { - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc); visitor = new SparkNativePipelineVisitor(translator, ctxt); } http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 1e67813..6808d7b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -83,6 +83,7 @@ import org.slf4j.LoggerFactory; public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class); + private final TestSparkPipelineOptions testSparkPipelineOptions; private SparkRunner delegate; private boolean isForceStreaming; @@ -90,6 +91,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { private TestSparkRunner(TestSparkPipelineOptions options) { this.delegate = SparkRunner.fromOptions(options); this.isForceStreaming = options.isForceStreaming(); + this.testSparkPipelineOptions = options; } public static TestSparkRunner fromOptions(PipelineOptions options) { @@ -101,9 +103,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { @Override public SparkPipelineResult run(Pipeline pipeline) { - TestSparkPipelineOptions testSparkPipelineOptions = - pipeline.getOptions().as(TestSparkPipelineOptions.class); - // // if the pipeline forces execution as a streaming pipeline, // and the source is an adapted unbounded source (as bounded), // read it as unbounded source via UnboundedReadFromBoundedSource. http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 838c504..5d77e91 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -57,15 +58,18 @@ public class EvaluationContext { private AppliedPTransform<?, ?, ?> currentTransform; private final SparkPCollectionView pviews = new SparkPCollectionView(); private final Map<PCollection, Long> cacheCandidates = new HashMap<>(); + private final PipelineOptions options; - public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) { + public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) { this.jsc = jsc; this.pipeline = pipeline; - this.runtime = new SparkRuntimeContext(pipeline); + this.options = options; + this.runtime = new SparkRuntimeContext(pipeline, options); } - public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, JavaStreamingContext jssc) { - this(jsc, pipeline); + public EvaluationContext( + JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options, JavaStreamingContext jssc) { + this(jsc, pipeline, options); this.jssc = jssc; } @@ -81,6 +85,10 @@ public class EvaluationContext { return pipeline; } + public PipelineOptions getOptions() { + return options; + } + public SparkRuntimeContext getRuntimeContext() { return runtime; } http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index e006143..3db1ab5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -35,8 +35,8 @@ public class SparkRuntimeContext implements Serializable { private final String serializedPipelineOptions; private transient CoderRegistry coderRegistry; - SparkRuntimeContext(Pipeline pipeline) { - this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions()); + SparkRuntimeContext(Pipeline pipeline, PipelineOptions options) { + this.serializedPipelineOptions = serializePipelineOptions(options); } private String serializePipelineOptions(PipelineOptions pipelineOptions) { http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 2dd18f3..6a153ff 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -82,7 +82,7 @@ public class SparkRunnerStreamingContextFactory implements Function0<JavaStreami // We must first init accumulators since translators expect them to be instantiated. SparkRunner.initAccumulators(options, jsc); - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc); // update cache candidates SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java index 24b2e7b..d3d0823 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java @@ -51,7 +51,7 @@ public class CacheTest { pCollection.apply(Count.<String>globally()); JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options); SparkRunner.CacheVisitor cacheVisitor = new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt); pipeline.traverseTopologically(cacheVisitor); http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index 41ccd08..3dcab26 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -135,7 +135,7 @@ public class TrackStreamingSourcesTest { Pipeline pipeline, Class<? extends PTransform> transformClassToAssert, Integer... expected) { - this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, jssc); + this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, options, jssc); this.evaluator = new SparkRunner.Evaluator( new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()), ctxt); this.transformClassToAssert = transformClassToAssert; http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 6fa7a5a..f7c3f24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -460,7 +460,7 @@ public class Pipeline { private Set<String> usedFullNames = new HashSet<>(); private CoderRegistry coderRegistry; private final List<String> unstableNames = new ArrayList<>(); - protected final PipelineOptions defaultOptions; + private final PipelineOptions defaultOptions; protected Pipeline(PipelineOptions options) { this.defaultOptions = options; @@ -472,18 +472,6 @@ public class Pipeline { } /** - * Returns the default {@link PipelineOptions} provided to {@link #create(PipelineOptions)}. - * - * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly - * provided to a transform if it is required. - */ - @Deprecated - public PipelineOptions getOptions() { - return defaultOptions; - } - - - /** * Applies a {@link PTransform} to the given {@link PInput}. * * @see Pipeline#apply http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 2d34b22..96cae51 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -104,6 +103,8 @@ import org.junit.runners.model.Statement; */ public class TestPipeline extends Pipeline implements TestRule { + private final PipelineOptions options; + private static class PipelineRunEnforcement { @SuppressWarnings("WeakerAccess") @@ -183,10 +184,7 @@ public class TestPipeline extends Pipeline implements TestRule { private void verifyPipelineExecution() { if (!isEmptyPipeline(pipeline)) { if (!runAttempted && !enableAutoRunIfMissing) { - throw new PipelineRunMissingException( - "The pipeline has not been run (runner: " - + pipeline.getOptions().getRunner().getSimpleName() - + ")"); + throw new PipelineRunMissingException("The pipeline has not been run."); } else { final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline); @@ -272,6 +270,11 @@ public class TestPipeline extends Pipeline implements TestRule { private TestPipeline(final PipelineOptions options) { super(options); + this.options = options; + } + + public PipelineOptions getOptions() { + return this.options; } @Override @@ -288,7 +291,7 @@ public class TestPipeline extends Pipeline implements TestRule { .anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true)); final boolean crashingRunner = - CrashingRunner.class.isAssignableFrom(getOptions().getRunner()); + CrashingRunner.class.isAssignableFrom(options.getRunner()); checkState( !(annotatedWithNeedsRunner && crashingRunner), @@ -381,18 +384,9 @@ public class TestPipeline extends Pipeline implements TestRule { return this; } - @VisibleForTesting - @Override - /** - * Get this pipeline's options. - */ - public PipelineOptions getOptions() { - return defaultOptions; - } - @Override public String toString() { - return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName(); + return "TestPipeline#" + options.as(ApplicationNameOptions.class).getAppName(); } /** Creates {@link PipelineOptions} for testing. */ http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 0e36393..fbbf862 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -888,6 +888,26 @@ public class BigQueryIO { public void validate(PipelineOptions pipelineOptions) { BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class); + // The user specified a table. + if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) { + TableReference table = getTableWithDefaultProject(options).get(); + DatasetService datasetService = getBigQueryServices().getDatasetService(options); + // Check for destination table presence and emptiness for early failure notification. + // Note that a presence check can fail when the table or dataset is created by an earlier + // stage of the pipeline. For these cases the #withoutValidation method can be used to + // disable the check. + BigQueryHelpers.verifyDatasetPresence(datasetService, table); + if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { + BigQueryHelpers.verifyTablePresence(datasetService, table); + } + if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { + BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table); + } + } + } + + @Override + public WriteResult expand(PCollection<T> input) { // We must have a destination to write to! checkState( getTableFunction() != null || getJsonTableRef() != null @@ -916,29 +936,7 @@ public class BigQueryIO { checkArgument(2 > Iterables.size(Iterables.filter(allSchemaArgs, Predicates.notNull())), "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " - + "be set"); - - // The user specified a table. - if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) { - TableReference table = getTableWithDefaultProject(options).get(); - DatasetService datasetService = getBigQueryServices().getDatasetService(options); - // Check for destination table presence and emptiness for early failure notification. - // Note that a presence check can fail when the table or dataset is created by an earlier - // stage of the pipeline. For these cases the #withoutValidation method can be used to - // disable the check. - BigQueryHelpers.verifyDatasetPresence(datasetService, table); - if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { - BigQueryHelpers.verifyTablePresence(datasetService, table); - } - if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { - BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table); - } - } - } - - @Override - public WriteResult expand(PCollection<T> input) { - validate(input.getPipeline().getOptions()); + + "be set"); DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index aabae3e..b893ad5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1175,6 +1175,7 @@ public class BigQueryIOTest implements Serializable { .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema()) .withTestServices(fakeBqServices)); + p.run(); } @Test
