Repository: beam
Updated Branches:
  refs/heads/master 948f7ef2f -> 185dc4798


[BEAM-2031] Add support to PipelineOptionsFactory/ProxyInvocationHandler for 
auto registration of Jackson modules

This is towards adding a mixin module for Hadoop Configuration so that it can 
live on a PipelineOptions object.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1d5174e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1d5174e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1d5174e

Branch: refs/heads/master
Commit: e1d5174eaffef3b2ac7df6187c43ffbccce32b09
Parents: 948f7ef
Author: Luke Cwik <[email protected]>
Authored: Thu Apr 27 16:24:57 2017 -0700
Committer: Lukasz Cwik <[email protected]>
Committed: Thu Apr 27 20:14:54 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/options/PipelineOptions.java       |  4 +-
 .../sdk/options/PipelineOptionsFactory.java     |  3 +-
 .../sdk/options/ProxyInvocationHandler.java     | 16 +++--
 .../sdk/options/PipelineOptionsFactoryTest.java | 71 +++++++++++++++++++-
 4 files changed, 83 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 88d6576..063eac4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -186,7 +186,9 @@ import org.joda.time.format.DateTimeFormatter;
  * <a href="https://github.com/FasterXML/jackson-annotations";>annotations</a> 
to aid in
  * serialization of custom types. We point you to the public
  * <a href="https://github.com/FasterXML/jackson";>Jackson documentation</a> 
when attempting
- * to add serialization support for your custom types.
+ * to add serialization support for your custom types. Note that {@link 
PipelineOptions} relies on
+ * Jackson's ability to automatically configure the {@link ObjectMapper} with 
additional modules via
+ * {@link ObjectMapper#findModules()}.
  *
  * <p>Note: It is an error to have the same property available in multiple 
interfaces with only
  * some of them being annotated with {@link JsonIgnore @JsonIgnore}. It is 
also an error to mark a

http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 1ecd577..b947419 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -444,7 +444,8 @@ public class PipelineOptionsFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(PipelineOptionsFactory.class);
   @SuppressWarnings("rawtypes")
   private static final Class<?>[] EMPTY_CLASS_ARRAY = new Class[0];
-  private static final ObjectMapper MAPPER = new ObjectMapper();
+  static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
+      ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
   private static final ClassLoader CLASS_LOADER;
 
   private static final Map<String, Class<? extends PipelineRunner<?>>> 
SUPPORTED_PIPELINE_RUNNERS;

http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index a0e3ec2..eda21a8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -88,7 +88,6 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
  */
 @ThreadSafe
 class ProxyInvocationHandler implements InvocationHandler {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
   /**
    * No two instances of this class are considered equivalent hence we 
generate a random hash code.
    */
@@ -480,9 +479,10 @@ class ProxyInvocationHandler implements InvocationHandler {
    */
   private Object getValueFromJson(String propertyName, Method method) {
     try {
-      JavaType type = 
MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
+      JavaType type = PipelineOptionsFactory.MAPPER.getTypeFactory()
+          .constructType(method.getGenericReturnType());
       JsonNode jsonNode = jsonOptions.get(propertyName);
-      return MAPPER.readValue(jsonNode.toString(), type);
+      return PipelineOptionsFactory.MAPPER.readValue(jsonNode.toString(), 
type);
     } catch (IOException e) {
       throw new RuntimeException("Unable to parse representation", e);
     }
@@ -645,7 +645,8 @@ class ProxyInvocationHandler implements InvocationHandler {
         DisplayData displayData = DisplayData.from(value);
         for (DisplayData.Item item : displayData.items()) {
           @SuppressWarnings("unchecked")
-          Map<String, Object> serializedItem = MAPPER.convertValue(item, 
Map.class);
+          Map<String, Object> serializedItem =
+              PipelineOptionsFactory.MAPPER.convertValue(item, Map.class);
           serializedDisplayData.add(serializedItem);
         }
 
@@ -700,10 +701,11 @@ class ProxyInvocationHandler implements InvocationHandler 
{
       // Attempt to serialize and deserialize each property.
       for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
         try {
-          String serializedValue = 
MAPPER.writeValueAsString(entry.getValue().getValue());
-          JavaType type = MAPPER.getTypeFactory()
+          String serializedValue =
+              
PipelineOptionsFactory.MAPPER.writeValueAsString(entry.getValue().getValue());
+          JavaType type = PipelineOptionsFactory.MAPPER.getTypeFactory()
               .constructType(propertyToReturnType.get(entry.getKey()));
-          MAPPER.readValue(serializedValue, type);
+          PipelineOptionsFactory.MAPPER.readValue(serializedValue, type);
         } catch (Exception e) {
           throw new IOException(String.format(
               "Failed to serialize and deserialize property '%s' with value 
'%s'",

http://git-wip-us.apache.org/repos/asf/beam/blob/e1d5174e/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 769a30a..11bd7b9 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -29,12 +29,24 @@ import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
@@ -1641,7 +1653,6 @@ public class PipelineOptionsFactoryTest {
     }
   }
 
-
   /**
    * A {@link PipelineRunnerRegistrar} to demonstrate default {@link 
PipelineRunner} registration.
    */
@@ -1658,7 +1669,6 @@ public class PipelineOptionsFactoryTest {
     void setRegisteredExampleFooBar(Object registeredExampleFooBar);
   }
 
-
   /**
    * A {@link PipelineOptionsRegistrar} to demonstrate default {@link 
PipelineOptions} registration.
    */
@@ -1669,4 +1679,61 @@ public class PipelineOptionsFactoryTest {
       return ImmutableList.<Class<? extends 
PipelineOptions>>of(RegisteredTestOptions.class);
     }
   }
+
+  @Test
+  public void testRegistrationOfJacksonModulesForObjectMapper() throws 
Exception {
+    JacksonIncompatibleOptions options = PipelineOptionsFactory
+        .fromArgs("--jacksonIncompatible=\"testValue\"")
+        .as(JacksonIncompatibleOptions.class);
+    assertEquals("testValue", options.getJacksonIncompatible().value);
+  }
+
+  /** PipelineOptions used to test auto registration of Jackson modules. */
+  public interface JacksonIncompatibleOptions extends PipelineOptions {
+    JacksonIncompatible getJacksonIncompatible();
+    void setJacksonIncompatible(JacksonIncompatible value);
+  }
+
+  /** A Jackson {@link Module} to test auto-registration of modules. */
+  @AutoService(Module.class)
+  public static class RegisteredTestModule extends SimpleModule {
+    public RegisteredTestModule() {
+      super("RegisteredTestModule");
+      setMixInAnnotation(JacksonIncompatible.class, 
JacksonIncompatibleMixin.class);
+    }
+  }
+
+  /** A class which Jackson does not know how to serialize/deserialize. */
+  public static class JacksonIncompatible {
+    private final String value;
+    public JacksonIncompatible(String value) {
+      this.value = value;
+    }
+  }
+
+  /** A Jackson mixin used to add annotations to other classes. */
+  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+  public static final class JacksonIncompatibleMixin {}
+
+  /** A Jackson deserializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleDeserializer extends
+      JsonDeserializer<JacksonIncompatible> {
+
+    @Override
+    public JacksonIncompatible deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, 
JsonProcessingException {
+      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+    }
+  }
+
+  /** A Jackson serializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleSerializer extends 
JsonSerializer<JacksonIncompatible> {
+
+    @Override
+    public void serialize(JacksonIncompatible jacksonIncompatible, 
JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, 
JsonProcessingException {
+      jsonGenerator.writeString(jacksonIncompatible.value);
+    }
+  }
 }

Reply via email to