DataflowPipelineOptions: use FileSystems, not IOChannelUtils, to resolve staging location
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc9e0048 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc9e0048 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc9e0048 Branch: refs/heads/master Commit: dc9e00485afaf737557bc6a82750e45ecba34926 Parents: e5a38ed Author: Dan Halperin <[email protected]> Authored: Wed May 3 18:01:29 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu May 4 09:32:45 2017 -0700 ---------------------------------------------------------------------- .../options/DataflowPipelineOptions.java | 14 +++++--------- .../dataflow/DataflowPipelineTranslatorTest.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java | 4 +++- .../options/DataflowPipelineOptionsTest.java | 19 +++++++++---------- 4 files changed, 18 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 0796b6d..11618af 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -17,11 +17,12 @@ */ package org.apache.beam.runners.dataflow.options; -import java.io.IOException; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.ApplicationNameOptions; @@ -32,7 +33,6 @@ import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.util.IOChannelUtils; /** * Options that can be used to configure the {@link DataflowRunner}. @@ -137,13 +137,9 @@ public interface DataflowPipelineOptions "Error constructing default value for stagingLocation: gcpTempLocation is not" + " a valid GCS path, %s. ", gcpTempLocation), e); } - try { - return IOChannelUtils.resolve(gcpTempLocation, "staging"); - } catch (IOException e) { - throw new IllegalArgumentException(String.format( - "Unable to resolve stagingLocation from gcpTempLocation: %s." - + " Please set the staging location explicitly.", gcpTempLocation), e); - } + return FileSystems.matchNewResource(gcpTempLocation, true /* isDirectory */) + .resolve("staging", StandardResolveOptions.RESOLVE_DIRECTORY) + .toString(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index a6ad8c5..70f00fb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -224,7 +224,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name")); assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path")); assertThat(optionsMap, - hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging")); + hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging/")); assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING")); assertThat(optionsMap, hasEntry("streaming", (Object) false)); assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0)); http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index c0dfbee..5aebf29 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -157,8 +157,9 @@ public class DataflowRunnerTest { } }); when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true); + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true); when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true); - when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"))). + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging/"))). thenReturn(true); when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true); when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false); @@ -826,6 +827,7 @@ public class DataflowRunnerTest { @Test public void testInvalidNumberOfWorkerHarnessThreads() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + FileSystems.setDefaultConfigInWorkers(options); options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); http://git-wip-us.apache.org/repos/asf/beam/blob/dc9e0048/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 30eee0e..8b8fd6c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ResetDateTimeProvider; import org.apache.beam.sdk.testing.RestoreSystemProperties; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.NoopPathValidator; import org.junit.Rule; import org.junit.Test; @@ -127,7 +127,6 @@ public class DataflowPipelineOptionsTest { @Test public void testStagingLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - IOChannelUtils.registerIOFactoriesAllowOverride(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location"); options.setStagingLocation("gs://staging_location"); @@ -138,21 +137,21 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - IOChannelUtils.registerIOFactoriesAllowOverride(options); + FileSystems.setDefaultConfigInWorkers(options); options.setPathValidatorClass(NoopPathValidator.class); - options.setTempLocation("gs://temp_location"); - assertEquals("gs://temp_location", options.getGcpTempLocation()); - assertEquals("gs://temp_location/staging", options.getStagingLocation()); + options.setTempLocation("gs://temp_location/"); + assertEquals("gs://temp_location/", options.getGcpTempLocation()); + assertEquals("gs://temp_location/staging/", options.getStagingLocation()); } @Test public void testDefaultToGcpTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - IOChannelUtils.registerIOFactoriesAllowOverride(options); + FileSystems.setDefaultConfigInWorkers(options); options.setPathValidatorClass(NoopPathValidator.class); - options.setTempLocation("gs://temp_location"); - options.setGcpTempLocation("gs://gcp_temp_location"); - assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation()); + options.setTempLocation("gs://temp_location/"); + options.setGcpTempLocation("gs://gcp_temp_location/"); + assertEquals("gs://gcp_temp_location/staging/", options.getStagingLocation()); } @Test
