[BEAM-196] abstraction for PipelineOptions serialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81577b31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81577b31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81577b31 Branch: refs/heads/master Commit: 81577b31c2642522f7dd4ba8eba794df48a0ca56 Parents: 56e28a9 Author: Maximilian Michels <[email protected]> Authored: Mon Apr 18 17:40:38 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Apr 18 18:10:05 2016 +0200 ---------------------------------------------------------------------- .../utils/SerializedPipelineOptions.java | 54 ++++++++++++++++ .../beam/runners/flink/PipelineOptionsTest.java | 68 ++++++++++++++++++++ 2 files changed, 122 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java new file mode 100644 index 0000000..7439e02 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink.translation.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.options.PipelineOptions; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; + +/** + * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. + */ +public class SerializedPipelineOptions implements Serializable { + + private byte[] serializedOptions; + + public SerializedPipelineOptions(PipelineOptions options) { + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + new ObjectMapper().writeValue(baos, options); + this.serializedOptions = baos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Couldn't serialize PipelineOptions.", e); + } + + } + + public PipelineOptions deserializeOptions() { + try { + return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + } catch (IOException e) { + throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java new file mode 100644 index 0000000..464c6df --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Tests the serialization and deserialization of PipelineOptions. + */ +public class PipelineOptionsTest { + + private interface MyOptions extends FlinkPipelineOptions { + @Description("Bla bla bla") + @Default.String("Hello") + String getTestOption(); + void setTestOption(String value); + } + + private static MyOptions options; + private static SerializedPipelineOptions serializedOptions; + + private final static String[] args = new String[]{"--testOption=nothing"}; + + @BeforeClass + public static void beforeTest() { + options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); + serializedOptions = new SerializedPipelineOptions(options); + } + + @Test + public void testDeserialization() { + MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class); + assertEquals("nothing", deserializedOptions.getTestOption()); + } + + @Test + public void testCaching() { + MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class); + assertNotNull(deserializedOptions); + assertEquals(deserializedOptions, serializedOptions.getPipelineOptions()); + assertEquals(deserializedOptions, serializedOptions.getPipelineOptions()); + assertEquals(deserializedOptions, serializedOptions.getPipelineOptions()); + } + +}
