Repository: beam Updated Branches: refs/heads/master eea96ffa3 -> 735a82bda
ApexRunner: register standard IOs when deserializing pipeline options This needs to be done once per JVM using the Job's pipeline options. The following is a hack that is tolerant of multiple initialization. Is there a better way to execute something like this exactly once? Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dfbe9549 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dfbe9549 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dfbe9549 Branch: refs/heads/master Commit: dfbe95493302555d969751232ec4c93a63b8c6de Parents: eea96ff Author: Dan Halperin <[email protected]> Authored: Thu Apr 13 14:07:26 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri Apr 14 11:42:51 2017 -0700 ---------------------------------------------------------------------- .../translation/utils/SerializablePipelineOptions.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dfbe9549/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java index d0dce2b..1a47ed5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java @@ -18,20 +18,24 @@ package org.apache.beam.runners.apex.translation.utils; import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; - +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.IOChannelUtils; /** * A wrapper to enable serialization of {@link PipelineOptions}. */ public class SerializablePipelineOptions implements Externalizable { + /* Used to ensure we initialize file systems exactly once, because it's a slow operation. */ + private static final AtomicBoolean FILE_SYSTEMS_INTIIALIZED = new AtomicBoolean(false); + private transient ApexPipelineOptions pipelineOptions; public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) { @@ -55,6 +59,11 @@ public class SerializablePipelineOptions implements Externalizable { String s = in.readUTF(); this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class) .as(ApexPipelineOptions.class); + + if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) { + IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + FileSystems.setDefaultConfigInWorkers(pipelineOptions); + } } }
