This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aeb6603e67 [INLONG-8825][Sort] Optimize the field type conversion
between source and target in the whole database scenario (#8826)
aeb6603e67 is described below
commit aeb6603e6794fc784d847c56a691d32ced39f21b
Author: yunqingmoswu <[email protected]>
AuthorDate: Mon Sep 11 15:39:33 2023 +0800
[INLONG-8825][Sort] Optimize the field type conversion between source and
target in the whole database scenario (#8826)
---
.../sort/protocol/constant/DataTypeConstants.java | 54 +++++++
.../sort/base/format/JsonDynamicSchemaFormat.java | 108 +++++++++-----
.../sort/base/schema/SchemaChangeHelper.java | 9 +-
.../inlong/sort/doris/schema/OperationHelper.java | 9 +-
.../sort/doris/schema/SchemaChangeHelper.java | 8 +-
.../kafka/DynamicKafkaSerializationSchema.java | 8 +-
.../sort/formats/json/utils/FormatJsonUtil.java | 156 +++++++++++----------
.../sort/formats/json/utils/FormatJsonUtil.java | 77 +++++-----
8 files changed, 260 insertions(+), 169 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DataTypeConstants.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DataTypeConstants.java
new file mode 100644
index 0000000000..3674d7613c
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DataTypeConstants.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * The constants class of Data Type. All constants related to data types
should be defined in this class.
+ * For example, the length of the Char or Varchar, the precision and scale of
the Decimal, etc.
+ */
+public class DataTypeConstants {
+
+ /**
+ * The default precision of Decimal. It is used to control the default
precision
+ * when the source Decimal type precision cannot be obtained in the whole
database migration.
+ */
+ public static final int DEFAULT_DECIMAL_PRECISION = 38;
+ /**
+ * The default scale of Decimal. It is used to control the default scale
+ * when the Source Decimal type scale cannot be obtained in the whole
database migration.
+ */
+ public static final int DEFAULT_DECIMAL_SCALE = 5;
+ /**
+ * The default length of Char. It is used to control the default length of
Char
+ * when the length of Source Char type cannot be obtained in the whole
database migration.
+ */
+ public static final int DEFAULT_CHAR_LENGTH = 255;
+ /**
+ * The default length of a Char holding a formatted time, such as 'yyyy-MM-dd HH:mm:ss'
or 'HH:mm:ss', etc.
+ * It is used to control the default length of Char when the length of
Source Char type
+ * cannot be obtained in the whole database migration.
+ */
+ public static final int DEFAULT_CHAR_TIME_LENGTH = 30;
+ /**
+ * The key of Timestamp with time_zone in Oracle. It is used in the whole
database migration.
+ */
+ public static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
+
+ private DataTypeConstants() {
+ }
+}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index e8bef34d90..5833719e48 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -24,13 +24,16 @@ import org.apache.flink.formats.common.TimestampFormat;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -44,6 +47,9 @@ import java.util.regex.Pattern;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
import static
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_2_FLINK_TYPE_MAPPING;
import static
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_CHAR_LENGTH;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
/**
* Json dynamic format class
@@ -61,20 +67,15 @@ import static
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_
@SuppressWarnings("LanguageDetectionInspection")
public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaFormat<JsonNode> {
- public static final int DEFAULT_DECIMAL_PRECISION = 15;
- public static final int DEFAULT_DECIMAL_SCALE = 5;
- private static final Logger LOG =
LoggerFactory.getLogger(JsonDynamicSchemaFormat.class);
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* The first item of array
*/
private static final Integer FIRST = 0;
-
/**
* dialect sql type pattern such as DECIMAL(38, 10) from mysql or oracle
etc
*/
- private static final Pattern DIALECT_SQL_TYPE_PATTERN =
Pattern.compile("\\w+\\(([\\d,\\s]*)\\)");
-
- public final ObjectMapper objectMapper = new ObjectMapper();
+ private static final Pattern DIALECT_SQL_TYPE_PATTERN =
Pattern.compile("([\\w, \\s]+)\\(([\\d,\\s'\\-]*)\\)");
protected final JsonToRowDataConverters rowDataConverters;
protected final boolean adaptSparkEngine;
@@ -155,7 +156,7 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
*/
@Override
public JsonNode deserialize(byte[] message) throws IOException {
- return objectMapper.readTree(message);
+ return OBJECT_MAPPER.readTree(message);
}
/**
@@ -266,8 +267,18 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
Entry<String, JsonNode> entry = schemaFields.next();
String name = entry.getKey();
LogicalType type = sqlType2FlinkType(entry.getValue().asInt());
- String dialectType = dialectSchema.get(name) != null ?
dialectSchema.get(name).asText() : null;
- type = handleDialectSqlType(type, dialectType);
+ String dialectType = null;
+ if (dialectSchema != null && dialectSchema.get(name) != null) {
+ dialectType = dialectSchema.get(name).asText();
+ }
+ try {
+ type = handleDialectSqlType(type, dialectType);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Handle dialect sql type failed, the
field: %s, the dialect type: %s",
+ name, dialectType),
+ e);
+ }
if (pkNames.contains(name)) {
type = type.copy(false);
}
@@ -287,33 +298,58 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
if (StringUtils.isBlank(dialectType)) {
return type;
}
+ Matcher matcher = DIALECT_SQL_TYPE_PATTERN.matcher(dialectType);
+ if (!matcher.matches()) {
+ return type;
+ }
+ String[] items = matcher.group(2).split(",");
if (type instanceof DecimalType) {
- Matcher matcher = DIALECT_SQL_TYPE_PATTERN.matcher(dialectType);
int precision;
- int scale;
- DecimalType decimalType = new
DecimalType(DecimalType.MAX_PRECISION);
- if (matcher.matches()) {
- String[] items = matcher.group(1).split(",");
- precision = Integer.parseInt(items[0].trim());
- if (precision < DecimalType.MIN_PRECISION || precision >
DecimalType.MAX_PRECISION) {
- LOG.info("invalid decimal precision: {}, change to: {}",
precision, decimalType.getPrecision());
- return decimalType;
- }
- if (items.length == 2) {
- scale = Integer.parseInt(items[1].trim());
- if (scale < DecimalType.MIN_SCALE || scale > precision) {
- decimalType = new DecimalType(precision);
- LOG.info("invalid decimal scale: {}, change to: {}",
scale, decimalType.getScale());
- return decimalType;
- }
- return new DecimalType(precision, scale);
+ int scale = DEFAULT_DECIMAL_SCALE;
+ precision = Integer.parseInt(items[0].trim());
+ if (precision < DecimalType.MIN_PRECISION || precision >
DecimalType.MAX_PRECISION) {
+ precision = DEFAULT_DECIMAL_PRECISION;
+ }
+ if (items.length == 2) {
+ scale = Integer.parseInt(items[1].trim());
+ if (scale < DecimalType.MIN_SCALE || scale > precision) {
+ scale = DEFAULT_DECIMAL_SCALE;
}
- LOG.info("Decimal has only precision {} without scale",
precision);
- return new DecimalType(precision);
}
- return decimalType;
+ return new DecimalType(precision, scale);
+ } else if (type instanceof CharType) {
+ int length = Integer.parseInt(items[0].trim());
+ if (length <= 0) {
+ length = DEFAULT_CHAR_LENGTH;
+ }
+ return new CharType(length);
+ } else if (type instanceof VarCharType) {
+ int length = Integer.parseInt(items[0].trim());
+ if (length <= 0) {
+ length = Integer.MAX_VALUE;
+ }
+ return new VarCharType(length);
+ } else if (type instanceof VarBinaryType) {
+ int length = Integer.parseInt(items[0].trim());
+ if (length <= 0) {
+ length = Integer.MAX_VALUE;
+ }
+ return new VarBinaryType(length);
+ } else if (type instanceof BinaryType) {
+ int length = Integer.parseInt(items[0].trim());
+ if (length <= 0) {
+ length = 1;
+ }
+ return new BinaryType(length);
+ } else if ("TINYINT(1)".equalsIgnoreCase(matcher.group(0))) {
+ return new TinyIntType();
+ } else if ("BIGINT UNSIGNED".equalsIgnoreCase(matcher.group(1))) {
+ return new DecimalType(20, 0);
+ } else if ("BIGINT UNSIGNED
ZEROFILL".equalsIgnoreCase(matcher.group(1))) {
+ return new DecimalType(20, 0);
+ } else {
+ return type;
}
- return type;
}
public LogicalType sqlType2FlinkType(int jdbcType) {
@@ -341,11 +377,11 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
List<Map<String, String>> values = new ArrayList<>();
if (data.isArray()) {
for (int i = 0; i < data.size(); i++) {
- values.add(objectMapper.convertValue(data.get(i), new
TypeReference<Map<String, String>>() {
+ values.add(OBJECT_MAPPER.convertValue(data.get(i), new
TypeReference<Map<String, String>>() {
}));
}
} else {
- values.add(objectMapper.convertValue(data, new
TypeReference<Map<String, String>>() {
+ values.add(OBJECT_MAPPER.convertValue(data, new
TypeReference<Map<String, String>>() {
}));
}
return values;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
index 8d7f766c1d..47d708bde6 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
@@ -44,16 +44,16 @@ import java.util.Set;
/**
* Schema change helper
- * */
+ */
public abstract class SchemaChangeHelper implements SchemaChangeHandle {
private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaChangeHelper.class);
- private final boolean schemaChange;
protected final Map<SchemaChangeType, SchemaChangePolicy> policyMap;
protected final JsonDynamicSchemaFormat dynamicSchemaFormat;
+ protected final SchemaUpdateExceptionPolicy exceptionPolicy;
+ private final boolean schemaChange;
private final String databasePattern;
private final String tablePattern;
- protected final SchemaUpdateExceptionPolicy exceptionPolicy;
private final SinkTableMetricData metricData;
private final DirtySinkHelper<Object> dirtySinkHelper;
@@ -105,7 +105,7 @@ public abstract class SchemaChangeHelper implements
SchemaChangeHandle {
return;
}
operation = Preconditions.checkNotNull(
-
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new
TypeReference<Operation>() {
+
JsonDynamicSchemaFormat.OBJECT_MAPPER.convertValue(operationNode, new
TypeReference<Operation>() {
}), "Operation is null");
} catch (Exception e) {
if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
@@ -248,6 +248,7 @@ public abstract class SchemaChangeHelper implements
SchemaChangeHandle {
}
}
}
+
private String parseValue(JsonNode data, String pattern) {
try {
return dynamicSchemaFormat.parse(data, pattern);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
index c00cf81d2d..61ab97b974 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
@@ -35,6 +35,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.StringJoiner;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+
public class OperationHelper {
private static final String APOSTROPHE = "'";
@@ -76,7 +79,7 @@ public class OperationHelper {
Preconditions.checkState(precisions.size() < 3,
"The length of precisions with DECIMAL must small
than 3");
int precision = Integer.parseInt(precisions.get(0));
- int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+ int scale = DEFAULT_DECIMAL_SCALE;
if (precisions.size() == 2) {
scale = Integer.parseInt(precisions.get(1));
}
@@ -135,8 +138,8 @@ public class OperationHelper {
break;
case Types.REAL:
case Types.NUMERIC:
- int precision =
JsonDynamicSchemaFormat.DEFAULT_DECIMAL_PRECISION;
- int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+ int precision = DEFAULT_DECIMAL_PRECISION;
+ int scale = DEFAULT_DECIMAL_SCALE;
if (precisions != null && !precisions.isEmpty()) {
Preconditions.checkState(precisions.size() < 3,
"The length of precisions with NUMERIC must small
than 3");
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
index 0eec4921c4..fbc2561d5f 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
@@ -145,7 +145,7 @@ public class SchemaChangeHelper {
JsonNode operationNode =
Preconditions.checkNotNull(data.get("operation"),
"Operation node is null");
operation = Preconditions.checkNotNull(
-
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new
TypeReference<Operation>() {
+
JsonDynamicSchemaFormat.OBJECT_MAPPER.convertValue(operationNode, new
TypeReference<Operation>() {
}), "Operation is null");
} catch (Exception e) {
if (exceptionPolicy ==
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
@@ -426,7 +426,7 @@ public class SchemaChangeHelper {
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
- httpPost.setEntity(new
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+ httpPost.setEntity(new
StringEntity(JsonDynamicSchemaFormat.OBJECT_MAPPER.writeValueAsString(param)));
// if any fenode succeeds, return true, else keep trying
if (sendRequest(httpPost)) {
return true;
@@ -441,7 +441,7 @@ public class SchemaChangeHelper {
Map<String, Object> param = buildRequestParam(column, dropColumn);
HttpGetEntity httpGet = new HttpGetEntity(url);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpGet.setEntity(new
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+ httpGet.setEntity(new
StringEntity(JsonDynamicSchemaFormat.OBJECT_MAPPER.writeValueAsString(param)));
boolean success = sendRequest(httpGet);
if (!success) {
LOGGER.warn("schema change can not do table {}.{}", database,
table);
@@ -458,7 +458,7 @@ public class SchemaChangeHelper {
final int statusCode =
response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK && response.getEntity()
!= null) {
String loadResult =
EntityUtils.toString(response.getEntity());
- Map<String, Object> responseMap =
dynamicSchemaFormat.objectMapper
+ Map<String, Object> responseMap =
JsonDynamicSchemaFormat.OBJECT_MAPPER
.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code",
"-1").toString();
if (DORIS_HTTP_CALL_SUCCESS.equals(code)) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index 0cfb19e8d3..49ea3981db 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -96,15 +96,13 @@ class DynamicKafkaSerializationSchema implements
KafkaSerializationSchema<RowDat
private final String sinkMultipleFormat;
private final DirtyOptions dirtyOptions;
private final @Nullable DirtySink<Object> dirtySink;
+ final private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
private boolean multipleSink;
private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
private int[] partitions;
-
private int parallelInstanceId;
-
private int numParallelInstances;
private SinkTopicMetricData metricData;
- final private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
DynamicKafkaSerializationSchema(
String topic,
@@ -272,7 +270,7 @@ class DynamicKafkaSerializationSchema implements
KafkaSerializationSchema<RowDat
JsonNode dataNode, List<ProducerRecord<byte[], byte[]>> values) {
String topic = null;
try {
- byte[] data =
jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+ byte[] data =
JsonDynamicSchemaFormat.OBJECT_MAPPER.writeValueAsBytes(baseMap);
topic = jsonDynamicSchemaFormat.parse(rootNode, topicPattern);
values.add(new ProducerRecord<>(topic,
extractPartition(null, null, data), null, data));
@@ -308,7 +306,7 @@ class DynamicKafkaSerializationSchema implements
KafkaSerializationSchema<RowDat
JsonNode operationNode =
Preconditions.checkNotNull(rootNode.get("operation"),
"Operation node is null");
operation = Preconditions.checkNotNull(
-
jsonDynamicSchemaFormat.objectMapper.convertValue(operationNode, new
TypeReference<Operation>() {
+
JsonDynamicSchemaFormat.OBJECT_MAPPER.convertValue(operationNode, new
TypeReference<Operation>() {
}), "Operation is null");
} catch (Exception e) {
LOG.error("Extract Operation from origin data failed", e);
diff --git
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
index cdc15c2324..883620a74f 100644
---
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
+++
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -43,50 +43,30 @@ import org.apache.flink.table.types.logical.VarCharType;
import java.util.Map;
-public class FormatJsonUtil {
-
- private static final int DEFAULT_DECIMAL_PRECISION = 15;
- private static final int DEFAULT_DECIMAL_SCALE = 5;
- private static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
-
- public static RowDataToJsonConverter rowDataToJsonConverter(DataType
physicalRowDataType) {
- return rowDataToJsonConverter(TimestampFormat.SQL, null,
physicalRowDataType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- String mapNullKeyLiteral,
- DataType physicalRowDataType) {
- return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, physicalRowDataType);
- }
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_CHAR_LENGTH;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_CHAR_TIME_LENGTH;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral, DataType physicalRowDataType) {
- return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
- .createConverter(physicalRowDataType.getLogicalType());
- }
-
- public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType
rowType) {
- return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- String mapNullKeyLiteral,
- LogicalType rowType) {
- return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, rowType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral, LogicalType rowType) {
- return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
- .createConverter(rowType);
- }
+public class FormatJsonUtil {
+ public static final Map<String, LogicalType>
DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
+ ImmutableMap.<String, LogicalType>builder()
+ .put("BOOLEAN", new BooleanType())
+ .put("INT8", new TinyIntType())
+ .put("INT16", new SmallIntType())
+ .put("INT32", new IntType())
+ .put("INT64", new BigIntType())
+ .put("FLOAT32", new FloatType())
+ .put("FLOAT64", new DoubleType())
+ .put("STRING", new VarCharType())
+ .put("BYTES", new VarBinaryType())
+ .build();
public static final Map<Integer, LogicalType>
SQL_TYPE_2_FLINK_TYPE_MAPPING =
ImmutableMap.<Integer, LogicalType>builder()
- .put(java.sql.Types.CHAR, new CharType())
- .put(java.sql.Types.VARCHAR, new VarCharType())
+ .put(java.sql.Types.CHAR, new
CharType(DEFAULT_CHAR_LENGTH))
+ .put(java.sql.Types.VARCHAR, new
VarCharType(VarCharType.MAX_LENGTH))
.put(java.sql.Types.SMALLINT, new SmallIntType())
.put(java.sql.Types.INTEGER, new IntType())
.put(java.sql.Types.BIGINT, new BigIntType())
@@ -101,26 +81,25 @@ public class FormatJsonUtil {
.put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new
LocalZonedTimestampType())
.put(ORACLE_TIMESTAMP_TIME_ZONE, new
LocalZonedTimestampType())
.put(java.sql.Types.TIMESTAMP, new TimestampType())
- .put(java.sql.Types.BINARY, new BinaryType())
- .put(java.sql.Types.VARBINARY, new VarBinaryType())
- .put(java.sql.Types.BLOB, new VarBinaryType())
- .put(java.sql.Types.CLOB, new VarBinaryType())
+ .put(java.sql.Types.BINARY, new
BinaryType(BinaryType.MAX_LENGTH))
+ .put(java.sql.Types.VARBINARY, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
+ .put(java.sql.Types.BLOB, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
+ .put(java.sql.Types.CLOB, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
.put(java.sql.Types.DATE, new DateType())
.put(java.sql.Types.BOOLEAN, new BooleanType())
- .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
- .put(java.sql.Types.LONGVARBINARY, new VarCharType())
- .put(java.sql.Types.LONGVARCHAR, new VarCharType())
- .put(java.sql.Types.ARRAY, new VarCharType())
- .put(java.sql.Types.NCHAR, new CharType())
- .put(java.sql.Types.NCLOB, new VarBinaryType())
+ .put(java.sql.Types.LONGNVARCHAR, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.LONGVARBINARY, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.LONGVARCHAR, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.ARRAY, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.NCHAR, new
CharType(DEFAULT_CHAR_LENGTH))
+ .put(java.sql.Types.NCLOB, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
.put(java.sql.Types.TINYINT, new TinyIntType())
- .put(java.sql.Types.OTHER, new VarCharType())
+ .put(java.sql.Types.OTHER, new
VarCharType(VarCharType.MAX_LENGTH))
.build();
-
public static final Map<Integer, LogicalType>
SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
ImmutableMap.<Integer, LogicalType>builder()
- .put(java.sql.Types.CHAR, new CharType())
- .put(java.sql.Types.VARCHAR, new VarCharType())
+ .put(java.sql.Types.CHAR, new
CharType(DEFAULT_CHAR_LENGTH))
+ .put(java.sql.Types.VARCHAR, new
VarCharType(VarCharType.MAX_LENGTH))
.put(java.sql.Types.SMALLINT, new SmallIntType())
.put(java.sql.Types.INTEGER, new IntType())
.put(java.sql.Types.BIGINT, new BigIntType())
@@ -130,36 +109,59 @@ public class FormatJsonUtil {
.put(java.sql.Types.DECIMAL, new
DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
.put(java.sql.Types.NUMERIC, new
DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
.put(java.sql.Types.BIT, new BooleanType())
- .put(java.sql.Types.TIME, new VarCharType())
+ .put(java.sql.Types.TIME, new
VarCharType(DEFAULT_CHAR_TIME_LENGTH))
+ .put(java.sql.Types.TIME_WITH_TIMEZONE, new
VarCharType(DEFAULT_CHAR_TIME_LENGTH))
.put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new
LocalZonedTimestampType())
.put(ORACLE_TIMESTAMP_TIME_ZONE, new
LocalZonedTimestampType())
.put(java.sql.Types.TIMESTAMP, new
LocalZonedTimestampType())
- .put(java.sql.Types.BINARY, new BinaryType())
- .put(java.sql.Types.VARBINARY, new VarBinaryType())
- .put(java.sql.Types.BLOB, new VarBinaryType())
+ .put(java.sql.Types.BINARY, new
BinaryType(BinaryType.MAX_LENGTH))
+ .put(java.sql.Types.VARBINARY, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
+ .put(java.sql.Types.BLOB, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
+ .put(java.sql.Types.CLOB, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
.put(java.sql.Types.DATE, new DateType())
.put(java.sql.Types.BOOLEAN, new BooleanType())
- .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
- .put(java.sql.Types.LONGVARBINARY, new VarCharType())
- .put(java.sql.Types.LONGVARCHAR, new VarCharType())
- .put(java.sql.Types.ARRAY, new VarCharType())
- .put(java.sql.Types.NCHAR, new CharType())
- .put(java.sql.Types.NCLOB, new VarBinaryType())
+ .put(java.sql.Types.LONGNVARCHAR, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.LONGVARBINARY, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.LONGVARCHAR, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.ARRAY, new
VarCharType(VarCharType.MAX_LENGTH))
+ .put(java.sql.Types.NCHAR, new
CharType(DEFAULT_CHAR_LENGTH))
+ .put(java.sql.Types.NCLOB, new
VarBinaryType(VarBinaryType.MAX_LENGTH))
.put(java.sql.Types.TINYINT, new TinyIntType())
- .put(java.sql.Types.OTHER, new VarCharType())
+ .put(java.sql.Types.OTHER, new
VarCharType(VarCharType.MAX_LENGTH))
.build();
- public static final Map<String, LogicalType>
DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
- ImmutableMap.<String, LogicalType>builder()
- .put("BOOLEAN", new BooleanType())
- .put("INT8", new TinyIntType())
- .put("INT16", new SmallIntType())
- .put("INT32", new IntType())
- .put("INT64", new BigIntType())
- .put("FLOAT32", new FloatType())
- .put("FLOAT64", new DoubleType())
- .put("STRING", new VarCharType())
- .put("BYTES", new VarBinaryType())
- .build();
+ public static RowDataToJsonConverter rowDataToJsonConverter(DataType
physicalRowDataType) {
+ return rowDataToJsonConverter(TimestampFormat.SQL, null,
physicalRowDataType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ String mapNullKeyLiteral,
+ DataType physicalRowDataType) {
+ return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, physicalRowDataType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral, DataType physicalRowDataType) {
+ return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
+ .createConverter(physicalRowDataType.getLogicalType());
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType
rowType) {
+ return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ String mapNullKeyLiteral,
+ LogicalType rowType) {
+ return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, rowType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral, LogicalType rowType) {
+ return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
+ .createConverter(rowType);
+ }
}
diff --git
a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
index 7da129f323..456b556002 100644
---
a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
+++
b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -42,47 +42,12 @@ import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import java.util.Map;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
public class FormatJsonUtil {
- private static final int DEFAULT_DECIMAL_PRECISION = 15;
- private static final int DEFAULT_DECIMAL_SCALE = 5;
- private static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
-
- public static RowDataToJsonConverter rowDataToJsonConverter(DataType
physicalRowDataType) {
- return rowDataToJsonConverter(TimestampFormat.SQL, null,
physicalRowDataType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- String mapNullKeyLiteral,
- DataType physicalRowDataType) {
- return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, physicalRowDataType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral, DataType physicalRowDataType) {
- return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
- .createConverter(physicalRowDataType.getLogicalType());
- }
-
- public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType
rowType) {
- return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- String mapNullKeyLiteral,
- LogicalType rowType) {
- return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, rowType);
- }
-
- public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
- MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral, LogicalType rowType) {
- return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
- .createConverter(rowType);
- }
-
public static final Map<Integer, LogicalType>
SQL_TYPE_2_FLINK_TYPE_MAPPING =
ImmutableMap.<Integer, LogicalType>builder()
.put(java.sql.Types.CHAR, new CharType())
@@ -116,7 +81,6 @@ public class FormatJsonUtil {
.put(java.sql.Types.TINYINT, new TinyIntType())
.put(java.sql.Types.OTHER, new VarCharType())
.build();
-
public static final Map<Integer, LogicalType>
SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
ImmutableMap.<Integer, LogicalType>builder()
.put(java.sql.Types.CHAR, new CharType())
@@ -148,7 +112,6 @@ public class FormatJsonUtil {
.put(java.sql.Types.TINYINT, new TinyIntType())
.put(java.sql.Types.OTHER, new VarCharType())
.build();
-
public static final Map<String, LogicalType>
DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
ImmutableMap.<String, LogicalType>builder()
.put("BOOLEAN", new BooleanType())
@@ -161,4 +124,38 @@ public class FormatJsonUtil {
.put("STRING", new VarCharType())
.put("BYTES", new VarBinaryType())
.build();
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(DataType
physicalRowDataType) {
+ return rowDataToJsonConverter(TimestampFormat.SQL, null,
physicalRowDataType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ String mapNullKeyLiteral,
+ DataType physicalRowDataType) {
+ return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, physicalRowDataType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral, DataType physicalRowDataType) {
+ return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
+ .createConverter(physicalRowDataType.getLogicalType());
+ }
+
+ public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType
rowType) {
+ return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ String mapNullKeyLiteral,
+ LogicalType rowType) {
+ return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP,
mapNullKeyLiteral, rowType);
+ }
+
+ public static RowDataToJsonConverter
rowDataToJsonConverter(TimestampFormat timestampFormat,
+ MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral, LogicalType rowType) {
+ return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode,
mapNullKeyLiteral)
+ .createConverter(rowType);
+ }
}