This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae45bd3c50abf8b2621a5de31410ae381f7ffa04 Author: Chesnay Schepler <[email protected]> AuthorDate: Mon Jul 25 15:50:07 2022 +0200 [FLINK-28634][json] Add simple JsonSerDeSchema --- .../docs/connectors/datastream/formats/json.md | 77 ++++++++++++++ .../docs/connectors/datastream/formats/json.md | 77 ++++++++++++++ flink-formats/flink-json/pom.xml | 6 ++ .../formats/json/JsonDeserializationSchema.java | 70 +++++++++++++ .../json/JsonNodeDeserializationSchema.java | 17 +--- .../formats/json/JsonSerializationSchema.java | 59 +++++++++++ .../json/JsonNodeDeserializationSchemaTest.java | 3 + .../flink/formats/json/JsonSerDeSchemaTest.java | 111 +++++++++++++++++++++ .../formats/DummyInitializationContext.java | 41 ++++++++ 9 files changed, 448 insertions(+), 13 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/formats/json.md b/docs/content.zh/docs/connectors/datastream/formats/json.md new file mode 100644 index 00000000000..232c45441f3 --- /dev/null +++ b/docs/content.zh/docs/connectors/datastream/formats/json.md @@ -0,0 +1,77 @@ +--- +title: "JSON" +weight: 4 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# JSON format + +To use the JSON format you need to add the Flink JSON dependency to your project: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>{{< version >}}</version> + <scope>provided</scope> +</dependency> +``` + +Flink supports reading/writing JSON records via the `JsonSerializationSchema` and `JsonDeserializationSchema`. +These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`. + +The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`. + +For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`: + +```java +JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class); +KafkaSource<SomePojo> source = + KafkaSource.<SomePojo>builder() + .setValueOnlyDeserializer(jsonFormat) + ... +``` + +The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`. + +For example, this is how you use it with a `KafkaSink` to serialize a `POJO`: + +```java +JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>(); +KafkaSink<SomePojo> sink = + KafkaSink.<SomePojo>builder() + .setRecordSerializer( + new KafkaRecordSerializationSchemaBuilder<>() + .setValueSerializationSchema(jsonFormat) + ... +``` + +## Custom Mapper + +Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting as a factory for object mappers. +With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+ +```java +JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>( + () -> new ObjectMapper() + .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + .registerModule(new ParameterNamesModule())); +``` \ No newline at end of file diff --git a/docs/content/docs/connectors/datastream/formats/json.md b/docs/content/docs/connectors/datastream/formats/json.md new file mode 100644 index 00000000000..57c97cb6b20 --- /dev/null +++ b/docs/content/docs/connectors/datastream/formats/json.md @@ -0,0 +1,77 @@ +--- +title: "JSON" +weight: 4 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# JSON format + +To use the JSON format you need to add the Flink JSON dependency to your project: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>{{< version >}}</version> + <scope>provided</scope> +</dependency> +``` + +Flink supports reading/writing JSON records via the `JsonSerializationSchema` and `JsonDeserializationSchema`. +These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+ +The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`. + +For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`: + +```java +JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class); +KafkaSource<SomePojo> source = + KafkaSource.<SomePojo>builder() + .setValueOnlyDeserializer(jsonFormat) + ... +``` + +The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`. + +For example, this is how you use it with a `KafkaSink` to serialize a `POJO`: + +```java +JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>(); +KafkaSink<SomePojo> sink = + KafkaSink.<SomePojo>builder() + .setRecordSerializer( + new KafkaRecordSerializationSchemaBuilder<>() + .setValueSerializationSchema(jsonFormat) + ... +``` + +## Custom Mapper + +Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting as a factory for object mappers. +With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality. + +```java +JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>( + () -> new ObjectMapper() + .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) + .registerModule(new ParameterNamesModule())); +``` \ No newline at end of file diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index 28c7b4629b9..ab697cf7ed9 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -109,6 +109,12 @@ under the License.
</dependency> <!-- test utils dependency --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils</artifactId> diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java new file mode 100644 index 00000000000..fd28712d56a --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** DeserializationSchema that reads JSON bytes into instances of {@code T} via a Jackson {@link ObjectMapper}. */ +@PublicEvolving +public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> { + + private static final long serialVersionUID = 1L; + /* mapper is transient (not serialized); it is created from mapperFactory in open(). */ + private final Class<T> clazz; + private final SerializableSupplier<ObjectMapper> mapperFactory; + protected transient ObjectMapper mapper; + /** Deserializes into {@code clazz}, using {@code new ObjectMapper()} as the mapper factory. */ + public JsonDeserializationSchema(Class<T> clazz) { + this(clazz, () -> new ObjectMapper()); + } + /** Deserializes into {@code typeInformation.getTypeClass()}, using {@code new ObjectMapper()} as the mapper factory. */ + public JsonDeserializationSchema(TypeInformation<T> typeInformation) { + this(typeInformation, () -> new ObjectMapper()); + } + /** Deserializes into {@code clazz}; {@code mapperFactory} supplies the mapper created in {@link #open}. */ + public JsonDeserializationSchema( + Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory) { + super(clazz); + this.clazz = clazz; + this.mapperFactory = mapperFactory; + } + /** NOTE(review): only {@code typeInformation.getTypeClass()} reaches Jackson, so generic parameters of {@code T} are presumably ignored when reading. */ + public JsonDeserializationSchema( + TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory) { + super(typeInformation); + this.clazz = typeInformation.getTypeClass(); + this.mapperFactory = mapperFactory; + } + /** Creates the mapper from the configured factory. */ + @Override + public void open(InitializationContext context) { + mapper = mapperFactory.get(); + } + /** Reads {@code message} as a {@code clazz} instance. NOTE(review): relies on {@link #open} having run; this class assigns {@code mapper} nowhere else. */ + @Override + public T deserialize(byte[] message) throws IOException { + return mapper.readValue(message, clazz); + } +} diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java index 55c61e1a6ba..36aa4843f79 100644 --- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java @@ -17,28 +17,19 @@ package org.apache.flink.formats.json; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import java.io.IOException; - /** * DeserializationSchema that deserializes a JSON String into an ObjectNode. * * <p>Fields can be accessed by calling objectNode.get(<name>).as(<type>) */ @PublicEvolving -public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> { - - private static final long serialVersionUID = -1699854177598621044L; +public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> { - private final ObjectMapper mapper = new ObjectMapper(); + private static final long serialVersionUID = 2L; /* changed from -1699854177598621044L together with the superclass; Java deserialization rejects instances written with the old UID */ - @Override - public ObjectNode deserialize(byte[] message) throws IOException { - return mapper.readValue(message, ObjectNode.class); + /** Creates a schema producing {@link ObjectNode}s. NOTE(review): unlike the removed eagerly-initialized mapper field, the inherited mapper is only created in open(). */ public JsonNodeDeserializationSchema() { + super(ObjectNode.class); } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java new file mode 100644 index 00000000000..c029fa1dc9c --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** SerializationSchema that writes objects of type {@code T} as JSON bytes via a Jackson {@link ObjectMapper}.
*/ +@PublicEvolving +public class JsonSerializationSchema<T> implements SerializationSchema<T> { + + private static final long serialVersionUID = 1L; + /** Supplies the mapper that {@link #open} creates. */ + private final SerializableSupplier<ObjectMapper> mapperFactory; + /** Transient (not serialized); assigned in {@link #open}. */ + protected transient ObjectMapper mapper; + /** Uses {@code new ObjectMapper()} as the mapper factory. */ + public JsonSerializationSchema() { + this(() -> new ObjectMapper()); + } + /** @param mapperFactory supplies the ObjectMapper created in {@link #open} */ + public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory) { + this.mapperFactory = mapperFactory; + } + /** Creates the mapper from the configured factory. */ + @Override + public void open(InitializationContext context) { + mapper = mapperFactory.get(); + } + /** Writes {@code element} with {@code writeValueAsBytes}; a JsonProcessingException is rethrown as a RuntimeException whose message embeds the element's string form. NOTE(review): relies on {@link #open} having run. */ + @Override + public byte[] serialize(T element) { + try { + return mapper.writeValueAsBytes(element); + } catch (JsonProcessingException e) { + throw new RuntimeException( + String.format("Could not serialize value '%s'.", element), e); + } + } +} diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java index 741b492dedf..2bd8dc5d68b 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java @@ -17,6 +17,8 @@ package org.apache.flink.formats.json; +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; + import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -37,6 +39,7 @@ class JsonNodeDeserializationSchemaTest { byte[] serializedValue = mapper.writeValueAsBytes(initialValue); JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema(); + schema.open(new DummyInitializationContext()); ObjectNode deserializedValue = schema.deserialize(serializedValue);
assertThat(deserializedValue.get("key").asInt()).isEqualTo(4); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java new file mode 100644 index 00000000000..5ed992c1eb1 --- /dev/null +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.formats.json; + +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; + +class JsonSerDeSchemaTest { + private static final JsonSerializationSchema<Event> SERIALIZATION_SCHEMA; + private static final JsonDeserializationSchema<Event> DESERIALIZATION_SCHEMA; + private static final String JSON = "{\"x\":34,\"y\":\"hello\"}"; + /* Both schemas are opened once with a dummy context; JSON is the expected encoding of new Event(34, "hello"). */ + static { + SERIALIZATION_SCHEMA = new JsonSerializationSchema<>(); + SERIALIZATION_SCHEMA.open(new DummyInitializationContext()); + DESERIALIZATION_SCHEMA = new JsonDeserializationSchema<>(Event.class); + DESERIALIZATION_SCHEMA.open(new DummyInitializationContext()); + } + + @Test + void testSerialization() { + final byte[] serialized = SERIALIZATION_SCHEMA.serialize(new Event(34, "hello")); + assertThat(serialized).isEqualTo(JSON.getBytes(StandardCharsets.UTF_8)); + } + + @Test + void testDeserialization() throws IOException { + final Event deserialized = + DESERIALIZATION_SCHEMA.deserialize(JSON.getBytes(StandardCharsets.UTF_8)); + assertThat(deserialized).isEqualTo(new Event(34, "hello")); + } + + @Test + void testRoundTrip() throws IOException { + final Event original = new Event(34, "hello"); + + final byte[] serialized = SERIALIZATION_SCHEMA.serialize(original); + + final Event deserialized = DESERIALIZATION_SCHEMA.deserialize(serialized); + + assertThat(deserialized).isEqualTo(original); + } + /** Test payload with a public no-arg constructor, getters/setters and value-based equals/hashCode. */ + private static class Event { + + private int x; + private String y = null; + + public Event() {} + + public Event(int x, String y) { + this.x = x; + this.y = y; + } + + public int getX() { + return x; + } + + public void setX(int x) { + this.x = x; + } + + public String getY() { + return y; + } + + public void setY(String y) { + this.y = y; + } + + @Override + public 
boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Event event = (Event) o; + return x == event.x && Objects.equals(y, event.y); + } + + @Override + public int hashCode() { + return Objects.hash(x, y); + } + } +} diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java new file mode 100644 index 00000000000..6335f0d3fc9 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.testutils.formats; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +/** Test-only InitializationContext usable with both SerializationSchema#open and DeserializationSchema#open. */ +public class DummyInitializationContext + implements SerializationSchema.InitializationContext, + DeserializationSchema.InitializationContext { + /** Returns a new {@link UnregisteredMetricsGroup} on every call. */ + @Override + public MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); + } + /** Wraps the class loader that loaded {@code DummyInitializationContext}. */ + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create(DummyInitializationContext.class.getClassLoader()); + } +}
