This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c20c64f28 [INLONG-8337][Sort] Support getting accurate precision and 
scale of decimal type for Iceberg (#8338)
6c20c64f28 is described below

commit 6c20c64f289d10719612c20df223a857d79e7ded
Author: Liao Rui <[email protected]>
AuthorDate: Wed Jun 28 20:50:58 2023 +0800

    [INLONG-8337][Sort] Support getting accurate precision and scale of decimal 
type for Iceberg (#8338)
    
    Co-authored-by: healchow <[email protected]>
---
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  8 ++-
 .../format/DebeziumJsonDynamicSchemaFormat.java    | 16 +++++-
 .../sort/base/format/JsonDynamicSchemaFormat.java  | 64 +++++++++++++++++++++-
 3 files changed, 83 insertions(+), 5 deletions(-)

diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index 653aaabb64..4e8c817ebf 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -39,6 +39,8 @@ public class CanalJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
     private static final String OLD = "old";
     private static final String PK_NAMES = "pkNames";
     private static final String SCHEMA = "sqlType";
+    private static final String MYSQL_TYPE = "mysqlType";
+    private static final String ORACLE_TYE = "oracleType";
     private static final String OP_TYPE = "type";
     private static final String OP_INSERT = "INSERT";
     private static final String OP_UPDATE = "UPDATE";
@@ -121,10 +123,14 @@ public class CanalJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
     @Override
     public RowType extractSchema(JsonNode data, List<String> pkNames) {
         JsonNode schema = data.get(SCHEMA);
+        JsonNode dialectSchema = data.get(MYSQL_TYPE);
+        if (dialectSchema == null) {
+            dialectSchema = data.get(ORACLE_TYE);
+        }
         if (schema == null) {
             throw new IllegalArgumentException(String.format("Not found schema 
from: %s", data));
         }
-        return extractSchemaNode(schema, pkNames);
+        return extractSchemaNode(schema, dialectSchema, pkNames);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index 0c9a3fe519..7d996ddade 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -48,6 +48,8 @@ public class DebeziumJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
     private static final String DDL = "ddl";
     private static final String SCHEMA = "schema";
     private static final String SQL_TYPE = "sqlType";
+    private static final String MYSQL_TYPE = "mysqlType";
+    private static final String ORACLE_TYE = "oracleType";
     private static final String AFTER = "after";
     private static final String BEFORE = "before";
     private static final String SOURCE = "source";
@@ -220,10 +222,14 @@ public class DebeziumJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
                 throw new IllegalArgumentException(String.format("Error 
schema: %s.", data));
             }
             JsonNode schemaNode = sourceNode.get(SQL_TYPE);
+            JsonNode dialectSchema = sourceNode.get(MYSQL_TYPE);
+            if (dialectSchema == null) {
+                dialectSchema = sourceNode.get(ORACLE_TYE);
+            }
             if (schemaNode == null) {
                 throw new IllegalArgumentException(String.format("Error 
schema: %s.", data));
             }
-            return super.extractSchemaNode(schemaNode, pkNames);
+            return super.extractSchemaNode(schemaNode, dialectSchema, pkNames);
         }
         return extractSchemaFromExtractInfo(payload, pkNames);
     }
@@ -240,7 +246,7 @@ public class DebeziumJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
             }
             for (JsonNode field : schema.get(FIELDS)) {
                 if (AFTER.equals(field.get(FIELD).asText())) {
-                    return extractSchemaNode(field.get(FIELDS), pkNames);
+                    return extractSchemaNode(field.get(FIELDS), null, pkNames);
                 }
             }
             throw new IllegalArgumentException(String.format("Error schema: 
%s.", schema));
@@ -248,11 +254,15 @@ public class DebeziumJsonDynamicSchemaFormat extends 
JsonDynamicSchemaFormat {
     }
 
     @Override
-    public RowType extractSchemaNode(JsonNode schema, List<String> pkNames) {
+    public RowType extractSchemaNode(JsonNode schema, JsonNode dialectSchema, 
List<String> pkNames) {
         List<RowType.RowField> fields = new ArrayList<>();
         for (JsonNode field : schema) {
             String name = field.get(FIELD).asText();
             LogicalType type = 
debeziumType2FlinkType(field.get(TYPE).asText());
+            if (dialectSchema != null) {
+                String dialectType = dialectSchema.get(name) != null ? 
dialectSchema.get(name).asText() : null;
+                type = handleDialectSqlType(type, dialectType);
+            }
             if (pkNames.contains(name)) {
                 type = type.copy(false);
             }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 09c5aafa94..6ed59b1de2 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.base.format;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -44,6 +45,8 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -52,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 
@@ -71,6 +75,7 @@ import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPA
 @SuppressWarnings("LanguageDetectionInspection")
 public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat<JsonNode> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(JsonDynamicSchemaFormat.class);
     /**
      * The first item of array
      */
@@ -78,6 +83,12 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
 
     private static final int DEFAULT_DECIMAL_PRECISION = 15;
     private static final int DEFAULT_DECIMAL_SCALE = 5;
+    /**
+     * Dialect SQL type pattern, such as DECIMAL(38, 10), from MySQL, Oracle, etc.
+     */
+    private static final Pattern DIALECT_SQL_TYPE_PATTERN = 
Pattern.compile("\\w+\\(([\\d,\\s]*)\\)");
+
+    private static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
 
     private static final Map<Integer, LogicalType> 
SQL_TYPE_2_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
@@ -95,6 +106,7 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
                     .put(java.sql.Types.TIME, new TimeType())
                     .put(java.sql.Types.TIME_WITH_TIMEZONE, new TimeType())
                     .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new 
LocalZonedTimestampType())
+                    .put(ORACLE_TIMESTAMP_TIME_ZONE, new 
LocalZonedTimestampType())
                     .put(java.sql.Types.TIMESTAMP, new TimestampType())
                     .put(java.sql.Types.BINARY, new BinaryType())
                     .put(java.sql.Types.VARBINARY, new VarBinaryType())
@@ -127,12 +139,20 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
                     .put(java.sql.Types.BIT, new BooleanType())
                     .put(java.sql.Types.TIME, new VarCharType())
                     .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new 
LocalZonedTimestampType())
+                    .put(ORACLE_TIMESTAMP_TIME_ZONE, new 
LocalZonedTimestampType())
                     .put(java.sql.Types.TIMESTAMP, new 
LocalZonedTimestampType())
                     .put(java.sql.Types.BINARY, new BinaryType())
                     .put(java.sql.Types.VARBINARY, new VarBinaryType())
                     .put(java.sql.Types.BLOB, new VarBinaryType())
                     .put(java.sql.Types.DATE, new DateType())
                     .put(java.sql.Types.BOOLEAN, new BooleanType())
+                    .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
+                    .put(java.sql.Types.LONGVARBINARY, new VarCharType())
+                    .put(java.sql.Types.LONGVARCHAR, new VarCharType())
+                    .put(java.sql.Types.ARRAY, new VarCharType())
+                    .put(java.sql.Types.NCHAR, new CharType())
+                    .put(java.sql.Types.NCLOB, new VarBinaryType())
+                    .put(java.sql.Types.TINYINT, new TinyIntType())
                     .put(java.sql.Types.OTHER, new VarCharType())
                     .build();
 
@@ -321,13 +341,15 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
      */
     public abstract String getOpType(JsonNode root);
 
-    protected RowType extractSchemaNode(JsonNode schema, List<String> pkNames) 
{
+    protected RowType extractSchemaNode(JsonNode schema, JsonNode 
dialectSchema, List<String> pkNames) {
         Iterator<Entry<String, JsonNode>> schemaFields = schema.fields();
         List<RowField> fields = new ArrayList<>();
         while (schemaFields.hasNext()) {
             Entry<String, JsonNode> entry = schemaFields.next();
             String name = entry.getKey();
             LogicalType type = sqlType2FlinkType(entry.getValue().asInt());
+            String dialectType = dialectSchema.get(name) != null ? 
dialectSchema.get(name).asText() : null;
+            type = handleDialectSqlType(type, dialectType);
             if (pkNames.contains(name)) {
                 type = type.copy(false);
             }
@@ -336,6 +358,46 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
         return new RowType(fields);
     }
 
+    /**
+     * Set precision and scale for decimal types; other types are returned unchanged
+     *
+     * @param type original flink type
+     * @param dialectType database dialect type
+     * @return flink type revised according to dialect type
+     */
+    public LogicalType handleDialectSqlType(LogicalType type, String 
dialectType) {
+        if (StringUtils.isBlank(dialectType)) {
+            return type;
+        }
+        if (type instanceof DecimalType) {
+            Matcher matcher = DIALECT_SQL_TYPE_PATTERN.matcher(dialectType);
+            int precision;
+            int scale;
+            DecimalType decimalType = new 
DecimalType(DecimalType.MAX_PRECISION);
+            if (matcher.matches()) {
+                String[] items = matcher.group(1).split(",");
+                precision = Integer.parseInt(items[0].trim());
+                if (precision < DecimalType.MIN_PRECISION || precision > 
DecimalType.MAX_PRECISION) {
+                    LOG.info("invalid decimal precision: {}, change to: {}", 
precision, decimalType.getPrecision());
+                    return decimalType;
+                }
+                if (items.length == 2) {
+                    scale = Integer.parseInt(items[1].trim());
+                    if (scale < DecimalType.MIN_SCALE || scale > precision) {
+                        decimalType = new DecimalType(precision);
+                        LOG.info("invalid decimal scale: {}, change to: {}", 
scale, decimalType.getScale());
+                        return decimalType;
+                    }
+                    return new DecimalType(precision, scale);
+                }
+                LOG.info("Decimal has only precision {} without scale", 
precision);
+                return new DecimalType(precision);
+            }
+            return decimalType;
+        }
+        return type;
+    }
+
     public LogicalType sqlType2FlinkType(int jdbcType) {
         Map<Integer, LogicalType> typeMap = adaptSparkEngine
                 ? SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING

Reply via email to