Repository: beam
Updated Branches:
  refs/heads/master eea96ffa3 -> 735a82bda


ApexRunner: register standard IOs when deserializing pipeline options

This needs to be done once per JVM using the job's pipeline options. The
following is a hack that is tolerant of multiple initialization.

Is there a better way to execute something like this exactly once?


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dfbe9549
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dfbe9549
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dfbe9549

Branch: refs/heads/master
Commit: dfbe95493302555d969751232ec4c93a63b8c6de
Parents: eea96ff
Author: Dan Halperin <[email protected]>
Authored: Thu Apr 13 14:07:26 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Fri Apr 14 11:42:51 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/SerializablePipelineOptions.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dfbe9549/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index d0dce2b..1a47ed5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -18,20 +18,24 @@
 package org.apache.beam.runners.apex.translation.utils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * A wrapper to enable serialization of {@link PipelineOptions}.
  */
 public class SerializablePipelineOptions implements Externalizable {
 
+  /* Used to ensure we initialize file systems exactly once, because it's a 
slow operation. */
+  private static final AtomicBoolean FILE_SYSTEMS_INTIIALIZED = new 
AtomicBoolean(false);
+
   private transient ApexPipelineOptions pipelineOptions;
 
   public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
@@ -55,6 +59,11 @@ public class SerializablePipelineOptions implements 
Externalizable {
     String s = in.readUTF();
     this.pipelineOptions = new ObjectMapper().readValue(s, 
PipelineOptions.class)
         .as(ApexPipelineOptions.class);
+
+    if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
+      IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+      FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+    }
   }
 
 }

Reply via email to