[BEAM-59] Register standard FileSystems wherever we register IOChannelFactories
Additionally, drop an unnecessary use of `GcsOptions` in `PipelineRunner`. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b43c92f2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b43c92f2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b43c92f2 Branch: refs/heads/jstorm-runner Commit: b43c92f208304cfc10d79b140682dfbe6580d7c4 Parents: c52ce7c Author: Dan Halperin <[email protected]> Authored: Mon Apr 17 20:39:48 2017 -0700 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Tue Apr 18 10:33:10 2017 +0200 ---------------------------------------------------------------------- .../flink/translation/utils/SerializedPipelineOptions.java | 2 ++ .../beam/runners/spark/translation/SparkRuntimeContext.java | 2 ++ .../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 7 +++---- .../main/java/org/apache/beam/sdk/testing/TestPipeline.java | 2 ++ 4 files changed, 9 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 390e6da..2256bb1 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.IOChannelUtils; @@ -55,6 +56,7 @@ public class SerializedPipelineOptions implements Serializable { pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + FileSystems.setDefaultConfigInWorkers(pipelineOptions); } catch (IOException e) { throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 4ccfead..9d0f576 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; @@ -130,6 +131,7 @@ public class SparkRuntimeContext implements Serializable { } // register IO factories. IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + FileSystems.setDefaultConfigInWorkers(pipelineOptions); } return pipelineOptions; } http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java index 80bb90f..7b2fba3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java @@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.InstanceBuilder; @@ -41,11 +40,11 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { * @return The newly created runner. */ public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) { - GcsOptions gcsOptions = PipelineOptionsValidator.validate(GcsOptions.class, options); checkNotNull(options); // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions); + IOChannelUtils.registerIOFactoriesAllowOverride(options); + FileSystems.setDefaultConfigInWorkers(options); @SuppressWarnings("unchecked") PipelineRunner<? extends PipelineResult> result = http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index a4ab196..3d3de51 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -40,6 +40,7 @@ import java.util.Map.Entry; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -404,6 +405,7 @@ public class TestPipeline extends Pipeline implements TestRule { options.setStableUniqueNames(CheckEnabled.ERROR); IOChannelUtils.registerIOFactoriesAllowOverride(options); + FileSystems.setDefaultConfigInWorkers(options); return options; } catch (IOException e) { throw new RuntimeException(
