Repository: beam Updated Branches: refs/heads/master 948f7ef2f -> 185dc4798
[BEAM-2031] Add support to PipelineOptionsFactory/ProxyInvocationHandler for auto registration of Jackson modules. This is towards adding a mixin module for Hadoop Configuration so that it can live on a PipelineOptions object. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1d5174e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1d5174e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1d5174e Branch: refs/heads/master Commit: e1d5174eaffef3b2ac7df6187c43ffbccce32b09 Parents: 948f7ef Author: Luke Cwik <[email protected]> Authored: Thu Apr 27 16:24:57 2017 -0700 Committer: Lukasz Cwik <[email protected]> Committed: Thu Apr 27 20:14:54 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/options/PipelineOptions.java | 4 +- .../sdk/options/PipelineOptionsFactory.java | 3 +- .../sdk/options/ProxyInvocationHandler.java | 16 +++-- .../sdk/options/PipelineOptionsFactoryTest.java | 71 +++++++++++++++++++- 4 files changed, 83 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 88d6576..063eac4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -186,7 +186,9 @@ import org.joda.time.format.DateTimeFormatter; * <a href="https://github.com/FasterXML/jackson-annotations">annotations</a> to aid in * serialization of custom types. 
We point you to the public * <a href="https://github.com/FasterXML/jackson">Jackson documentation</a> when attempting - * to add serialization support for your custom types. + * to add serialization support for your custom types. Note that {@link PipelineOptions} relies on + * Jackson's ability to automatically configure the {@link ObjectMapper} with additional modules via + * {@link ObjectMapper#findModules()}. * * <p>Note: It is an error to have the same property available in multiple interfaces with only * some of them being annotated with {@link JsonIgnore @JsonIgnore}. It is also an error to mark a http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 1ecd577..b947419 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -444,7 +444,8 @@ public class PipelineOptionsFactory { private static final Logger LOG = LoggerFactory.getLogger(PipelineOptionsFactory.class); @SuppressWarnings("rawtypes") private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0]; - private static final ObjectMapper MAPPER = new ObjectMapper(); + static final ObjectMapper MAPPER = new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); private static final ClassLoader CLASS_LOADER; private static final Map<String, Class<? 
extends PipelineRunner<?>>> SUPPORTED_PIPELINE_RUNNERS; http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index a0e3ec2..eda21a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -88,7 +88,6 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; */ @ThreadSafe class ProxyInvocationHandler implements InvocationHandler { - private static final ObjectMapper MAPPER = new ObjectMapper(); /** * No two instances of this class are considered equivalent hence we generate a random hash code. */ @@ -480,9 +479,10 @@ class ProxyInvocationHandler implements InvocationHandler { */ private Object getValueFromJson(String propertyName, Method method) { try { - JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType()); + JavaType type = PipelineOptionsFactory.MAPPER.getTypeFactory() + .constructType(method.getGenericReturnType()); JsonNode jsonNode = jsonOptions.get(propertyName); - return MAPPER.readValue(jsonNode.toString(), type); + return PipelineOptionsFactory.MAPPER.readValue(jsonNode.toString(), type); } catch (IOException e) { throw new RuntimeException("Unable to parse representation", e); } @@ -645,7 +645,8 @@ class ProxyInvocationHandler implements InvocationHandler { DisplayData displayData = DisplayData.from(value); for (DisplayData.Item item : displayData.items()) { @SuppressWarnings("unchecked") - Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class); + Map<String, Object> serializedItem = + 
PipelineOptionsFactory.MAPPER.convertValue(item, Map.class); serializedDisplayData.add(serializedItem); } @@ -700,10 +701,11 @@ class ProxyInvocationHandler implements InvocationHandler { // Attempt to serialize and deserialize each property. for (Map.Entry<String, BoundValue> entry : options.entrySet()) { try { - String serializedValue = MAPPER.writeValueAsString(entry.getValue().getValue()); - JavaType type = MAPPER.getTypeFactory() + String serializedValue = + PipelineOptionsFactory.MAPPER.writeValueAsString(entry.getValue().getValue()); + JavaType type = PipelineOptionsFactory.MAPPER.getTypeFactory() .constructType(propertyToReturnType.get(entry.getKey())); - MAPPER.readValue(serializedValue, type); + PipelineOptionsFactory.MAPPER.readValue(serializedValue, type); } catch (Exception e) { throw new IOException(String.format( "Failed to serialize and deserialize property '%s' with value '%s'", http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 769a30a..11bd7b9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -29,12 +29,24 @@ import static org.junit.Assert.assertTrue; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import 
com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.auto.service.AutoService; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.PrintStream; import java.util.List; import java.util.Map; @@ -1641,7 +1653,6 @@ public class PipelineOptionsFactoryTest { } } - /** * A {@link PipelineRunnerRegistrar} to demonstrate default {@link PipelineRunner} registration. */ @@ -1658,7 +1669,6 @@ public class PipelineOptionsFactoryTest { void setRegisteredExampleFooBar(Object registeredExampleFooBar); } - /** * A {@link PipelineOptionsRegistrar} to demonstrate default {@link PipelineOptions} registration. */ @@ -1669,4 +1679,61 @@ public class PipelineOptionsFactoryTest { return ImmutableList.<Class<? extends PipelineOptions>>of(RegisteredTestOptions.class); } } + + @Test + public void testRegistrationOfJacksonModulesForObjectMapper() throws Exception { + JacksonIncompatibleOptions options = PipelineOptionsFactory + .fromArgs("--jacksonIncompatible=\"testValue\"") + .as(JacksonIncompatibleOptions.class); + assertEquals("testValue", options.getJacksonIncompatible().value); + } + + /** PipelineOptions used to test auto registration of Jackson modules. */ + public interface JacksonIncompatibleOptions extends PipelineOptions { + JacksonIncompatible getJacksonIncompatible(); + void setJacksonIncompatible(JacksonIncompatible value); + } + + /** A Jackson {@link Module} to test auto-registration of modules. 
*/ + @AutoService(Module.class) + public static class RegisteredTestModule extends SimpleModule { + public RegisteredTestModule() { + super("RegisteredTestModule"); + setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class); + } + } + + /** A class which Jackson does not know how to serialize/deserialize. */ + public static class JacksonIncompatible { + private final String value; + public JacksonIncompatible(String value) { + this.value = value; + } + } + + /** A Jackson mixin used to add annotations to other classes. */ + @JsonDeserialize(using = JacksonIncompatibleDeserializer.class) + @JsonSerialize(using = JacksonIncompatibleSerializer.class) + public static final class JacksonIncompatibleMixin {} + + /** A Jackson deserializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleDeserializer extends + JsonDeserializer<JacksonIncompatible> { + + @Override + public JacksonIncompatible deserialize(JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + return new JacksonIncompatible(jsonParser.readValueAs(String.class)); + } + } + + /** A Jackson serializer for {@link JacksonIncompatible}. */ + public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> { + + @Override + public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + jsonGenerator.writeString(jacksonIncompatible.value); + } + } }
