This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 328007f0b9a3e4da31b20e75b94d9c339b168af0
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Aug 5 13:45:18 2022 +0200

    [FLINK-28621] Enable Date/Time&Optional support for all mappers
---
 .../flink/util/jackson/JacksonMapperFactory.java   | 12 +++
 .../util/jackson/JacksonMapperFactoryTest.java     | 95 ++++++++++++++++++++++
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  6 --
 3 files changed, 107 insertions(+), 6 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
index 2c0b57f0471..e43451f92b5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
@@ -20,7 +20,10 @@ package org.apache.flink.util.jackson;
 import org.apache.flink.annotation.Experimental;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 /** Factory for Jackson mappers. */
 @Experimental
@@ -28,13 +31,22 @@ public final class JacksonMapperFactory {
 
     public static ObjectMapper createObjectMapper() {
         final ObjectMapper objectMapper = new ObjectMapper();
+        registerModules(objectMapper);
         return objectMapper;
     }
 
     public static CsvMapper createCsvMapper() {
         final CsvMapper csvMapper = new CsvMapper();
+        registerModules(csvMapper);
         return csvMapper;
     }
 
+    private static void registerModules(ObjectMapper mapper) {
+        mapper.registerModule(new JavaTimeModule())
+                .registerModule(new Jdk8Module().configureAbsentsAsNulls(true))
+                .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS)
+                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    }
+
     private JacksonMapperFactory() {}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
index 61f2057ca82..c624b920549 100644
--- a/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
@@ -17,11 +17,19 @@
 
 package org.apache.flink.util.jackson;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
+import java.util.Optional;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 class JacksonMapperFactoryTest {
@@ -41,4 +49,91 @@ class JacksonMapperFactoryTest {
 
         assertThat(mapper1).isNotSameAs(mapper2);
     }
+
+    @Test
+    void testObjectMapperOptionalSupportedEnabled() throws Exception {
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+        assertThat(mapper.writeValueAsString(new 
TypeWithOptional(Optional.of("value"))))
+                .isEqualTo("{\"data\":\"value\"}");
+        assertThat(mapper.writeValueAsString(new 
TypeWithOptional(Optional.empty())))
+                .isEqualTo("{\"data\":null}");
+
+        assertThat(mapper.readValue("{\"data\":\"value\"}", 
TypeWithOptional.class).data)
+                .contains("value");
+        assertThat(mapper.readValue("{\"data\":null}", 
TypeWithOptional.class).data).isEmpty();
+        assertThat(mapper.readValue("{}", 
TypeWithOptional.class).data).isEmpty();
+    }
+
+    @Test
+    void testCsvMapperOptionalSupportedEnabled() throws Exception {
+        final CsvMapper mapper =
+                JacksonMapperFactory.createCsvMapper()
+                        // ensures symmetric read/write behavior for empty 
optionals/strings
+                        // ensures:   Optional.empty() --write--> "" --read--> 
Optional.empty()
+                        // otherwise: Optional.empty() --write--> "" --read--> 
Optional("")
+                        // we should consider enabling this by default, but it 
unfortunately
+                        // also affects String parsing without Optionals 
(i.e., prior code)
+                        .enable(CsvParser.Feature.EMPTY_STRING_AS_NULL);
+
+        final ObjectWriter writer = 
mapper.writerWithSchemaFor(TypeWithOptional.class);
+
+        assertThat(writer.writeValueAsString(new 
TypeWithOptional(Optional.of("value"))))
+                .isEqualTo("value\n");
+        assertThat(writer.writeValueAsString(new 
TypeWithOptional(Optional.empty())))
+                .isEqualTo("\n");
+
+        final ObjectReader reader = 
mapper.readerWithSchemaFor(TypeWithOptional.class);
+
+        assertThat(reader.readValue("value\n", 
TypeWithOptional.class).data).contains("value");
+        assertThat(reader.readValue("null\n", 
TypeWithOptional.class).data).contains("null");
+        assertThat(reader.readValue("\n", 
TypeWithOptional.class).data).isEmpty();
+    }
+
+    @Test
+    void testObjectMappeDateTimeSupportedEnabled() throws Exception {
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+        final String instantString = "2022-08-07T12:00:33.107787800Z";
+        final Instant instant = Instant.parse(instantString);
+        final String instantJson = String.format("{\"data\":\"%s\"}", 
instantString);
+
+        assertThat(mapper.writeValueAsString(new 
TypeWithInstant(instant))).isEqualTo(instantJson);
+        assertThat(mapper.readValue(instantJson, 
TypeWithInstant.class).data).isEqualTo(instant);
+    }
+
+    @Test
+    void testCsvMapperDateTimeSupportedEnabled() throws Exception {
+        final CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
+
+        final String instantString = "2022-08-07T12:00:33.107787800Z";
+        final Instant instant = Instant.parse(instantString);
+        final String instantCsv = String.format("\"%s\"\n", instantString);
+
+        final ObjectWriter writer = 
mapper.writerWithSchemaFor(TypeWithInstant.class);
+
+        assertThat(writer.writeValueAsString(new 
TypeWithInstant(instant))).isEqualTo(instantCsv);
+
+        final ObjectReader reader = 
mapper.readerWithSchemaFor(TypeWithInstant.class);
+
+        assertThat(reader.readValue(instantCsv, 
TypeWithInstant.class).data).isEqualTo(instant);
+    }
+
+    public static class TypeWithOptional {
+        public Optional<String> data;
+
+        @JsonCreator
+        public TypeWithOptional(@JsonProperty("data") Optional<String> data) {
+            this.data = data;
+        }
+    }
+
+    public static class TypeWithInstant {
+        public Instant data;
+
+        @JsonCreator
+        public TypeWithInstant(@JsonProperty("data") Instant data) {
+            this.data = data;
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 46fd71036b2..13c36e40029 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -54,14 +54,11 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
@@ -109,9 +106,6 @@ public class JsonSerdeUtil {
                         .getTypeFactory()
                         
.withClassLoader(JsonSerdeUtil.class.getClassLoader()));
         OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, 
false);
-        
OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS,
 false);
-        OBJECT_MAPPER_INSTANCE.registerModule(new 
Jdk8Module().configureAbsentsAsNulls(true));
-        OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule());
         OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
     }
 

Reply via email to