Repository: incubator-beam Updated Branches: refs/heads/master 1c21aa2d5 -> c1de175bd
Move tempLocation to PipelineOptions. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bc0659a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bc0659a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bc0659a Branch: refs/heads/master Commit: 8bc0659af754786677446f2f9941702f9e9ee5be Parents: 45309ca Author: Pei He <[email protected]> Authored: Mon Mar 14 16:02:32 2016 -0700 Committer: Pei He <[email protected]> Committed: Mon Mar 14 16:53:49 2016 -0700 ---------------------------------------------------------------------- .../FlinkGroupAlsoByWindowWrapper.java | 11 ++++++++- .../sdk/options/DataflowPipelineOptions.java | 26 +++----------------- .../dataflow/sdk/options/PipelineOptions.java | 14 +++++++++++ .../sdk/runners/DataflowPipelineRunner.java | 4 +++ .../sdk/runners/DataflowPipelineRunnerTest.java | 6 ++--- 5 files changed, 33 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index e115a15..b413d7a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -476,6 +476,15 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> @Override public void setStableUniqueNames(CheckEnabled enabled) { } + + @Override + public String getTempLocation() { + return null; + } + + @Override + public void setTempLocation(String tempLocation) { + } }; } return options; @@ -628,4 +637,4 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> // restore the timerInternals. this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java index 1aa4342..6794032 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java @@ -34,8 +34,6 @@ public interface DataflowPipelineOptions extends GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions, DataflowProfilingOptions { - static final String DATAFLOW_STORAGE_LOCATION = "Dataflow Storage Location"; - @Description("Project id. Required when running a Dataflow in the cloud. " + "See https://cloud.google.com/storage/docs/projects for further details.") @Override @@ -46,36 +44,18 @@ public interface DataflowPipelineOptions extends void setProject(String value); /** - * GCS path for temporary files, e.g. gs://bucket/object - * - * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://" - * - * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If - * {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using - * {@link #getStagingLocation()}. - */ - @Description("GCS path for temporary files, eg \"gs://bucket/object\". " - + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". " - + "At least one of tempLocation or stagingLocation must be set. If tempLocation is unset, " - + "defaults to using stagingLocation.") - @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION}) - String getTempLocation(); - void setTempLocation(String value); - - /** * GCS path for staging local files, e.g. gs://bucket/object * * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://" * - * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If - * {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using - * {@link #getStagingLocation()}. + * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()} + * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow + * pipeline defaults to using {@link PipelineOptions#getTempLocation()}. */ @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". " + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". " + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, " + "defaults to using tempLocation.") - @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION}) String getStagingLocation(); void setStagingLocation(String value); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java index 923033d..4c33a22 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java @@ -245,4 +245,18 @@ public interface PipelineOptions { @Default.Enum("WARNING") CheckEnabled getStableUniqueNames(); void setStableUniqueNames(CheckEnabled enabled); + + /** + * A pipeline level default location for storing temporary files. + * + * <p>This can be a path of any file system. + * + * <p>{@link #getTempLocation()} can be used as a default location in other + * {@link PipelineOptions}. + * + * <p>If it is unset, {@link PipelineRunner} can override it. + */ + @Description("A pipeline level default location for storing temporary files.") + String getTempLocation(); + void setTempLocation(String value); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index c90b904..d716b95 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -251,6 +251,10 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> } PathValidator validator = dataflowOptions.getPathValidator(); + Preconditions.checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation()) + && Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())), + "Missing required value: at least one of tempLocation or stagingLocation must be set."); + if (dataflowOptions.getStagingLocation() != null) { validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index c5f2d3f..300d5d5 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -649,10 +649,8 @@ public class DataflowPipelineRunnerTest { options.setProject("foo-project"); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Missing required value for group"); - thrown.expectMessage(DataflowPipelineOptions.DATAFLOW_STORAGE_LOCATION); - thrown.expectMessage("getStagingLocation"); - thrown.expectMessage("getTempLocation"); + thrown.expectMessage( + "Missing required value: at least one of tempLocation or stagingLocation must be set."); DataflowPipelineRunner.fromOptions(options); }
