Add PipelineOptionsTranslation

This converts a PipelineOptions instance to and from a Protobuf Struct.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7ebb620 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7ebb620 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7ebb620 Branch: refs/heads/master Commit: f7ebb6201b5f6e0bb3c585733b6c934eef62c68b Parents: a1835c6 Author: Thomas Groh <[email protected]> Authored: Tue Aug 15 13:22:01 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Sep 20 10:33:30 2017 -0700 ---------------------------------------------------------------------- .../PipelineOptionsTranslation.java | 51 +++++++ .../PipelineOptionsTranslationTest.java | 143 +++++++++++++++++++ 2 files changed, 194 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f7ebb620/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java new file mode 100644 index 0000000..4cdca61 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Struct; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.common.ReflectHelpers; + +/** Utilities for going to/from Runner API pipeline options. */ +public class PipelineOptionsTranslation { + private static final ObjectMapper MAPPER = + new ObjectMapper() + .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + + /** Converts the provided {@link PipelineOptions} to a {@link Struct}. */ + public static Struct toProto(PipelineOptions options) { + Struct.Builder builder = Struct.newBuilder(); + try { + // The JSON format of a Protobuf Struct is the JSON object that is equivalent to that struct + // (with values encoded in a standard json-codeable manner). See Beam PR 3719 for more. + JsonFormat.parser().merge(MAPPER.writeValueAsString(options), builder); + return builder.build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Converts the provided {@link Struct} into {@link PipelineOptions}. 
*/ + public static PipelineOptions fromProto(Struct protoOptions) throws IOException { + return MAPPER.readValue(JsonFormat.printer().print(protoOptions), PipelineOptions.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f7ebb620/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java new file mode 100644 index 0000000..eb59bac --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link PipelineOptionsTranslation}. */ +@RunWith(Enclosed.class) +public class PipelineOptionsTranslationTest { + /** Tests that translations can round-trip through the proto format. */ + @RunWith(Parameterized.class) + public static class ToFromProtoTest { + @Parameters(name = "{index}: {0}") + public static Iterable<? 
extends PipelineOptions> options() { + PipelineOptionsFactory.register(TestUnserializableOptions.class); + PipelineOptionsFactory.register(TestDefaultOptions.class); + PipelineOptionsFactory.register(TestOptions.class); + PipelineOptions emptyOptions = PipelineOptionsFactory.create(); + + TestUnserializableOptions withNonSerializable = + PipelineOptionsFactory.as(TestUnserializableOptions.class); + withNonSerializable.setUnserializable(new Object()); + + TestOptions withCustomField = PipelineOptionsFactory.as(TestOptions.class); + withCustomField.setExample(99); + + PipelineOptions withSettings = PipelineOptionsFactory.create(); + withSettings.as(ApplicationNameOptions.class).setAppName("my_app"); + withSettings.setJobName("my_job"); + + PipelineOptions withParsedSettings = + PipelineOptionsFactory.fromArgs("--jobName=my_job --appName=my_app").create(); + + return ImmutableList.of( + emptyOptions, withNonSerializable, withCustomField, withSettings, withParsedSettings); + } + + @Parameter(0) + public PipelineOptions options; + + @Test + public void testToFromProto() throws Exception { + options.getOptionsId(); + Struct originalStruct = PipelineOptionsTranslation.toProto(options); + PipelineOptions deserializedStruct = PipelineOptionsTranslation.fromProto(originalStruct); + + Struct reserializedStruct = PipelineOptionsTranslation.toProto(deserializedStruct); + assertThat(reserializedStruct.getFieldsMap(), equalTo(originalStruct.getFieldsMap())); + } + } + + /** Tests that translations contain the correct contents. 
*/ + @RunWith(JUnit4.class) + public static class TranslationTest { + @Test + public void customSettingsRetained() throws Exception { + TestOptions options = PipelineOptionsFactory.as(TestOptions.class); + options.setExample(23); + Struct serialized = PipelineOptionsTranslation.toProto(options); + PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized); + + assertThat(deserialized.as(TestOptions.class).getExample(), equalTo(23)); + } + + @Test + public void ignoredSettingsNotSerialized() throws Exception { + TestUnserializableOptions opts = PipelineOptionsFactory.as(TestUnserializableOptions.class); + opts.setUnserializable(new Object()); + + Struct serialized = PipelineOptionsTranslation.toProto(opts); + PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized); + + assertThat( + deserialized.as(TestUnserializableOptions.class).getUnserializable(), is(nullValue())); + } + + @Test + public void defaultsRestored() throws Exception { + Struct serialized = + PipelineOptionsTranslation.toProto(PipelineOptionsFactory.as(TestDefaultOptions.class)); + PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized); + + assertThat(deserialized.as(TestDefaultOptions.class).getDefault(), equalTo(19)); + } + } + + /** {@link PipelineOptions} with an unserializable option. */ + public interface TestUnserializableOptions extends PipelineOptions { + @JsonIgnore + Object getUnserializable(); + + void setUnserializable(Object unserializable); + } + + /** {@link PipelineOptions} with an default option. */ + public interface TestDefaultOptions extends PipelineOptions { + @Default.Integer(19) + int getDefault(); + + void setDefault(int example); + } + + /** {@link PipelineOptions} for testing. */ + public interface TestOptions extends PipelineOptions { + int getExample(); + + void setExample(int example); + } +}
