Repository: beam
Updated Branches:
  refs/heads/master 73b9dd6a5 -> a859ec5a3


Flink: register known IOChannelFactories


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33f7082b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33f7082b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33f7082b

Branch: refs/heads/master
Commit: 33f7082b0f05c086e09a83cd98559bde3b70482d
Parents: 73b9dd6
Author: Davor Bonaci <[email protected]>
Authored: Mon Mar 6 16:51:04 2017 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Tue Mar 7 09:19:35 2017 -0800

----------------------------------------------------------------------
 .../flink/translation/utils/SerializedPipelineOptions.java       | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/33f7082b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index fe2602b..390e6da 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
@@ -52,6 +53,8 @@ public class SerializedPipelineOptions implements Serializable {
     if (pipelineOptions == null) {
       try {
         pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+
+        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
       }
@@ -59,5 +62,4 @@ public class SerializedPipelineOptions implements Serializable {
 
     return pipelineOptions;
   }
-
 }

Reply via email to