This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit db4f7781173bdc5790f7b3e1fd3eeebfa4b31fc9 Author: Chesnay Schepler <[email protected]> AuthorDate: Mon Jul 25 15:50:44 2022 +0200 [FLINK-28634][json] Deprecate JsonNodeDeserializationSchema Subsumed by more general 'JsonDeserializationSchema'. --- .../deserializer/KafkaRecordDeserializationSchemaTest.java | 13 ++++++++----- .../flink/formats/json/JsonNodeDeserializationSchema.java | 4 +++- .../formats/json/JsonNodeDeserializationSchemaTest.java | 1 + 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java index b0b898fb5ad..c2e735a0c13 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java @@ -18,8 +18,9 @@ package org.apache.flink.connector.kafka.source.reader.deserializer; +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext; -import org.apache.flink.formats.json.JsonNodeDeserializationSchema; +import org.apache.flink.formats.json.JsonDeserializationSchema; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.flink.util.Collector; @@ -35,7 +36,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -58,10 +58,11 @@ public class KafkaRecordDeserializationSchemaTest { } @Test - public void testKafkaDeserializationSchemaWrapper() throws IOException { + public void testKafkaDeserializationSchemaWrapper() throws Exception { final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord(); KafkaRecordDeserializationSchema<ObjectNode> schema = KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)); + schema.open(new DummyInitializationContext()); SimpleCollector<ObjectNode> collector = new SimpleCollector<>(); schema.deserialize(consumerRecord, collector); @@ -76,10 +77,12 @@ public class KafkaRecordDeserializationSchemaTest { } @Test - public void testKafkaValueDeserializationSchemaWrapper() throws IOException { + public void testKafkaValueDeserializationSchemaWrapper() throws Exception { final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord(); KafkaRecordDeserializationSchema<ObjectNode> schema = - KafkaRecordDeserializationSchema.valueOnly(new JsonNodeDeserializationSchema()); + KafkaRecordDeserializationSchema.valueOnly( + new JsonDeserializationSchema<>(ObjectNode.class)); + schema.open(new DummyInitializationContext()); SimpleCollector<ObjectNode> collector = new SimpleCollector<>(); schema.deserialize(consumerRecord, collector); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java index 36aa4843f79..928a6f1e59d 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java @@ -23,8 +23,10 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje * DeserializationSchema that deserializes a JSON String into an ObjectNode. * * <p>Fields can be accessed by calling objectNode.get(<name>).as(<type>) + * + * @deprecated Use {@code new JsonDeserializationSchema(ObjectNode.class)} instead */ -@PublicEvolving +@Deprecated public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> { private static final long serialVersionUID = 2L; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java index 2bd8dc5d68b..90751525feb 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link JsonNodeDeserializationSchema}. */ +@SuppressWarnings("deprecation") class JsonNodeDeserializationSchemaTest { @Test
