This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c0bf0ac3fb1fe4814bff09807ed2040bb13da052 Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Jul 28 16:02:26 2022 +0200 [FLINK-28621][core] Add central Jackson mapper factory methods --- .../sink/testutils/KinesisFirehoseTestUtils.java | 6 ++-- .../kinesis/sink/examples/SinkIntoKinesis.java | 9 +++-- .../JSONKeyValueDeserializationSchema.java | 3 +- .../KafkaRecordDeserializationSchemaTest.java | 12 ++++--- .../JSONKeyValueDeserializationSchemaTest.java | 31 ++++++++--------- flink-core/pom.xml | 11 +++--- .../flink/util/jackson/JacksonMapperFactory.java | 40 ++++++++++++++++++++++ .../util/jackson/JacksonMapperFactoryTest.java | 37 +++++++++----------- .../flink/docs/rest/OpenApiSpecGenerator.java | 4 ++- .../flink/docs/rest/RestAPIDocGenerator.java | 3 +- .../table/test/KinesisFirehoseTableITTest.java | 14 +++++--- .../table/test/KinesisStreamsTableApiIT.java | 11 +++--- .../flink/tests/util/flink/FlinkDistribution.java | 3 +- .../kinesis/test/KinesisTableApiITCase.java | 11 +++--- .../flink/streaming/kinesis/test/model/Order.java | 2 +- .../apache/flink/formats/csv/CsvBulkWriter.java | 3 +- .../flink/formats/csv/CsvFileFormatFactory.java | 5 +-- .../apache/flink/formats/csv/CsvReaderFormat.java | 5 +-- .../csv/CsvRowDataDeserializationSchema.java | 5 +-- .../formats/csv/CsvRowDataSerializationSchema.java | 3 +- .../formats/csv/CsvRowDeserializationSchema.java | 5 +-- .../formats/csv/CsvRowSerializationSchema.java | 3 +- .../flink/formats/csv/RowCsvInputFormat.java | 4 +-- .../flink/formats/csv/DataStreamCsvITCase.java | 10 +++--- .../formats/json/JsonDeserializationSchema.java | 5 +-- .../json/JsonRowDataDeserializationSchema.java | 3 +- .../json/JsonRowDataSerializationSchema.java | 3 +- .../formats/json/JsonRowDeserializationSchema.java | 3 +- .../flink/formats/json/JsonRowSchemaConverter.java | 3 +- .../formats/json/JsonRowSerializationSchema.java | 3 +- .../json/JsonNodeDeserializationSchemaTest.java | 3 +- .../formats/json/JsonRowDataSerDeSchemaTest.java | 37 +++++++++----------- .../json/JsonRowDeserializationSchemaTest.java | 21 +++++------- .../flink/python/metric/FlinkMetricContainer.java | 5 ++- .../runtime/webmonitor/history/HistoryServer.java | 3 +- .../history/HistoryServerArchiveFetcher.java | 3 +- .../rest/compatibility/CompatibilityRoutines.java | 3 +- .../rest/compatibility/RestAPIStabilityTest.java | 3 +- .../runtime/webmonitor/WebFrontendITCase.java | 17 ++++----- .../webmonitor/history/HistoryServerTest.java | 4 ++- .../highavailability/FileSystemJobResultStore.java | 3 +- .../flink/runtime/history/FsJobArchivist.java | 3 +- .../flink/runtime/rest/util/RestMapperUtils.java | 4 ++- .../FileSystemJobResultStoreTestInternal.java | 3 +- .../jobgraph/jsonplan/JsonGeneratorTest.java | 3 +- .../taskmanager/TaskManagerDetailsHandlerTest.java | 3 +- .../messages/json/JobResultDeserializerTest.java | 3 +- .../json/SerializedThrowableSerializerTest.java | 5 +-- .../json/SerializedValueSerializerTest.java | 3 +- .../flink/streaming/api/graph/JSONGenerator.java | 3 +- .../table/connector/source/CompactPartitions.java | 3 +- .../plan/nodes/exec/serde/JsonSerdeUtil.java | 3 +- .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java | 6 ++-- .../nodes/exec/serde/PartitionSpecSerdeTest.java | 13 ++++--- .../exec/serde/RankProcessStrategySerdeTest.java | 3 +- .../plan/nodes/exec/serde/RankRangeSerdeTest.java | 3 +- .../plan/nodes/exec/serde/RankTypeSerdeTest.java | 3 +- .../plan/nodes/exec/serde/SortSpecSerdeTest.java | 12 ++++--- .../flink/table/planner/utils/JsonTestUtils.java | 4 ++- .../flink/table/planner/utils/TableTestBase.scala | 7 ++-- .../jsonplan/JsonJobGraphGenerationTest.java | 6 ++-- 61 files changed, 278 insertions(+), 177 deletions(-) diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java index 129c4a43e7f..f4dee623b64 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.firehose.sink.testutils; import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -39,7 +40,7 @@ import java.util.List; */ public class KinesisFirehoseTestUtils { - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper(); public static FirehoseClient createFirehoseClient(String endpoint, SdkHttpClient httpClient) { return AWSServicesTestUtils.createAwsSyncClient( @@ -68,11 +69,10 @@ public class KinesisFirehoseTestUtils { public static DataStream<String> getSampleDataGenerator( StreamExecutionEnvironment env, int endValue) { - ObjectMapper mapper = new ObjectMapper(); return env.fromSequence(1, endValue) .map(Object::toString) .returns(String.class) - .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data))); + .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data))); } public static List<String> getSampleData(int endValue) throws JsonProcessingException { diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java index 362e25fda77..c861f44fbdf 100644 --- a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java @@ -22,6 +22,7 @@ import org.apache.flink.connector.aws.config.AWSConfigConstants; import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -39,8 +40,9 @@ import java.util.Properties; */ public class SinkIntoKinesis { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + public static void main(String[] args) throws Exception { - ObjectMapper mapper = new ObjectMapper(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10_000); @@ -48,7 +50,10 @@ public class SinkIntoKinesis { env.fromSequence(1, 10_000_000L) .map(Object::toString) .returns(String.class) - .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data))); + .map( + data -> + OBJECT_MAPPER.writeValueAsString( + ImmutableMap.of("data", data))); Properties sinkProperties = new Properties(); sinkProperties.put(AWSConfigConstants.AWS_REGION, "your-region-here"); diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index 86817433a6f..e2b428eec1e 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -55,7 +56,7 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { - mapper = new ObjectMapper(); + mapper = JacksonMapperFactory.createObjectMapper(); } @Override diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java index c2e735a0c13..8766719a0c1 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.testutils.source.deserialization.TestingDeseri import org.apache.flink.formats.json.JsonDeserializationSchema; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.flink.util.Collector; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -46,6 +47,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for KafkaRecordDeserializationSchema. */ public class KafkaRecordDeserializationSchemaTest { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + private static Map<String, ?> configurableConfiguration; private static Map<String, ?> configuration; private static boolean isKeyDeserializer; @@ -135,14 +138,13 @@ public class KafkaRecordDeserializationSchemaTest { } private ConsumerRecord<byte[], byte[]> getConsumerRecord() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialKey = mapper.createObjectNode(); + ObjectNode initialKey = OBJECT_MAPPER.createObjectNode(); initialKey.put("index", 4); - byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey); - ObjectNode initialValue = mapper.createObjectNode(); + ObjectNode initialValue = OBJECT_MAPPER.createObjectNode(); initialValue.put("word", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue); return new ConsumerRecord<>("topic#1", 3, 4L, serializedKey, serializedValue); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java index 3a052cee2af..ddbcf1c94c8 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -31,16 +32,17 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for the{@link JSONKeyValueDeserializationSchema}. */ public class JSONKeyValueDeserializationSchemaTest { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + @Test public void testDeserializeWithoutMetadata() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialKey = mapper.createObjectNode(); + ObjectNode initialKey = OBJECT_MAPPER.createObjectNode(); initialKey.put("index", 4); - byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey); - ObjectNode initialValue = mapper.createObjectNode(); + ObjectNode initialValue = OBJECT_MAPPER.createObjectNode(); initialValue.put("word", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue); JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); schema.open(new DummyInitializationContext()); @@ -54,12 +56,11 @@ public class JSONKeyValueDeserializationSchemaTest { @Test public void testDeserializeWithoutKey() throws Exception { - ObjectMapper mapper = new ObjectMapper(); byte[] serializedKey = null; - ObjectNode initialValue = mapper.createObjectNode(); + ObjectNode initialValue = OBJECT_MAPPER.createObjectNode(); initialValue.put("word", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue); JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); schema.open(new DummyInitializationContext()); @@ -88,10 +89,9 @@ public class JSONKeyValueDeserializationSchemaTest { @Test public void testDeserializeWithoutValue() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialKey = mapper.createObjectNode(); + ObjectNode initialKey = OBJECT_MAPPER.createObjectNode(); initialKey.put("index", 4); - byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey); byte[] serializedValue = null; @@ -107,14 +107,13 @@ public class JSONKeyValueDeserializationSchemaTest { @Test public void testDeserializeWithMetadata() throws Exception { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialKey = mapper.createObjectNode(); + ObjectNode initialKey = OBJECT_MAPPER.createObjectNode(); initialKey.put("index", 4); - byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey); - ObjectNode initialValue = mapper.createObjectNode(); + ObjectNode initialValue = OBJECT_MAPPER.createObjectNode(); initialValue.put("word", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue); JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true); schema.open(new DummyInitializationContext()); diff --git a/flink-core/pom.xml b/flink-core/pom.xml index ef3061f1ac5..45be24efc05 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -51,6 +51,11 @@ under the License. <artifactId>flink-shaded-asm-9</artifactId> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + </dependency> + <!-- standard utilities --> <dependency> <groupId>org.apache.commons</groupId> @@ -114,12 +119,6 @@ under the License. <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-jackson</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java new file mode 100644 index 00000000000..2c0b57f0471 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.jackson; + +import org.apache.flink.annotation.Experimental; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; + +/** Factory for Jackson mappers. */ +@Experimental +public final class JacksonMapperFactory { + + public static ObjectMapper createObjectMapper() { + final ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper; + } + + public static CsvMapper createCsvMapper() { + final CsvMapper csvMapper = new CsvMapper(); + return csvMapper; + } + + private JacksonMapperFactory() {} +} diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java similarity index 52% copy from flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java copy to flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java index 90751525feb..61f2057ca82 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java @@ -15,35 +15,30 @@ * limitations under the License. */ -package org.apache.flink.formats.json; - -import org.apache.flink.connector.testutils.formats.DummyInitializationContext; +package org.apache.flink.util.jackson; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; import org.junit.jupiter.api.Test; -import java.io.IOException; - import static org.assertj.core.api.Assertions.assertThat; -/** Tests for the {@link JsonNodeDeserializationSchema}. */ -@SuppressWarnings("deprecation") -class JsonNodeDeserializationSchemaTest { +class JacksonMapperFactoryTest { @Test - void testDeserialize() throws IOException { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialValue = mapper.createObjectNode(); - initialValue.put("key", 4).put("value", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); - - JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema(); - schema.open(new DummyInitializationContext()); - ObjectNode deserializedValue = schema.deserialize(serializedValue); - - assertThat(deserializedValue.get("key").asInt()).isEqualTo(4); - assertThat(deserializedValue.get("value").asText()).isEqualTo("world"); + void testCreateObjectMapperReturnDistinctMappers() { + final ObjectMapper mapper1 = JacksonMapperFactory.createObjectMapper(); + final ObjectMapper mapper2 = JacksonMapperFactory.createObjectMapper(); + + assertThat(mapper1).isNotSameAs(mapper2); + } + + @Test + void testCreateCsvMapperReturnDistinctMappers() { + final CsvMapper mapper1 = JacksonMapperFactory.createCsvMapper(); + final CsvMapper mapper2 = JacksonMapperFactory.createCsvMapper(); + + assertThat(mapper1).isNotSameAs(mapper2); } } diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java index 0265ff2c957..fe17e807f8f 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; @@ -100,7 +101,8 @@ public class OpenApiSpecGenerator { static { ModelResolver.enumsAsRef = true; final ObjectMapper mapper = - new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + JacksonMapperFactory.createObjectMapper() + .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); modelConverterContext = new ModelConverterContextImpl(Collections.singletonList(new ModelResolver(mapper))); } diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java index 489bc6415ed..a21ec24cc3e 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint; import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint; import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion; import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString; @@ -99,7 +100,7 @@ public class RestAPIDocGenerator { private static final JsonSchemaGenerator schemaGen; static { - mapper = new ObjectMapper(); + mapper = JacksonMapperFactory.createObjectMapper(); mapper.getFactory().setCharacterEscapes(new HTMLCharacterEscapes()); mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); schemaGen = new JsonSchemaGenerator(mapper); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java index 172693c5ac6..ce67a707aec 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java @@ -26,10 +26,13 @@ import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.util.DockerImageVersions; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.AfterClass; @@ -79,6 +82,8 @@ public class KinesisFirehoseTableITTest extends TestLogger { private static final String BUCKET_NAME = "s3-firehose"; private static final String STREAM_NAME = "s3-stream"; + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar"); private SdkHttpClient httpClient; @@ -207,7 +212,7 @@ public class KinesisFirehoseTableITTest extends TestLogger { private <T> T fromJson(final String json, final Class<T> type) { try { - return new ObjectMapper().readValue(json, type); + return OBJECT_MAPPER.readValue(json, type); } catch (JsonProcessingException e) { throw new RuntimeException(String.format("Failed to deserialize json: %s", json), e); } @@ -218,6 +223,7 @@ public class KinesisFirehoseTableITTest extends TestLogger { private final String code; private final int quantity; + @JsonCreator public Order( @JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) { this.code = code; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java index 1f415c4bcec..d52ca2cb83d 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java @@ -27,12 +27,13 @@ import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer; import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.AfterClass; @@ -78,6 +79,8 @@ public class KinesisStreamsTableApiIT { private static final String ORDERS_STREAM = "orders"; private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000"; + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + private SdkHttpClient httpClient; private KinesisClient kinesisClient; @@ -202,7 +205,7 @@ public class KinesisStreamsTableApiIT { private <T> T fromJson(final String json, final Class<T> type) { try { - return new ObjectMapper().readValue(json, type); + return OBJECT_MAPPER.readValue(json, type); } catch (JsonProcessingException e) { throw new RuntimeException("Test Failure.", e); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index e9b939adf4d..05f11f820b8 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -29,6 +29,7 @@ import org.apache.flink.tests.util.AutoClosableProcess; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.FutureTaskWithException; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -75,7 +76,7 @@ public final class FlinkDistribution { private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*"); private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java index 4cd75934ed7..b637108e5db 100644 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java @@ -28,11 +28,12 @@ import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.util.DockerImageVersions; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -67,6 +68,8 @@ public class KinesisTableApiITCase extends TestLogger { private static final String LARGE_ORDERS_STREAM = "large_orders"; private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar"); private static final Network network = Network.newNetwork(); @@ -166,7 +169,7 @@ public class KinesisTableApiITCase extends TestLogger { private <T> String toJson(final T object) { try { - return new ObjectMapper().writeValueAsString(object); + return OBJECT_MAPPER.writeValueAsString(object); } catch (JsonProcessingException e) { throw new RuntimeException("Test Failure.", e); } @@ -174,7 +177,7 @@ public class KinesisTableApiITCase extends TestLogger { private <T> T fromJson(final String json, final Class<T> type) { try { - return new ObjectMapper().readValue(json, type); + return OBJECT_MAPPER.readValue(json, type); } catch (JsonProcessingException e) { throw new RuntimeException("Test Failure.", e); } diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java index 15c158c4c8f..58cec308eae 100644 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.kinesis.test.model; -import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java index 69b3f9bce28..f9a8e01bb07 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java @@ -21,6 +21,7 @@ package org.apache.flink.formats.csv; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.formats.common.Converter; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; @@ -89,7 +90,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> { */ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStream stream) { final Converter<T, T, Void> converter = (value, context) -> value; - final CsvMapper csvMapper = new CsvMapper(); + final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper(); final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar(); return new CsvBulkWriter<>(csvMapper, schema, converter, null, stream); } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java index b0a0b27d5e7..f977acd5b4f 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java @@ -46,6 +46,7 @@ import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.plan.stats.TableStats; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -132,7 +133,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter .createRowConverter(projectedRowType, true); CsvReaderFormat<RowData> csvReaderFormat = new CsvReaderFormat<>( - () -> new CsvMapper(), + () -> JacksonMapperFactory.createCsvMapper(), ignored -> schema, JsonNode.class, converter, @@ -178,7 +179,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter final RowDataToCsvConverter converter = RowDataToCsvConverters.createRowConverter(rowType); return out -> { - final CsvMapper mapper = new CsvMapper(); + final CsvMapper mapper = JacksonMapperFactory.createCsvMapper(); final ObjectNode container = mapper.createObjectNode(); final RowDataToCsvConverter.RowDataToCsvFormatConverterContext converterContext = diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java index c2d14008cb9..02f3afbe43e 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.formats.common.Converter; import org.apache.flink.util.function.SerializableFunction; import org.apache.flink.util.function.SerializableSupplier; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; @@ -108,7 +109,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> { */ public static <T> CsvReaderFormat<T> forSchema( CsvSchema schema, TypeInformation<T> typeInformation) { - return forSchema(() -> new CsvMapper(), ignored -> schema, typeInformation); + return forSchema(JacksonMapperFactory::createCsvMapper, ignored -> schema, typeInformation); } /** @@ -163,7 +164,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> { */ public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType) { return forSchema( - () -> new CsvMapper(), + () -> JacksonMapperFactory.createCsvMapper(), mapper -> mapper.schemaFor(pojoType).withoutQuoteChar(), TypeInformation.of(pojoType)); } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java index 0c3f68e97f8..b77312d41b8 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java @@ -24,10 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import javax.annotation.Nullable; @@ -77,7 +77,8 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch @Override public void open(InitializationContext context) { - this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + this.objectReader = + JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema); } /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */ diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java index f7fd810e691..737b74eda20 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java @@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SerializableSupplier; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -152,7 +153,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< rowType, csvSchema, () -> { - final CsvMapper csvMapper = new CsvMapper(); + final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper(); csvMapper.configure( JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation); diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java index 24153b4de46..b2a91a9fb34 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -28,10 +28,10 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import java.io.IOException; @@ -93,7 +93,8 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema< @Override public void open(InitializationContext context) throws Exception { - objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + objectReader = + JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema); } /** A builder for creating a {@link CsvRowDeserializationSchema}. */ diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java index eca56958d96..53f4edf16e3 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; @@ -93,7 +94,7 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row> @Override public void open(InitializationContext context) throws Exception { - this.csvMapper = new CsvMapper(); + this.csvMapper = JacksonMapperFactory.createCsvMapper(); this.objectWriter = csvMapper.writer(csvSchema); } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java index 75a22cb2dd6..53634c5cc83 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java @@ -24,10 +24,10 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter; import org.apache.flink.types.Row; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import java.io.IOException; @@ -90,7 +90,7 @@ public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> { super.open(split); prepareConverter(); this.iterator = - new CsvMapper() + JacksonMapperFactory.createCsvMapper() .readerFor(JsonNode.class) .with(csvSchema) .readValues(csvInputStream); diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java index ea073432116..363a17afdef 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.collect.ClientAndIterator; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; @@ -74,6 +75,8 @@ import static org.assertj.core.api.Assertions.assertThat; @ExtendWith({TestLoggerExtension.class}) public class DataStreamCsvITCase { + private static final CsvMapper CSV_MAPPER = JacksonMapperFactory.createCsvMapper(); + private static final int PARALLELISM = 4; @TempDir File outDir; @@ -163,7 +166,7 @@ public class DataStreamCsvITCase { final CsvReaderFormat<CityPojo> csvFormat = CsvReaderFormat.forSchema( - () -> new CsvMapper(), + () -> CSV_MAPPER, mapper -> mapper.schemaFor(CityPojo.class) .withoutQuoteChar() @@ -229,9 +232,8 @@ public class DataStreamCsvITCase { private static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) { final Converter<T, T, Void> converter = (value, context) -> value; - final CsvMapper csvMapper = new CsvMapper(); - final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar(); - return (out) -> new CsvBulkWriter<>(csvMapper, schema, converter, null, out); + final CsvSchema schema = CSV_MAPPER.schemaFor(pojoClass).withoutQuoteChar(); + return (out) -> new CsvBulkWriter<>(CSV_MAPPER, schema, converter, null, out); } private static Map<File, String> getFileContentByPath(File directory) throws IOException { diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java index fd28712d56a..cc244b0a0c2 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.function.SerializableSupplier; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -37,11 +38,11 @@ public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema< protected transient ObjectMapper mapper; public JsonDeserializationSchema(Class<T> clazz) { - this(clazz, () -> new ObjectMapper()); + this(clazz, JacksonMapperFactory::createObjectMapper); } public JsonDeserializationSchema(TypeInformation<T> typeInformation) { - this(typeInformation, () -> new ObjectMapper()); + this(typeInformation, JacksonMapperFactory::createObjectMapper); } public JsonDeserializationSchema( diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index 805b299c11c..9a57bac203b 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; @@ -97,7 +98,7 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R @Override public void open(InitializationContext context) throws Exception { objectMapper = - new ObjectMapper() + JacksonMapperFactory.createObjectMapper() .configure( JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index 6a8c619eeac..c8b7f73b64d 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -86,7 +87,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa @Override public void open(InitializationContext context) throws Exception { mapper = - new ObjectMapper() + JacksonMapperFactory.createObjectMapper() .configure( JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, encodeDecimalAsPlainNumber); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java index b2b7e6dada5..dd4a9bb9f99 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.types.Row; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; @@ -121,7 +122,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row> @Override public void open(InitializationContext context) throws Exception { - objectMapper = new ObjectMapper(); + objectMapper = JacksonMapperFactory.createObjectMapper(); if (hasDecimalType) { objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java index fe412320456..d7761851ee4 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -97,7 +98,7 @@ public final class JsonRowSchemaConverter { @SuppressWarnings("unchecked") public static <T> TypeInformation<T> convert(String jsonSchema) { Preconditions.checkNotNull(jsonSchema, "JSON schema"); - final ObjectMapper mapper = new ObjectMapper(); + final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); mapper.getFactory() .enable(JsonParser.Feature.ALLOW_COMMENTS) .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java index e789307967e..f185d211bf9 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.flink.util.WrappingRuntimeException; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -96,7 +97,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { @Override public void open(InitializationContext context) throws Exception { - mapper = new ObjectMapper(); + mapper = JacksonMapperFactory.createObjectMapper(); } /** Builder for {@link JsonRowSerializationSchema}. */ diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java index 90751525feb..e6b2a3e05cb 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.json; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -34,7 +35,7 @@ class JsonNodeDeserializationSchemaTest { @Test void testDeserialize() throws IOException { - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); ObjectNode initialValue = mapper.createObjectNode(); initialValue.put("key", 4).put("value", "world"); byte[] serializedValue = mapper.writeValueAsBytes(initialValue); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index 888be3b9fe7..883c3f09134 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -31,6 +31,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -83,6 +84,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; */ class JsonRowDataSerDeSchemaTest { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + @Test void testSerDe() throws Exception { byte tinyint = 'c'; @@ -115,11 +118,10 @@ class JsonRowDataSerDeSchemaTest { innerMap.put("key", 234); nestedMap.put("inner_map", innerMap); - ObjectMapper objectMapper = new ObjectMapper(); - ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D); + ArrayNode doubleNode = OBJECT_MAPPER.createArrayNode().add(1.1D).add(2.2D).add(3.3D); // Root - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("bool", true); root.put("tinyint", tinyint); root.put("smallint", smallint); @@ -139,7 +141,7 @@ class JsonRowDataSerDeSchemaTest { root.putObject("multiSet").put("element", 2); root.putObject("map2map").putObject("inner_map").put("key", 234); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); DataType dataType = ROW( @@ -220,8 +222,7 @@ class JsonRowDataSerDeSchemaTest { double doubleValue = random.nextDouble(); float floatValue = random.nextFloat(); - ObjectMapper objectMapper = new ObjectMapper(); - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("bool", String.valueOf(bool)); root.put("int", String.valueOf(integer)); root.put("bigint", String.valueOf(bigint)); @@ -230,7 +231,7 @@ class JsonRowDataSerDeSchemaTest { root.put("float1", String.valueOf(floatValue)); root.put("float2", new BigDecimal(floatValue)); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); DataType dataType = ROW( @@ -296,11 +297,9 @@ class JsonRowDataSerDeSchemaTest { true); open(serializationSchema); - ObjectMapper objectMapper = new ObjectMapper(); - // the first row { - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("f1", 1); root.put("f2", true); root.put("f3", "str"); @@ -312,7 +311,7 @@ class JsonRowDataSerDeSchemaTest { ObjectNode row = root.putObject("f6"); row.put("f1", "this is row1"); row.put("f2", 12); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); RowData rowData = deserializationSchema.deserialize(serializedJson); byte[] actual = serializationSchema.serialize(rowData); assertThat(serializedJson).containsExactly(actual); @@ -320,7 +319,7 @@ class JsonRowDataSerDeSchemaTest { // the second row { - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("f1", 10); root.put("f2", false); root.put("f3", "newStr"); @@ -332,7 +331,7 @@ class JsonRowDataSerDeSchemaTest { ObjectNode row = root.putObject("f6"); row.put("f1", "this is row2"); row.putNull("f2"); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); RowData rowData = deserializationSchema.deserialize(serializedJson); byte[] actual = serializationSchema.serialize(rowData); assertThat(serializedJson).containsExactly(actual); @@ -419,12 +418,10 @@ class JsonRowDataSerDeSchemaTest { @Test void testDeserializationMissingField() throws Exception { - ObjectMapper objectMapper = new ObjectMapper(); - // Root - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("id", 123123123); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); DataType dataType = ROW(FIELD("name", STRING())); RowType schema = (RowType) dataType.getLogicalType(); @@ -504,14 +501,12 @@ class JsonRowDataSerDeSchemaTest { true); open(serializationSchema); - ObjectMapper objectMapper = new ObjectMapper(); - - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("timestamp3", "1990-10-14 12:12:43.123"); root.put("timestamp9", "1990-10-14 12:12:43.123456789"); root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z"); root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z"); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); RowData rowData = deserializationSchema.deserialize(serializedJson); byte[] actual = serializationSchema.serialize(rowData); assertThat(serializedJson).containsExactly(actual); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java index deee53ae011..81e370c0fd3 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java @@ -21,6 +21,7 @@ package org.apache.flink.formats.json; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.types.Row; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -52,6 +53,8 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; /** Tests for the {@link JsonRowDeserializationSchema}. */ public class JsonRowDeserializationSchemaTest { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + @Rule public ExpectedException thrown = ExpectedException.none(); /** Tests simple deserialization using type information. */ @@ -73,10 +76,8 @@ public class JsonRowDeserializationSchemaTest { innerMap.put("key", 234); nestedMap.put("inner_map", innerMap); - ObjectMapper objectMapper = new ObjectMapper(); - // Root - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("id", id); root.put("name", name); root.put("bytes", bytes); @@ -89,7 +90,7 @@ public class JsonRowDeserializationSchemaTest { root.putObject("map").put("flink", 123); root.putObject("map2map").putObject("inner_map").put("key", 234); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder( @@ -149,10 +150,8 @@ public class JsonRowDeserializationSchemaTest { }; final String[] strings = new String[] {"one", "two", "three"}; - final ObjectMapper objectMapper = new ObjectMapper(); - // Root - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("id", id.longValue()); root.putNull("idOrNull"); root.put("name", name); @@ -164,7 +163,7 @@ public class JsonRowDeserializationSchemaTest { root.putArray("strings").add("one").add("two").add("three"); root.putObject("nested").put("booleanField", true).put("decimalField", 12); - final byte[] serializedJson = objectMapper.writeValueAsBytes(root); + final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder( @@ -212,12 +211,10 @@ public class JsonRowDeserializationSchemaTest { /** Tests deserialization with non-existing field name. */ @Test public void testMissingNode() throws Exception { - ObjectMapper objectMapper = new ObjectMapper(); - // Root - ObjectNode root = objectMapper.createObjectNode(); + ObjectNode root = OBJECT_MAPPER.createObjectNode(); root.put("id", 123123123); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); + byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root); TypeInformation<Row> rowTypeInformation = Types.ROW_NAMED(new String[] {"name"}, Types.STRING); diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java index a184c39c621..df2f84a283b 100644 --- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java +++ b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -56,6 +57,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public final class FlinkMetricContainer { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + private static final String METRIC_KEY_SEPARATOR = GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER); @@ -206,7 +209,7 @@ public final class FlinkMetricContainer { static ArrayList getNameSpaceArray(MetricKey metricKey) { MetricName metricName = metricKey.metricName(); try { - return new ObjectMapper().readValue(metricName.getNamespace(), ArrayList.class); + return OBJECT_MAPPER.readValue(metricName.getNamespace(), ArrayList.class); } catch (JsonProcessingException e) { throw new RuntimeException( String.format("Parse namespace[%s] error. ", metricName.getNamespace()), e); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index c7c779c4e93..fff6d42306f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -49,6 +49,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -100,7 +101,7 @@ import java.util.function.Consumer; public class HistoryServer { private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); private final Configuration config; diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 8e5aee9c633..a8d782354a0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -104,7 +105,7 @@ class HistoryServerArchiveFetcher { private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); private static final JsonFactory jacksonFactory = new JsonFactory(); - private static final ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); private static final String JSON_FILE_ENDING = ".json"; diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java index 1bf3b8f99cb..20bbc04501c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.compatibility; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -137,7 +138,7 @@ enum CompatibilityRoutines { REQUEST_ROUTINE, RESPONSE_ROUTINE)); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); private static final JsonSchemaGenerator SCHEMA_GENERATOR = new JsonSchemaGenerator(OBJECT_MAPPER); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java index 0dd5704b1dd..fec3204ebea 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint; import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint; import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion; import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.util.DefaultIndenter; @@ -56,7 +57,7 @@ final class RestAPIStabilityTest { private static final String SNAPSHOT_RESOURCE_PATTERN = "rest_api_%s.snapshot"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); private static class StableRestApiVersionProvider implements ArgumentsProvider { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index d23a0cc1898..b677a094d54 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; import org.apache.flink.test.junit5.InjectClusterClient; import org.apache.flink.test.junit5.InjectClusterRESTAddress; import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -81,6 +82,7 @@ class WebFrontendITCase { private static final int NUM_TASK_MANAGERS = 2; private static final int NUM_SLOTS = 4; + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration(); @RegisterExtension @@ -170,8 +172,7 @@ class WebFrontendITCase { void getNumberOfTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception { String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/"); - ObjectMapper mapper = new ObjectMapper(); - JsonNode response = mapper.readTree(json); + JsonNode response = OBJECT_MAPPER.readTree(json); ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers"); assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS); @@ -181,8 +182,7 @@ class WebFrontendITCase { void getTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception { String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/"); - ObjectMapper mapper = new ObjectMapper(); - JsonNode parsed = mapper.readTree(json); + JsonNode parsed = OBJECT_MAPPER.readTree(json); ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers"); assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS); @@ -231,8 +231,7 @@ class WebFrontendITCase { throws Exception { String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/"); - ObjectMapper mapper = new ObjectMapper(); - JsonNode parsed = mapper.readTree(json); + JsonNode parsed = OBJECT_MAPPER.readTree(json); ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers"); JsonNode taskManager = taskManagers.get(0); String id = taskManager.get("id").asText(); @@ -278,8 +277,7 @@ class WebFrontendITCase { private static Map<String, String> fromKeyValueJsonArray(String jsonString) { try { Map<String, String> map = new HashMap<>(); - ObjectMapper m = new ObjectMapper(); - ArrayNode array = (ArrayNode) m.readTree(jsonString); + ArrayNode array = (ArrayNode) OBJECT_MAPPER.readTree(jsonString); Iterator<JsonNode> elements = array.elements(); while (elements.hasNext()) { @@ -394,8 +392,7 @@ class WebFrontendITCase { String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/jobs/overview"); - ObjectMapper mapper = new ObjectMapper(); - JsonNode parsed = mapper.readTree(json); + JsonNode parsed = OBJECT_MAPPER.readTree(json); ArrayNode jsonJobs = (ArrayNode) parsed.get("jobs"); assertThat(jsonJobs.size()).isEqualTo(1); assertThat(jsonJobs.get(0).get("duration").asInt()).isGreaterThan(0); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 4c5f7ea2645..8ec80e33358 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -78,7 +79,8 @@ class HistoryServerTest { .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); private static final ObjectMapper OBJECT_MAPPER = - new ObjectMapper().enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); + JacksonMapperFactory.createObjectMapper() + .enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); private MiniClusterWithClientResource cluster; private File jmDirectory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java index 010ce77e74c..986b7a03343 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer; import org.apache.flink.runtime.rest.messages.json.JobResultSerializer; import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; @@ -66,7 +67,7 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore { return filename.endsWith(FILE_EXTENSION); } - private final ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); private final FileSystem fileSystem; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java index 67c9c2b7655..65c76fdefaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java @@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; @@ -47,7 +48,7 @@ public class FsJobArchivist { private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); private static final JsonFactory jacksonFactory = new JsonFactory(); - private static final ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); private static final String ARCHIVE = "archive"; private static final String PATH = "path"; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java index 9a6bfba2f6e..eb42f41fabe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.rest.util; +import org.apache.flink.util.jackson.JacksonMapperFactory; + import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; @@ -27,7 +29,7 @@ public class RestMapperUtils { private static final ObjectMapper objectMapper; static { - objectMapper = new ObjectMapper(); + objectMapper = JacksonMapperFactory.createObjectMapper(); objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java index 20d6f8a4c29..c952e5d4342 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.FileUtils; import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -45,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(TestLoggerExtension.class) public class FileSystemJobResultStoreTestInternal { - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper(); private FileSystemJobResultStore fileSystemJobResultStore; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java index a1a37b1d2cc..a7a3b942e2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -96,7 +97,7 @@ public class JsonGeneratorTest { assertNotNull(plan); // validate the produced JSON - ObjectMapper m = new ObjectMapper(); + ObjectMapper m = JacksonMapperFactory.createObjectMapper(); JsonNode rootNode = m.readTree(plan); // core fields diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java index 5e2e14864d8..6d1b58fa3d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo; import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration; import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -118,7 +119,7 @@ class TaskManagerDetailsHandlerTest { 20L, Collections.emptyList()); - ObjectMapper objectMapper = new ObjectMapper(); + ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper(); String actualJson = objectMapper.writeValueAsString(actual); String expectedJson = objectMapper.writeValueAsString(expected); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java index 962b02c99f8..c9752f9fc34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.messages.json; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -43,7 +44,7 @@ public class JobResultDeserializerTest extends TestLogger { final SimpleModule simpleModule = new SimpleModule(); simpleModule.addDeserializer(JobResult.class, new JobResultDeserializer()); - objectMapper = new ObjectMapper(); + objectMapper = JacksonMapperFactory.createObjectMapper(); objectMapper.registerModule(simpleModule); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java index ac66aa314c4..b167fa7fbcb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; @@ -33,7 +34,7 @@ import static org.junit.Assert.assertTrue; /** Tests for {@link SerializedThrowableSerializer} and {@link SerializedThrowableDeserializer}. */ public class SerializedThrowableSerializerTest extends TestLogger { - private ObjectMapper objectMapper = new ObjectMapper(); + private ObjectMapper objectMapper; @Before public void setUp() { @@ -42,7 +43,7 @@ public class SerializedThrowableSerializerTest extends TestLogger { SerializedThrowable.class, new SerializedThrowableDeserializer()); simpleModule.addSerializer(SerializedThrowable.class, new SerializedThrowableSerializer()); - objectMapper = new ObjectMapper(); + objectMapper = JacksonMapperFactory.createObjectMapper(); objectMapper.registerModule(simpleModule); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java index ee3d57c00b4..6e6ec96cc64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; @@ -40,7 +41,7 @@ public class SerializedValueSerializerTest extends TestLogger { @Before public void setUp() { - objectMapper = new ObjectMapper(); + objectMapper = JacksonMapperFactory.createObjectMapper(); final SimpleModule simpleModule = new SimpleModule(); final JavaType serializedValueWildcardType = objectMapper diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index 7047fd01e4a..b39243f27eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -45,7 +46,7 @@ public class JSONGenerator { public static final String PARALLELISM = "parallelism"; private StreamGraph streamGraph; - private final ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); public JSONGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java index 333a3480b1a..57ce1970720 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java @@ -20,6 +20,7 @@ package org.apache.flink.table.connector.source; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; @@ -50,7 +51,7 @@ public class CompactPartitions implements Serializable { private static final long serialVersionUID = 1L; private static final String FIELD_NAME_COMPACT_PARTITIONS = "compact-partitions"; - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper(); private final List<CompactPartition> compactPartitions; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java index 8a601601d3f..46fd71036b2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java @@ -40,6 +40,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.extraction.ExtractionUtils; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -100,7 +101,7 @@ public class JsonSerdeUtil { private static final ObjectMapper OBJECT_MAPPER_INSTANCE; static { - OBJECT_MAPPER_INSTANCE = new ObjectMapper(); + OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper(); OBJECT_MAPPER_INSTANCE.setTypeFactory( // Make sure to register the classloader of the planner diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java index d27dbb5abe3..d571d753885 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java @@ -64,9 +64,9 @@ import org.apache.flink.table.types.logical.ZonedTimestampType; import org.apache.flink.table.types.utils.DataTypeFactoryMock; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.types.Row; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; @@ -139,8 +139,8 @@ public class LogicalTypeJsonSerdeTest { // maximum plan content tableConfig.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL); final String maximumJson = toJson(serdeContext, STRUCTURED_TYPE); - final ObjectMapper mapper = new ObjectMapper(); - final JsonNode maximumJsonNode = mapper.readTree(maximumJson); + final JsonNode maximumJsonNode = + JacksonMapperFactory.createObjectMapper().readTree(maximumJson); assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES)) .isNotNull(); assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION).asText()) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java index 159ef2c6ab6..b35f3b808fd 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -30,20 +31,22 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test PartitionSpec json ser/de. */ public class PartitionSpecSerdeTest { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + @Test public void testPartitionSpec() throws JsonProcessingException { PartitionSpec spec = new PartitionSpec(new int[] {1, 2, 3}); - ObjectMapper mapper = new ObjectMapper(); - assertThat(mapper.readValue(mapper.writeValueAsString(spec), PartitionSpec.class)) + assertThat( + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(spec), PartitionSpec.class)) .isEqualTo(spec); } @Test public void testAllInOne() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); assertThat( - mapper.readValue( - mapper.writeValueAsString(PartitionSpec.ALL_IN_ONE), + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(PartitionSpec.ALL_IN_ONE), PartitionSpec.class)) .isEqualTo(PartitionSpec.ALL_IN_ONE); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java index 82c1b3849fa..69c9e553d77 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.table.planner.plan.utils.RankProcessStrategy; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -32,7 +33,7 @@ public class RankProcessStrategySerdeTest { @Test public void testRankRange() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); RankProcessStrategy[] strategies = new RankProcessStrategy[] { RankProcessStrategy.UNDEFINED_STRATEGY, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java index 90abd497f54..bc5e6a246ba 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java @@ -22,6 +22,7 @@ import org.apache.flink.table.runtime.operators.rank.ConstantRankRange; import org.apache.flink.table.runtime.operators.rank.ConstantRankRangeWithoutEnd; import org.apache.flink.table.runtime.operators.rank.RankRange; import org.apache.flink.table.runtime.operators.rank.VariableRankRange; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -35,7 +36,7 @@ public class RankRangeSerdeTest { @Test public void testRankRange() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); RankRange[] ranges = new RankRange[] { new ConstantRankRange(1, 2), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java index e1daa9d03e3..04fa1552512 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.table.runtime.operators.rank.RankType; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -32,7 +33,7 @@ public class RankTypeSerdeTest { @Test public void testRankType() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); + ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); for (RankType type : RankType.values()) { RankType result = mapper.readValue(mapper.writeValueAsString(type), RankType.class); assertThat(result).isEqualTo(type); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java index 2b5600980dd..af2190203b5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -29,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test SortSpec json ser/de. */ public class SortSpecSerdeTest { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); @Test public void testSortSpec() throws JsonProcessingException { @@ -39,15 +41,17 @@ public class SortSpecSerdeTest { .addField(3, false, true) .addField(4, false, false) .build(); - ObjectMapper mapper = new ObjectMapper(); - assertThat(mapper.readValue(mapper.writeValueAsString(sortSpec), SortSpec.class)) + assertThat( + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(sortSpec), SortSpec.class)) .isEqualTo(sortSpec); } @Test public void testAny() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - assertThat(mapper.readValue(mapper.writeValueAsString(SortSpec.ANY), SortSpec.class)) + assertThat( + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(SortSpec.ANY), SortSpec.class)) .isEqualTo(SortSpec.ANY); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java index 682f3b3e656..6aa9f96c7d0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.utils; import org.apache.flink.FlinkVersion; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -29,7 +30,8 @@ import java.io.IOException; /** This class contains a collection of generic utilities to deal with JSON in tests. */ public final class JsonTestUtils { - private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER_INSTANCE = + JacksonMapperFactory.createObjectMapper(); private JsonTestUtils() {} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index fd2fd085519..35499ba9439 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -66,6 +66,7 @@ import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.typeutils.FieldInfoUtils import org.apache.flink.types.Row import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader} +import org.apache.flink.util.jackson.JacksonMapperFactory import _root_.java.math.{BigDecimal => JBigDecimal} import _root_.java.util @@ -1612,6 +1613,8 @@ object PlanKind extends Enumeration { object TableTestUtil { + private val objectMapper = JacksonMapperFactory.createObjectMapper() + val STREAM_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build() val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inBatchMode().build() @@ -1706,14 +1709,14 @@ object TableTestUtil { @throws[IOException] def getFormattedJson(json: String): String = { - val parser = new ObjectMapper().getFactory.createParser(json) + val parser = objectMapper.getFactory.createParser(json) val jsonNode: JsonNode = parser.readValueAsTree[JsonNode] jsonNode.toString } @throws[IOException] def getPrettyJson(json: String): String = { - val parser = new ObjectMapper().getFactory.createParser(json) + val parser = objectMapper.getFactory.createParser(json) val jsonNode: JsonNode = parser.readValueAsTree[JsonNode] jsonNode.toPrettyString } diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java index 1bbf8d35c42..3e7a589f779 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java @@ -32,6 +32,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; +import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; @@ -245,6 +246,8 @@ public class JsonJobGraphGenerationTest { private static class GenericValidator implements JsonValidator { + private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper(); + private final int expectedParallelism; private final int numNodes; @@ -258,8 +261,7 @@ public class JsonJobGraphGenerationTest { final Map<String, JsonNode> idToNode = new HashMap<>(); // validate the produced JSON - ObjectMapper m = new ObjectMapper(); - JsonNode rootNode = m.readTree(json); + JsonNode rootNode = OBJECT_MAPPER.readTree(json); JsonNode idField = rootNode.get("jid"); JsonNode nameField = rootNode.get("name");
