Stop registering IOChannelFactories in SDK harness, runners, tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5519fe51 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5519fe51 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5519fe51 Branch: refs/heads/master Commit: 5519fe51aea3e95fa0ea4a36c7b036917b7f94ec Parents: dc9e004 Author: Dan Halperin <[email protected]> Authored: Wed May 3 18:02:42 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu May 4 09:32:45 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/utils/SerializablePipelineOptions.java | 2 -- .../flink/translation/utils/SerializedPipelineOptions.java | 2 -- .../beam/runners/spark/translation/SparkRuntimeContext.java | 4 +--- .../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 4 +--- .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java | 2 -- .../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 3 --- 6 files changed, 2 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java index 14476b5..02afa7a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.common.ReflectHelpers; /** @@ -63,7 +62,6 @@ public class SerializablePipelineOptions implements Externalizable { .as(ApexPipelineOptions.class); if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) { - IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); FileSystems.setDefaultConfigInWorkers(pipelineOptions); } } http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index f717fd7..84f3bf4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.io.Serializable; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.common.ReflectHelpers; /** @@ -57,7 +56,6 @@ public class SerializedPipelineOptions implements Serializable { try { pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); - IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); FileSystems.setDefaultConfigInWorkers(pipelineOptions); } catch (IOException e) { throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 6bba863..e006143 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.IOChannelUtils; /** * The SparkRuntimeContext allows us to define useful features on the client side before our @@ -78,8 +77,7 @@ public class SparkRuntimeContext implements Serializable { pipelineOptions = deserializePipelineOptions(serializedPipelineOptions); } } - // register IO factories. - IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + // Register standard FileSystems. FileSystems.setDefaultConfigInWorkers(pipelineOptions); } return pipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java index a318dfc..229e04f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.InstanceBuilder; /** @@ -44,8 +43,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { checkNotNull(options); PipelineOptionsValidator.validate(PipelineOptions.class, options); - // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerIOFactoriesAllowOverride(options); + // (Re-)register standard FileSystems. Clobbers any prior credentials. FileSystems.setDefaultConfigInWorkers(options); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 4d0cc2b..868dcbd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; @@ -400,7 +399,6 @@ public class TestPipeline extends Pipeline implements TestRule { } options.setStableUniqueNames(CheckEnabled.ERROR); - IOChannelUtils.registerIOFactoriesAllowOverride(options); FileSystems.setDefaultConfigInWorkers(options); return options; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/beam/blob/5519fe51/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 24f826c..05ab44f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -33,7 +33,6 @@ import org.apache.beam.fn.harness.stream.StreamObserverFactory; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,8 +89,6 @@ public class FnHarness { public static void main(PipelineOptions options, BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor, BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception { - IOChannelUtils.registerIOFactories(options); - ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options); StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options); PrintStream originalErrStream = System.err;
