Rename FileSystems.setDefaultConfigInWorkers And document that it's not for users.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3540d47 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3540d47 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3540d47 Branch: refs/heads/master Commit: f3540d47f10c18859340a738a7e93643ee57f604 Parents: 15df211 Author: Dan Halperin <[email protected]> Authored: Fri May 12 11:46:16 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri May 12 14:59:10 2017 -0700 ---------------------------------------------------------------------- .../translation/utils/SerializablePipelineOptions.java | 2 +- .../translation/utils/SerializedPipelineOptions.java | 2 +- .../dataflow/DataflowPipelineTranslatorTest.java | 2 +- .../beam/runners/dataflow/DataflowRunnerTest.java | 6 +++--- .../dataflow/options/DataflowPipelineOptionsTest.java | 4 ++-- .../beam/runners/dataflow/util/PackageUtilTest.java | 2 +- .../runners/spark/translation/SparkRuntimeContext.java | 2 +- .../main/java/org/apache/beam/sdk/PipelineRunner.java | 2 +- .../main/java/org/apache/beam/sdk/io/FileSystems.java | 12 +++++++++++- .../java/org/apache/beam/sdk/testing/TestPipeline.java | 2 +- .../extensions/gcp/storage/GcsFileSystemRegistrar.java | 2 +- .../sdk/extensions/gcp/storage/GcsResourceIdTest.java | 2 +- .../apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java | 2 +- .../apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java | 2 +- 14 files changed, 27 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java index 02afa7a..46b04fc 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java @@ -62,7 +62,7 @@ public class SerializablePipelineOptions implements Externalizable { .as(ApexPipelineOptions.class); if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) { - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); } } http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 84f3bf4..40b6dd6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -56,7 +56,7 @@ public class SerializedPipelineOptions implements Serializable { try { pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); } catch (IOException e) { throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 93c1e5b..87744f0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -144,7 +144,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { Pipeline p = Pipeline.create(options); // Enable the FileSystems API to know about gs:// URIs in this test. - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object")) .apply("WriteMyFile", TextIO.write().to("gs://bucket/object")); http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ce01aa1..8f10b18 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -177,7 +177,7 @@ public class DataflowRunnerTest { .apply("WriteMyFile", TextIO.write().to("gs://bucket/object")); // Enable the FileSystems API to know about gs:// URIs in this test. - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); return p; } @@ -246,7 +246,7 @@ public class DataflowRunnerTest { options.setGcpCredential(new TestCredential()); // Configure the FileSystem registrar to use these options. - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); return options; } @@ -771,7 +771,7 @@ public class DataflowRunnerTest { @Test public void testInvalidNumberOfWorkerHarnessThreads() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 613604a..cea44f0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -137,7 +137,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location/"); assertEquals("gs://temp_location/", options.getGcpTempLocation()); @@ -147,7 +147,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToGcpTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location/"); options.setGcpTempLocation("gs://gcp_temp_location/"); http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index c7a660e..5d0c0f2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -120,7 +120,7 @@ public class PackageUtilTest { GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); pipelineOptions.setGcsUtil(mockGcsUtil); - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 82e8b44..f3fe99c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -90,7 +90,7 @@ public class SparkRuntimeContext implements Serializable { } } // Register standard FileSystems. - FileSystems.setDefaultConfigInWorkers(pipelineOptions); + FileSystems.setDefaultPipelineOptions(pipelineOptions); } return pipelineOptions; } http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java index 87705af..c114501 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java @@ -41,7 +41,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { PipelineOptionsValidator.validate(PipelineOptions.class, options); // (Re-)register standard FileSystems. Clobbers any prior credentials. - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); @SuppressWarnings("unchecked") PipelineRunner<? extends PipelineResult> result = http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index cfb63c0..1aacc90 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -51,6 +51,7 @@ import java.util.regex.Pattern; import javax.annotation.Nonnull; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; @@ -448,12 +449,21 @@ public class FileSystems { /********************************** METHODS FOR REGISTRATION **********************************/ + /** @deprecated to be removed. */ + @Deprecated // for DataflowRunner backwards compatibility. + public static void setDefaultConfigInWorkers(PipelineOptions options) { + setDefaultPipelineOptions(options); + } + /** * Sets the default configuration in workers. * * <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. + * + * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests. */ - public static void setDefaultConfigInWorkers(PipelineOptions options) { + @Internal + public static void setDefaultPipelineOptions(PipelineOptions options) { checkNotNull(options, "options"); Set<FileSystemRegistrar> registrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index e04c2f8..9206e04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -413,7 +413,7 @@ public class TestPipeline extends Pipeline implements TestRule { } options.setStableUniqueNames(CheckEnabled.ERROR); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); return options; } catch (IOException e) { throw new RuntimeException( http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java index f954b33..f3a67b2 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java @@ -40,7 +40,7 @@ public class GcsFileSystemRegistrar implements FileSystemRegistrar { public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) { checkNotNull( options, - "Expect the runner have called FileSystems.setDefaultConfigInWorkers()."); + "Expect the runner have called FileSystems.setDefaultPipelineOptions()."); return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java index 2a67501..c1e214e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java @@ -168,7 +168,7 @@ public class GcsResourceIdTest { @Test public void testResourceIdTester() throws Exception { - FileSystems.setDefaultConfigInWorkers(TestPipeline.testingPipelineOptions()); + FileSystems.setDefaultPipelineOptions(TestPipeline.testingPipelineOptions()); ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/")); } http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java index 14591d8..88275f4 100644 --- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java @@ -219,7 +219,7 @@ public class HadoopFileSystemTest { HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions() .as(HadoopFileSystemOptions.class); options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf())); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); PCollection<String> pc = p.apply( TextIO.read().from(testPath("testFile*").toString())); PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC"); http://git-wip-us.apache.org/repos/asf/beam/blob/f3540d47/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java index f179132..c4a8577 100644 --- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java @@ -53,7 +53,7 @@ public class HadoopResourceIdTest { // Register HadoopFileSystem for this test. HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class); options.setHdfsConfiguration(Collections.singletonList(configuration)); - FileSystems.setDefaultConfigInWorkers(options); + FileSystems.setDefaultPipelineOptions(options); } @After
