This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aeb6603e67 [INLONG-8825][Sort] Optimize the field type conversion 
between source and target in the whole database scenario (#8826)
aeb6603e67 is described below

commit aeb6603e6794fc784d847c56a691d32ced39f21b
Author: yunqingmoswu <[email protected]>
AuthorDate: Mon Sep 11 15:39:33 2023 +0800

    [INLONG-8825][Sort] Optimize the field type conversion between source and 
target in the whole database scenario (#8826)
---
 .../sort/protocol/constant/DataTypeConstants.java  |  54 +++++++
 .../sort/base/format/JsonDynamicSchemaFormat.java  | 108 +++++++++-----
 .../sort/base/schema/SchemaChangeHelper.java       |   9 +-
 .../inlong/sort/doris/schema/OperationHelper.java  |   9 +-
 .../sort/doris/schema/SchemaChangeHelper.java      |   8 +-
 .../kafka/DynamicKafkaSerializationSchema.java     |   8 +-
 .../sort/formats/json/utils/FormatJsonUtil.java    | 156 +++++++++++----------
 .../sort/formats/json/utils/FormatJsonUtil.java    |  77 +++++-----
 8 files changed, 260 insertions(+), 169 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DataTypeConstants.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DataTypeConstants.java
new file mode 100644
index 0000000000..3674d7613c
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DataTypeConstants.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * The constants class of Data Type. All constants related to data types 
should be defined in this class.
+ * For example, the length of the Char or Varchar, the precision and scale of 
the Decimal, etc.
+ */
+public class DataTypeConstants {
+
+    /**
+     * The default precision of Decimal. It is used to control the default 
precision
+     * when the source Decimal type precision cannot be obtained in the whole 
database migration.
+     */
+    public static final int DEFAULT_DECIMAL_PRECISION = 38;
+    /**
+     * The default scale of Decimal. It is used to control the default scale
+     * when the Source Decimal type scale cannot be obtained in the whole 
database migration.
+     */
+    public static final int DEFAULT_DECIMAL_SCALE = 5;
+    /**
+     * The default length of Char. It is used to control the default length of 
Char
+     * when the length of Source Char type cannot be obtained in the whole 
database migration.
+     */
+    public static final int DEFAULT_CHAR_LENGTH = 255;
+    /**
+     * The default length of Char for time formats such as 'yyyy-MM-dd HH:mm:ss' 
or 'HH:mm:ss', etc.
+     * It is used to control the default length of Char when the length of 
Source Char type
+     * cannot be obtained in the whole database migration.
+     */
+    public static final int DEFAULT_CHAR_TIME_LENGTH = 30;
+    /**
+     * The key of Timestamp with time_zone in Oracle. It is used in the whole 
database migration.
+     */
+    public static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
+
+    private DataTypeConstants() {
+    }
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index e8bef34d90..5833719e48 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -24,13 +24,16 @@ import org.apache.flink.formats.common.TimestampFormat;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,6 +47,9 @@ import java.util.regex.Pattern;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static 
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_2_FLINK_TYPE_MAPPING;
 import static 
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_CHAR_LENGTH;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
 
 /**
  * Json dynamic format class
@@ -61,20 +67,15 @@ import static 
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_
 @SuppressWarnings("LanguageDetectionInspection")
 public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat<JsonNode> {
 
-    public static final int DEFAULT_DECIMAL_PRECISION = 15;
-    public static final int DEFAULT_DECIMAL_SCALE = 5;
-    private static final Logger LOG = 
LoggerFactory.getLogger(JsonDynamicSchemaFormat.class);
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     /**
      * The first item of array
      */
     private static final Integer FIRST = 0;
-
     /**
      * dialect sql type pattern such as DECIMAL(38, 10) from mysql or oracle 
etc
      */
-    private static final Pattern DIALECT_SQL_TYPE_PATTERN = 
Pattern.compile("\\w+\\(([\\d,\\s]*)\\)");
-
-    public final ObjectMapper objectMapper = new ObjectMapper();
+    private static final Pattern DIALECT_SQL_TYPE_PATTERN = 
Pattern.compile("([\\w, \\s]+)\\(([\\d,\\s'\\-]*)\\)");
     protected final JsonToRowDataConverters rowDataConverters;
     protected final boolean adaptSparkEngine;
 
@@ -155,7 +156,7 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
      */
     @Override
     public JsonNode deserialize(byte[] message) throws IOException {
-        return objectMapper.readTree(message);
+        return OBJECT_MAPPER.readTree(message);
     }
 
     /**
@@ -266,8 +267,18 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
             Entry<String, JsonNode> entry = schemaFields.next();
             String name = entry.getKey();
             LogicalType type = sqlType2FlinkType(entry.getValue().asInt());
-            String dialectType = dialectSchema.get(name) != null ? 
dialectSchema.get(name).asText() : null;
-            type = handleDialectSqlType(type, dialectType);
+            String dialectType = null;
+            if (dialectSchema != null && dialectSchema.get(name) != null) {
+                dialectType = dialectSchema.get(name).asText();
+            }
+            try {
+                type = handleDialectSqlType(type, dialectType);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        String.format("Handle dialect sql type failed, the 
field: %s, the dialect type: %s",
+                                name, dialectType),
+                        e);
+            }
             if (pkNames.contains(name)) {
                 type = type.copy(false);
             }
@@ -287,33 +298,58 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
         if (StringUtils.isBlank(dialectType)) {
             return type;
         }
+        Matcher matcher = DIALECT_SQL_TYPE_PATTERN.matcher(dialectType);
+        if (!matcher.matches()) {
+            return type;
+        }
+        String[] items = matcher.group(2).split(",");
         if (type instanceof DecimalType) {
-            Matcher matcher = DIALECT_SQL_TYPE_PATTERN.matcher(dialectType);
             int precision;
-            int scale;
-            DecimalType decimalType = new 
DecimalType(DecimalType.MAX_PRECISION);
-            if (matcher.matches()) {
-                String[] items = matcher.group(1).split(",");
-                precision = Integer.parseInt(items[0].trim());
-                if (precision < DecimalType.MIN_PRECISION || precision > 
DecimalType.MAX_PRECISION) {
-                    LOG.info("invalid decimal precision: {}, change to: {}", 
precision, decimalType.getPrecision());
-                    return decimalType;
-                }
-                if (items.length == 2) {
-                    scale = Integer.parseInt(items[1].trim());
-                    if (scale < DecimalType.MIN_SCALE || scale > precision) {
-                        decimalType = new DecimalType(precision);
-                        LOG.info("invalid decimal scale: {}, change to: {}", 
scale, decimalType.getScale());
-                        return decimalType;
-                    }
-                    return new DecimalType(precision, scale);
+            int scale = DEFAULT_DECIMAL_SCALE;
+            precision = Integer.parseInt(items[0].trim());
+            if (precision < DecimalType.MIN_PRECISION || precision > 
DecimalType.MAX_PRECISION) {
+                precision = DEFAULT_DECIMAL_PRECISION;
+            }
+            if (items.length == 2) {
+                scale = Integer.parseInt(items[1].trim());
+                if (scale < DecimalType.MIN_SCALE || scale > precision) {
+                    scale = DEFAULT_DECIMAL_SCALE;
                 }
-                LOG.info("Decimal has only precision {} without scale", 
precision);
-                return new DecimalType(precision);
             }
-            return decimalType;
+            return new DecimalType(precision, scale);
+        } else if (type instanceof CharType) {
+            int length = Integer.parseInt(items[0].trim());
+            if (length <= 0) {
+                length = DEFAULT_CHAR_LENGTH;
+            }
+            return new CharType(length);
+        } else if (type instanceof VarCharType) {
+            int length = Integer.parseInt(items[0].trim());
+            if (length <= 0) {
+                length = Integer.MAX_VALUE;
+            }
+            return new VarCharType(length);
+        } else if (type instanceof VarBinaryType) {
+            int length = Integer.parseInt(items[0].trim());
+            if (length <= 0) {
+                length = Integer.MAX_VALUE;
+            }
+            return new VarBinaryType(length);
+        } else if (type instanceof BinaryType) {
+            int length = Integer.parseInt(items[0].trim());
+            if (length <= 0) {
+                length = 1;
+            }
+            return new BinaryType(length);
+        } else if ("TINYINT(1)".equalsIgnoreCase(matcher.group(0))) {
+            return new TinyIntType();
+        } else if ("BIGINT UNSIGNED".equalsIgnoreCase(matcher.group(1))) {
+            return new DecimalType(20, 0);
+        } else if ("BIGINT UNSIGNED 
ZEROFILL".equalsIgnoreCase(matcher.group(1))) {
+            return new DecimalType(20, 0);
+        } else {
+            return type;
         }
-        return type;
     }
 
     public LogicalType sqlType2FlinkType(int jdbcType) {
@@ -341,11 +377,11 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
         List<Map<String, String>> values = new ArrayList<>();
         if (data.isArray()) {
             for (int i = 0; i < data.size(); i++) {
-                values.add(objectMapper.convertValue(data.get(i), new 
TypeReference<Map<String, String>>() {
+                values.add(OBJECT_MAPPER.convertValue(data.get(i), new 
TypeReference<Map<String, String>>() {
                 }));
             }
         } else {
-            values.add(objectMapper.convertValue(data, new 
TypeReference<Map<String, String>>() {
+            values.add(OBJECT_MAPPER.convertValue(data, new 
TypeReference<Map<String, String>>() {
             }));
         }
         return values;
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
index 8d7f766c1d..47d708bde6 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/schema/SchemaChangeHelper.java
@@ -44,16 +44,16 @@ import java.util.Set;
 
 /**
  * Schema change helper
- * */
+ */
 public abstract class SchemaChangeHelper implements SchemaChangeHandle {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
-    private final boolean schemaChange;
     protected final Map<SchemaChangeType, SchemaChangePolicy> policyMap;
     protected final JsonDynamicSchemaFormat dynamicSchemaFormat;
+    protected final SchemaUpdateExceptionPolicy exceptionPolicy;
+    private final boolean schemaChange;
     private final String databasePattern;
     private final String tablePattern;
-    protected final SchemaUpdateExceptionPolicy exceptionPolicy;
     private final SinkTableMetricData metricData;
     private final DirtySinkHelper<Object> dirtySinkHelper;
 
@@ -105,7 +105,7 @@ public abstract class SchemaChangeHelper implements 
SchemaChangeHandle {
                 return;
             }
             operation = Preconditions.checkNotNull(
-                    
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new 
TypeReference<Operation>() {
+                    
JsonDynamicSchemaFormat.OBJECT_MAPPER.convertValue(operationNode, new 
TypeReference<Operation>() {
                     }), "Operation is null");
         } catch (Exception e) {
             if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
@@ -248,6 +248,7 @@ public abstract class SchemaChangeHelper implements 
SchemaChangeHandle {
             }
         }
     }
+
     private String parseValue(JsonNode data, String pattern) {
         try {
             return dynamicSchemaFormat.parse(data, pattern);
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
index c00cf81d2d..61ab97b974 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
@@ -35,6 +35,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.StringJoiner;
 
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+
 public class OperationHelper {
 
     private static final String APOSTROPHE = "'";
@@ -76,7 +79,7 @@ public class OperationHelper {
                     Preconditions.checkState(precisions.size() < 3,
                             "The length of precisions with DECIMAL must small 
than 3");
                     int precision = Integer.parseInt(precisions.get(0));
-                    int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+                    int scale = DEFAULT_DECIMAL_SCALE;
                     if (precisions.size() == 2) {
                         scale = Integer.parseInt(precisions.get(1));
                     }
@@ -135,8 +138,8 @@ public class OperationHelper {
                 break;
             case Types.REAL:
             case Types.NUMERIC:
-                int precision = 
JsonDynamicSchemaFormat.DEFAULT_DECIMAL_PRECISION;
-                int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+                int precision = DEFAULT_DECIMAL_PRECISION;
+                int scale = DEFAULT_DECIMAL_SCALE;
                 if (precisions != null && !precisions.isEmpty()) {
                     Preconditions.checkState(precisions.size() < 3,
                             "The length of precisions with NUMERIC must small 
than 3");
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
index 0eec4921c4..fbc2561d5f 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
@@ -145,7 +145,7 @@ public class SchemaChangeHelper {
             JsonNode operationNode = 
Preconditions.checkNotNull(data.get("operation"),
                     "Operation node is null");
             operation = Preconditions.checkNotNull(
-                    
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new 
TypeReference<Operation>() {
+                    
JsonDynamicSchemaFormat.OBJECT_MAPPER.convertValue(operationNode, new 
TypeReference<Operation>() {
                     }), "Operation is null");
         } catch (Exception e) {
             if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
@@ -426,7 +426,7 @@ public class SchemaChangeHelper {
             HttpPost httpPost = new HttpPost(requestUrl);
             httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
             httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
-            httpPost.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+            httpPost.setEntity(new 
StringEntity(JsonDynamicSchemaFormat.OBJECT_MAPPER.writeValueAsString(param)));
             // if any fenode succeeds, return true, else keep trying
             if (sendRequest(httpPost)) {
                 return true;
@@ -441,7 +441,7 @@ public class SchemaChangeHelper {
         Map<String, Object> param = buildRequestParam(column, dropColumn);
         HttpGetEntity httpGet = new HttpGetEntity(url);
         httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
-        httpGet.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+        httpGet.setEntity(new 
StringEntity(JsonDynamicSchemaFormat.OBJECT_MAPPER.writeValueAsString(param)));
         boolean success = sendRequest(httpGet);
         if (!success) {
             LOGGER.warn("schema change can not do table {}.{}", database, 
table);
@@ -458,7 +458,7 @@ public class SchemaChangeHelper {
                     final int statusCode = 
response.getStatusLine().getStatusCode();
                     if (statusCode == HttpStatus.SC_OK && response.getEntity() 
!= null) {
                         String loadResult = 
EntityUtils.toString(response.getEntity());
-                        Map<String, Object> responseMap = 
dynamicSchemaFormat.objectMapper
+                        Map<String, Object> responseMap = 
JsonDynamicSchemaFormat.OBJECT_MAPPER
                                 .readValue(loadResult, Map.class);
                         String code = responseMap.getOrDefault("code", 
"-1").toString();
                         if (DORIS_HTTP_CALL_SUCCESS.equals(code)) {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index 0cfb19e8d3..49ea3981db 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -96,15 +96,13 @@ class DynamicKafkaSerializationSchema implements 
KafkaSerializationSchema<RowDat
     private final String sinkMultipleFormat;
     private final DirtyOptions dirtyOptions;
     private final @Nullable DirtySink<Object> dirtySink;
+    final private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
     private boolean multipleSink;
     private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
     private int[] partitions;
-
     private int parallelInstanceId;
-
     private int numParallelInstances;
     private SinkTopicMetricData metricData;
-    final private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
 
     DynamicKafkaSerializationSchema(
             String topic,
@@ -272,7 +270,7 @@ class DynamicKafkaSerializationSchema implements 
KafkaSerializationSchema<RowDat
             JsonNode dataNode, List<ProducerRecord<byte[], byte[]>> values) {
         String topic = null;
         try {
-            byte[] data = 
jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+            byte[] data = 
JsonDynamicSchemaFormat.OBJECT_MAPPER.writeValueAsBytes(baseMap);
             topic = jsonDynamicSchemaFormat.parse(rootNode, topicPattern);
             values.add(new ProducerRecord<>(topic,
                     extractPartition(null, null, data), null, data));
@@ -308,7 +306,7 @@ class DynamicKafkaSerializationSchema implements 
KafkaSerializationSchema<RowDat
             JsonNode operationNode = 
Preconditions.checkNotNull(rootNode.get("operation"),
                     "Operation node is null");
             operation = Preconditions.checkNotNull(
-                    
jsonDynamicSchemaFormat.objectMapper.convertValue(operationNode, new 
TypeReference<Operation>() {
+                    
JsonDynamicSchemaFormat.OBJECT_MAPPER.convertValue(operationNode, new 
TypeReference<Operation>() {
                     }), "Operation is null");
         } catch (Exception e) {
             LOG.error("Extract Operation from origin data failed", e);
diff --git 
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
 
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
index cdc15c2324..883620a74f 100644
--- 
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
+++ 
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -43,50 +43,30 @@ import org.apache.flink.table.types.logical.VarCharType;
 
 import java.util.Map;
 
-public class FormatJsonUtil {
-
-    private static final int DEFAULT_DECIMAL_PRECISION = 15;
-    private static final int DEFAULT_DECIMAL_SCALE = 5;
-    private static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(DataType 
physicalRowDataType) {
-        return rowDataToJsonConverter(TimestampFormat.SQL, null, 
physicalRowDataType);
-    }
-
-    public static RowDataToJsonConverter 
rowDataToJsonConverter(TimestampFormat timestampFormat,
-            String mapNullKeyLiteral,
-            DataType physicalRowDataType) {
-        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, 
mapNullKeyLiteral, physicalRowDataType);
-    }
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_CHAR_LENGTH;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_CHAR_TIME_LENGTH;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+import static 
org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
 
-    public static RowDataToJsonConverter 
rowDataToJsonConverter(TimestampFormat timestampFormat,
-            MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral, DataType physicalRowDataType) {
-        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, 
mapNullKeyLiteral)
-                .createConverter(physicalRowDataType.getLogicalType());
-    }
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType 
rowType) {
-        return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
-    }
-
-    public static RowDataToJsonConverter 
rowDataToJsonConverter(TimestampFormat timestampFormat,
-            String mapNullKeyLiteral,
-            LogicalType rowType) {
-        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, 
mapNullKeyLiteral, rowType);
-    }
-
-    public static RowDataToJsonConverter 
rowDataToJsonConverter(TimestampFormat timestampFormat,
-            MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral, LogicalType rowType) {
-        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, 
mapNullKeyLiteral)
-                .createConverter(rowType);
-    }
+public class FormatJsonUtil {
 
+    public static final Map<String, LogicalType> 
DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
+            ImmutableMap.<String, LogicalType>builder()
+                    .put("BOOLEAN", new BooleanType())
+                    .put("INT8", new TinyIntType())
+                    .put("INT16", new SmallIntType())
+                    .put("INT32", new IntType())
+                    .put("INT64", new BigIntType())
+                    .put("FLOAT32", new FloatType())
+                    .put("FLOAT64", new DoubleType())
+                    .put("STRING", new VarCharType())
+                    .put("BYTES", new VarBinaryType())
+                    .build();
     public static final Map<Integer, LogicalType> 
SQL_TYPE_2_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
-                    .put(java.sql.Types.CHAR, new CharType())
-                    .put(java.sql.Types.VARCHAR, new VarCharType())
+                    .put(java.sql.Types.CHAR, new 
CharType(DEFAULT_CHAR_LENGTH))
+                    .put(java.sql.Types.VARCHAR, new 
VarCharType(VarCharType.MAX_LENGTH))
                     .put(java.sql.Types.SMALLINT, new SmallIntType())
                     .put(java.sql.Types.INTEGER, new IntType())
                     .put(java.sql.Types.BIGINT, new BigIntType())
@@ -101,26 +81,25 @@ public class FormatJsonUtil {
                     .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new 
LocalZonedTimestampType())
                     .put(ORACLE_TIMESTAMP_TIME_ZONE, new 
LocalZonedTimestampType())
                     .put(java.sql.Types.TIMESTAMP, new TimestampType())
-                    .put(java.sql.Types.BINARY, new BinaryType())
-                    .put(java.sql.Types.VARBINARY, new VarBinaryType())
-                    .put(java.sql.Types.BLOB, new VarBinaryType())
-                    .put(java.sql.Types.CLOB, new VarBinaryType())
+                    .put(java.sql.Types.BINARY, new 
BinaryType(BinaryType.MAX_LENGTH))
+                    .put(java.sql.Types.VARBINARY, new 
VarBinaryType(VarBinaryType.MAX_LENGTH))
+                    .put(java.sql.Types.BLOB, new 
VarBinaryType(VarBinaryType.MAX_LENGTH))
+                    .put(java.sql.Types.CLOB, new 
VarBinaryType(VarBinaryType.MAX_LENGTH))
                     .put(java.sql.Types.DATE, new DateType())
                     .put(java.sql.Types.BOOLEAN, new BooleanType())
-                    .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
-                    .put(java.sql.Types.LONGVARBINARY, new VarCharType())
-                    .put(java.sql.Types.LONGVARCHAR, new VarCharType())
-                    .put(java.sql.Types.ARRAY, new VarCharType())
-                    .put(java.sql.Types.NCHAR, new CharType())
-                    .put(java.sql.Types.NCLOB, new VarBinaryType())
+                    .put(java.sql.Types.LONGNVARCHAR, new 
VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.LONGVARBINARY, new 
VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.LONGVARCHAR, new 
VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.ARRAY, new 
VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.NCHAR, new 
CharType(DEFAULT_CHAR_LENGTH))
+                    .put(java.sql.Types.NCLOB, new 
VarBinaryType(VarBinaryType.MAX_LENGTH))
                     .put(java.sql.Types.TINYINT, new TinyIntType())
-                    .put(java.sql.Types.OTHER, new VarCharType())
+                    .put(java.sql.Types.OTHER, new 
VarCharType(VarCharType.MAX_LENGTH))
                     .build();
-
     public static final Map<Integer, LogicalType> 
SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
-                    .put(java.sql.Types.CHAR, new CharType())
-                    .put(java.sql.Types.VARCHAR, new VarCharType())
+                    .put(java.sql.Types.CHAR, new 
CharType(DEFAULT_CHAR_LENGTH))
+                    .put(java.sql.Types.VARCHAR, new 
VarCharType(VarCharType.MAX_LENGTH))
                     .put(java.sql.Types.SMALLINT, new SmallIntType())
                     .put(java.sql.Types.INTEGER, new IntType())
                     .put(java.sql.Types.BIGINT, new BigIntType())
@@ -130,36 +109,59 @@ public class FormatJsonUtil {
                     .put(java.sql.Types.DECIMAL, new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
                     .put(java.sql.Types.NUMERIC, new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE))
                     .put(java.sql.Types.BIT, new BooleanType())
-                    .put(java.sql.Types.TIME, new VarCharType())
+                    .put(java.sql.Types.TIME, new VarCharType(DEFAULT_CHAR_TIME_LENGTH))
+                    .put(java.sql.Types.TIME_WITH_TIMEZONE, new VarCharType(DEFAULT_CHAR_TIME_LENGTH))
                     .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType())
                     .put(ORACLE_TIMESTAMP_TIME_ZONE, new LocalZonedTimestampType())
                     .put(java.sql.Types.TIMESTAMP, new LocalZonedTimestampType())
-                    .put(java.sql.Types.BINARY, new BinaryType())
-                    .put(java.sql.Types.VARBINARY, new VarBinaryType())
-                    .put(java.sql.Types.BLOB, new VarBinaryType())
+                    .put(java.sql.Types.BINARY, new BinaryType(BinaryType.MAX_LENGTH))
+                    .put(java.sql.Types.VARBINARY, new VarBinaryType(VarBinaryType.MAX_LENGTH))
+                    .put(java.sql.Types.BLOB, new VarBinaryType(VarBinaryType.MAX_LENGTH))
+                    .put(java.sql.Types.CLOB, new VarBinaryType(VarBinaryType.MAX_LENGTH))
                     .put(java.sql.Types.DATE, new DateType())
                     .put(java.sql.Types.BOOLEAN, new BooleanType())
-                    .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
-                    .put(java.sql.Types.LONGVARBINARY, new VarCharType())
-                    .put(java.sql.Types.LONGVARCHAR, new VarCharType())
-                    .put(java.sql.Types.ARRAY, new VarCharType())
-                    .put(java.sql.Types.NCHAR, new CharType())
-                    .put(java.sql.Types.NCLOB, new VarBinaryType())
+                    .put(java.sql.Types.LONGNVARCHAR, new VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.LONGVARBINARY, new VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.LONGVARCHAR, new VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.ARRAY, new VarCharType(VarCharType.MAX_LENGTH))
+                    .put(java.sql.Types.NCHAR, new CharType(DEFAULT_CHAR_LENGTH))
+                    .put(java.sql.Types.NCLOB, new VarBinaryType(VarBinaryType.MAX_LENGTH))
                     .put(java.sql.Types.TINYINT, new TinyIntType())
-                    .put(java.sql.Types.OTHER, new VarCharType())
+                    .put(java.sql.Types.OTHER, new VarCharType(VarCharType.MAX_LENGTH))
                     .build();
 
-    public static final Map<String, LogicalType> DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
-            ImmutableMap.<String, LogicalType>builder()
-                    .put("BOOLEAN", new BooleanType())
-                    .put("INT8", new TinyIntType())
-                    .put("INT16", new SmallIntType())
-                    .put("INT32", new IntType())
-                    .put("INT64", new BigIntType())
-                    .put("FLOAT32", new FloatType())
-                    .put("FLOAT64", new DoubleType())
-                    .put("STRING", new VarCharType())
-                    .put("BYTES", new VarBinaryType())
-                    .build();
+    public static RowDataToJsonConverter rowDataToJsonConverter(DataType physicalRowDataType) {
+        return rowDataToJsonConverter(TimestampFormat.SQL, null, physicalRowDataType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            String mapNullKeyLiteral,
+            DataType physicalRowDataType) {
+        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, physicalRowDataType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral, DataType physicalRowDataType) {
+        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+                .createConverter(physicalRowDataType.getLogicalType());
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType rowType) {
+        return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            String mapNullKeyLiteral,
+            LogicalType rowType) {
+        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, rowType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral, LogicalType rowType) {
+        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+                .createConverter(rowType);
+    }
 
 }
diff --git a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
index 7da129f323..456b556002 100644
--- a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
+++ b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -42,47 +42,12 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import java.util.Map;
+import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
+import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
+import static org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;
 
 public class FormatJsonUtil {
 
-    private static final int DEFAULT_DECIMAL_PRECISION = 15;
-    private static final int DEFAULT_DECIMAL_SCALE = 5;
-    private static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(DataType physicalRowDataType) {
-        return rowDataToJsonConverter(TimestampFormat.SQL, null, physicalRowDataType);
-    }
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
-            String mapNullKeyLiteral,
-            DataType physicalRowDataType) {
-        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, physicalRowDataType);
-    }
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
-            MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral, DataType physicalRowDataType) {
-        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
-                .createConverter(physicalRowDataType.getLogicalType());
-    }
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType rowType) {
-        return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
-    }
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
-            String mapNullKeyLiteral,
-            LogicalType rowType) {
-        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, rowType);
-    }
-
-    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
-            MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral, LogicalType rowType) {
-        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
-                .createConverter(rowType);
-    }
-
     public static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
                     .put(java.sql.Types.CHAR, new CharType())
@@ -116,7 +81,6 @@ public class FormatJsonUtil {
                     .put(java.sql.Types.TINYINT, new TinyIntType())
                     .put(java.sql.Types.OTHER, new VarCharType())
                     .build();
-
     public static final Map<Integer, LogicalType> SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
                     .put(java.sql.Types.CHAR, new CharType())
@@ -148,7 +112,6 @@ public class FormatJsonUtil {
                     .put(java.sql.Types.TINYINT, new TinyIntType())
                     .put(java.sql.Types.OTHER, new VarCharType())
                     .build();
-
     public static final Map<String, LogicalType> DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
             ImmutableMap.<String, LogicalType>builder()
                     .put("BOOLEAN", new BooleanType())
@@ -161,4 +124,38 @@ public class FormatJsonUtil {
                     .put("STRING", new VarCharType())
                     .put("BYTES", new VarBinaryType())
                     .build();
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(DataType physicalRowDataType) {
+        return rowDataToJsonConverter(TimestampFormat.SQL, null, physicalRowDataType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            String mapNullKeyLiteral,
+            DataType physicalRowDataType) {
+        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, physicalRowDataType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral, DataType physicalRowDataType) {
+        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+                .createConverter(physicalRowDataType.getLogicalType());
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(LogicalType rowType) {
+        return rowDataToJsonConverter(TimestampFormat.SQL, null, rowType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            String mapNullKeyLiteral,
+            LogicalType rowType) {
+        return rowDataToJsonConverter(timestampFormat, MapNullKeyMode.DROP, mapNullKeyLiteral, rowType);
+    }
+
+    public static RowDataToJsonConverter rowDataToJsonConverter(TimestampFormat timestampFormat,
+            MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral, LogicalType rowType) {
+        return new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+                .createConverter(rowType);
+    }
+}
 }


Reply via email to