Repository: beam
Updated Branches:
  refs/heads/master 73b9dd6a5 -> a859ec5a3
Flink: register known IOChannelFactories

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33f7082b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33f7082b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33f7082b

Branch: refs/heads/master
Commit: 33f7082b0f05c086e09a83cd98559bde3b70482d
Parents: 73b9dd6
Author: Davor Bonaci <[email protected]>
Authored: Mon Mar 6 16:51:04 2017 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Tue Mar 7 09:19:35 2017 -0800

----------------------------------------------------------------------
 .../flink/translation/utils/SerializedPipelineOptions.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/33f7082b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index fe2602b..390e6da 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
@@ -52,6 +53,8 @@ public class SerializedPipelineOptions implements Serializable {
     if (pipelineOptions == null) {
       try {
         pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+
+        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
       }
@@ -59,5 +62,4 @@ public class SerializedPipelineOptions implements Serializable {
 
     return pipelineOptions;
   }
-
 }
