DataflowRunner: switch from IOChannels to FileSystems for creating files
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5a38ed2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5a38ed2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5a38ed2 Branch: refs/heads/master Commit: e5a38ed2610b8ef72192e5a1b9a5630578300164 Parents: c102d27 Author: Dan Halperin <[email protected]> Authored: Wed May 3 17:55:32 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu May 4 09:32:45 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e5a38ed2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 2b54ba7..1a806b9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -84,6 +84,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.WriteFiles; @@ -114,7 +115,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.NameUtils; @@ -192,9 +192,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * @return The newly created runner. */ public static DataflowRunner fromOptions(PipelineOptions options) { - // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerIOFactoriesAllowOverride(options); - DataflowPipelineOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options); ArrayList<String> missing = new ArrayList<>(); @@ -578,9 +575,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { fileLocation.startsWith("/") || fileLocation.startsWith("gs://"), "Location must be local or on Cloud Storage, got %s.", fileLocation); + ResourceId fileResource = FileSystems.matchNewResource(fileLocation, false /* isDirectory */); String workSpecJson = DataflowPipelineTranslator.jobToString(newJob); - try (PrintWriter printWriter = new PrintWriter( - Channels.newOutputStream(IOChannelUtils.create(fileLocation, MimeTypes.TEXT)))) { + try (PrintWriter printWriter = + new PrintWriter( + Channels.newOutputStream(FileSystems.create(fileResource, MimeTypes.TEXT)))) { printWriter.print(workSpecJson); LOG.info("Printed job specification to {}", fileLocation); } catch (IOException ex) {
