This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 328007f0b9a3e4da31b20e75b94d9c339b168af0 Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Aug 5 13:45:18 2022 +0200 [FLINK-28621] Enable Date/Time&Optional support for all mappers --- .../flink/util/jackson/JacksonMapperFactory.java | 12 +++ .../util/jackson/JacksonMapperFactoryTest.java | 95 ++++++++++++++++++++++ .../plan/nodes/exec/serde/JsonSerdeUtil.java | 6 -- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java index 2c0b57f0471..e43451f92b5 100644 --- a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java +++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java @@ -20,7 +20,10 @@ package org.apache.flink.util.jackson; import org.apache.flink.annotation.Experimental; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; /** Factory for Jackson mappers. */ @Experimental @@ -28,13 +31,22 @@ public final class JacksonMapperFactory { public static ObjectMapper createObjectMapper() { final ObjectMapper objectMapper = new ObjectMapper(); + registerModules(objectMapper); return objectMapper; } public static CsvMapper createCsvMapper() { final CsvMapper csvMapper = new CsvMapper(); + registerModules(csvMapper); return csvMapper; } + private static void registerModules(ObjectMapper mapper) { + mapper.registerModule(new JavaTimeModule()) + .registerModule(new Jdk8Module().configureAbsentsAsNulls(true)) + .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + private JacksonMapperFactory() {} } diff --git a/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java index 61f2057ca82..c624b920549 100644 --- a/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java @@ -17,11 +17,19 @@ package org.apache.flink.util.jackson; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser; import org.junit.jupiter.api.Test; +import java.time.Instant; +import java.util.Optional; + import static org.assertj.core.api.Assertions.assertThat; class JacksonMapperFactoryTest { @@ -41,4 +49,91 @@ class JacksonMapperFactoryTest { assertThat(mapper1).isNotSameAs(mapper2); } + + @Test + void testObjectMapperOptionalSupportedEnabled() throws Exception { + final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + + assertThat(mapper.writeValueAsString(new TypeWithOptional(Optional.of("value")))) + .isEqualTo("{\"data\":\"value\"}"); + assertThat(mapper.writeValueAsString(new TypeWithOptional(Optional.empty()))) + .isEqualTo("{\"data\":null}"); + + assertThat(mapper.readValue("{\"data\":\"value\"}", TypeWithOptional.class).data) + .contains("value"); + assertThat(mapper.readValue("{\"data\":null}", TypeWithOptional.class).data).isEmpty(); + assertThat(mapper.readValue("{}", TypeWithOptional.class).data).isEmpty(); + } + + @Test + void testCsvMapperOptionalSupportedEnabled() throws Exception { + final CsvMapper mapper = + JacksonMapperFactory.createCsvMapper() + // ensures symmetric read/write behavior for empty optionals/strings + // ensures: Optional.empty() --write--> "" --read--> Optional.empty() + // otherwise: Optional.empty() --write--> "" --read--> Optional("") + // we should consider enabling this by default, but it unfortunately + // also affects String parsing without Optionals (i.e., prior code) + .enable(CsvParser.Feature.EMPTY_STRING_AS_NULL); + + final ObjectWriter writer = mapper.writerWithSchemaFor(TypeWithOptional.class); + + assertThat(writer.writeValueAsString(new TypeWithOptional(Optional.of("value")))) + .isEqualTo("value\n"); + assertThat(writer.writeValueAsString(new TypeWithOptional(Optional.empty()))) + .isEqualTo("\n"); + + final ObjectReader reader = mapper.readerWithSchemaFor(TypeWithOptional.class); + + assertThat(reader.readValue("value\n", TypeWithOptional.class).data).contains("value"); + assertThat(reader.readValue("null\n", TypeWithOptional.class).data).contains("null"); + assertThat(reader.readValue("\n", TypeWithOptional.class).data).isEmpty(); + } + + @Test + void testObjectMappeDateTimeSupportedEnabled() throws Exception { + final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + + final String instantString = "2022-08-07T12:00:33.107787800Z"; + final Instant instant = Instant.parse(instantString); + final String instantJson = String.format("{\"data\":\"%s\"}", instantString); + + assertThat(mapper.writeValueAsString(new TypeWithInstant(instant))).isEqualTo(instantJson); + assertThat(mapper.readValue(instantJson, TypeWithInstant.class).data).isEqualTo(instant); + } + + @Test + void testCsvMapperDateTimeSupportedEnabled() throws Exception { + final CsvMapper mapper = JacksonMapperFactory.createCsvMapper(); + + final String instantString = "2022-08-07T12:00:33.107787800Z"; + final Instant instant = Instant.parse(instantString); + final String instantCsv = String.format("\"%s\"\n", instantString); + + final ObjectWriter writer = mapper.writerWithSchemaFor(TypeWithInstant.class); + + assertThat(writer.writeValueAsString(new TypeWithInstant(instant))).isEqualTo(instantCsv); + + final ObjectReader reader = mapper.readerWithSchemaFor(TypeWithInstant.class); + + assertThat(reader.readValue(instantCsv, TypeWithInstant.class).data).isEqualTo(instant); + } + + public static class TypeWithOptional { + public Optional<String> data; + + @JsonCreator + public TypeWithOptional(@JsonProperty("data") Optional<String> data) { + this.data = data; + } + } + + public static class TypeWithInstant { + public Instant data; + + @JsonCreator + public TypeWithInstant(@JsonProperty("data") Instant data) { + this.data = data; + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java index 46fd71036b2..13c36e40029 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java @@ -54,14 +54,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.type.RelDataType; @@ -109,9 +106,6 @@ public class JsonSerdeUtil { .getTypeFactory() .withClassLoader(JsonSerdeUtil.class.getClassLoader())); OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false); - OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); - OBJECT_MAPPER_INSTANCE.registerModule(new Jdk8Module().configureAbsentsAsNulls(true)); - OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule()); OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule()); }
