This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6c20c64f28 [INLONG-8337][Sort] Support getting accurate precision and
scale of decimal type for Iceberg (#8338)
6c20c64f28 is described below
commit 6c20c64f289d10719612c20df223a857d79e7ded
Author: Liao Rui <[email protected]>
AuthorDate: Wed Jun 28 20:50:58 2023 +0800
[INLONG-8337][Sort] Support getting accurate precision and scale of decimal
type for Iceberg (#8338)
Co-authored-by: healchow <[email protected]>
---
.../base/format/CanalJsonDynamicSchemaFormat.java | 8 ++-
.../format/DebeziumJsonDynamicSchemaFormat.java | 16 +++++-
.../sort/base/format/JsonDynamicSchemaFormat.java | 64 +++++++++++++++++++++-
3 files changed, 83 insertions(+), 5 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index 653aaabb64..4e8c817ebf 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -39,6 +39,8 @@ public class CanalJsonDynamicSchemaFormat extends
JsonDynamicSchemaFormat {
private static final String OLD = "old";
private static final String PK_NAMES = "pkNames";
private static final String SCHEMA = "sqlType";
+ private static final String MYSQL_TYPE = "mysqlType";
+ private static final String ORACLE_TYE = "oracleType";
private static final String OP_TYPE = "type";
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
@@ -121,10 +123,14 @@ public class CanalJsonDynamicSchemaFormat extends
JsonDynamicSchemaFormat {
@Override
public RowType extractSchema(JsonNode data, List<String> pkNames) {
JsonNode schema = data.get(SCHEMA);
+ JsonNode dialectSchema = data.get(MYSQL_TYPE);
+ if (dialectSchema == null) {
+ dialectSchema = data.get(ORACLE_TYE);
+ }
if (schema == null) {
throw new IllegalArgumentException(String.format("Not found schema
from: %s", data));
}
- return extractSchemaNode(schema, pkNames);
+ return extractSchemaNode(schema, dialectSchema, pkNames);
}
@Override
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index 0c9a3fe519..7d996ddade 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -48,6 +48,8 @@ public class DebeziumJsonDynamicSchemaFormat extends
JsonDynamicSchemaFormat {
private static final String DDL = "ddl";
private static final String SCHEMA = "schema";
private static final String SQL_TYPE = "sqlType";
+ private static final String MYSQL_TYPE = "mysqlType";
+ private static final String ORACLE_TYE = "oracleType";
private static final String AFTER = "after";
private static final String BEFORE = "before";
private static final String SOURCE = "source";
@@ -220,10 +222,14 @@ public class DebeziumJsonDynamicSchemaFormat extends
JsonDynamicSchemaFormat {
throw new IllegalArgumentException(String.format("Error
schema: %s.", data));
}
JsonNode schemaNode = sourceNode.get(SQL_TYPE);
+ JsonNode dialectSchema = sourceNode.get(MYSQL_TYPE);
+ if (dialectSchema == null) {
+ dialectSchema = sourceNode.get(ORACLE_TYE);
+ }
if (schemaNode == null) {
throw new IllegalArgumentException(String.format("Error
schema: %s.", data));
}
- return super.extractSchemaNode(schemaNode, pkNames);
+ return super.extractSchemaNode(schemaNode, dialectSchema, pkNames);
}
return extractSchemaFromExtractInfo(payload, pkNames);
}
@@ -240,7 +246,7 @@ public class DebeziumJsonDynamicSchemaFormat extends
JsonDynamicSchemaFormat {
}
for (JsonNode field : schema.get(FIELDS)) {
if (AFTER.equals(field.get(FIELD).asText())) {
- return extractSchemaNode(field.get(FIELDS), pkNames);
+ return extractSchemaNode(field.get(FIELDS), null, pkNames);
}
}
throw new IllegalArgumentException(String.format("Error schema:
%s.", schema));
@@ -248,11 +254,15 @@ public class DebeziumJsonDynamicSchemaFormat extends
JsonDynamicSchemaFormat {
}
@Override
- public RowType extractSchemaNode(JsonNode schema, List<String> pkNames) {
+ public RowType extractSchemaNode(JsonNode schema, JsonNode dialectSchema,
List<String> pkNames) {
List<RowType.RowField> fields = new ArrayList<>();
for (JsonNode field : schema) {
String name = field.get(FIELD).asText();
LogicalType type =
debeziumType2FlinkType(field.get(TYPE).asText());
+ if (dialectSchema != null) {
+ String dialectType = dialectSchema.get(name) != null ?
dialectSchema.get(name).asText() : null;
+ type = handleDialectSqlType(type, dialectType);
+ }
if (pkNames.contains(name)) {
type = type.copy(false);
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 09c5aafa94..6ed59b1de2 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.base.format;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
@@ -44,6 +45,8 @@ import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -52,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -71,6 +75,7 @@ import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPA
@SuppressWarnings("LanguageDetectionInspection")
public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaFormat<JsonNode> {
+ private static final Logger LOG =
LoggerFactory.getLogger(JsonDynamicSchemaFormat.class);
/**
* The first item of array
*/
@@ -78,6 +83,12 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
private static final int DEFAULT_DECIMAL_PRECISION = 15;
private static final int DEFAULT_DECIMAL_SCALE = 5;
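+ /**
+ * Pattern for a dialect SQL type that carries arguments, such as DECIMAL(38, 10) from MySQL or Oracle.
+ * Group 1 captures the text between the parentheses (digits, commas and whitespace only).
+ */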
+ private static final Pattern DIALECT_SQL_TYPE_PATTERN =
Pattern.compile("\\w+\\(([\\d,\\s]*)\\)");
+
+ private static final Integer ORACLE_TIMESTAMP_TIME_ZONE = -101;
private static final Map<Integer, LogicalType>
SQL_TYPE_2_FLINK_TYPE_MAPPING =
ImmutableMap.<Integer, LogicalType>builder()
@@ -95,6 +106,7 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
.put(java.sql.Types.TIME, new TimeType())
.put(java.sql.Types.TIME_WITH_TIMEZONE, new TimeType())
.put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new
LocalZonedTimestampType())
+ .put(ORACLE_TIMESTAMP_TIME_ZONE, new
LocalZonedTimestampType())
.put(java.sql.Types.TIMESTAMP, new TimestampType())
.put(java.sql.Types.BINARY, new BinaryType())
.put(java.sql.Types.VARBINARY, new VarBinaryType())
@@ -127,12 +139,20 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
.put(java.sql.Types.BIT, new BooleanType())
.put(java.sql.Types.TIME, new VarCharType())
.put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new
LocalZonedTimestampType())
+ .put(ORACLE_TIMESTAMP_TIME_ZONE, new
LocalZonedTimestampType())
.put(java.sql.Types.TIMESTAMP, new
LocalZonedTimestampType())
.put(java.sql.Types.BINARY, new BinaryType())
.put(java.sql.Types.VARBINARY, new VarBinaryType())
.put(java.sql.Types.BLOB, new VarBinaryType())
.put(java.sql.Types.DATE, new DateType())
.put(java.sql.Types.BOOLEAN, new BooleanType())
+ .put(java.sql.Types.LONGNVARCHAR, new VarCharType())
+ .put(java.sql.Types.LONGVARBINARY, new VarCharType())
+ .put(java.sql.Types.LONGVARCHAR, new VarCharType())
+ .put(java.sql.Types.ARRAY, new VarCharType())
+ .put(java.sql.Types.NCHAR, new CharType())
+ .put(java.sql.Types.NCLOB, new VarBinaryType())
+ .put(java.sql.Types.TINYINT, new TinyIntType())
.put(java.sql.Types.OTHER, new VarCharType())
.build();
@@ -321,13 +341,15 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
*/
public abstract String getOpType(JsonNode root);
- protected RowType extractSchemaNode(JsonNode schema, List<String> pkNames)
{
+ protected RowType extractSchemaNode(JsonNode schema, JsonNode
dialectSchema, List<String> pkNames) {
Iterator<Entry<String, JsonNode>> schemaFields = schema.fields();
List<RowField> fields = new ArrayList<>();
while (schemaFields.hasNext()) {
Entry<String, JsonNode> entry = schemaFields.next();
String name = entry.getKey();
LogicalType type = sqlType2FlinkType(entry.getValue().asInt());
+ String dialectType = dialectSchema.get(name) != null ?
dialectSchema.get(name).asText() : null;
+ type = handleDialectSqlType(type, dialectType);
if (pkNames.contains(name)) {
type = type.copy(false);
}
@@ -336,6 +358,46 @@ public abstract class JsonDynamicSchemaFormat extends
AbstractDynamicSchemaForma
return new RowType(fields);
}
+ /**
+  * Refines a Flink decimal type with the precision and scale declared in the database dialect type
+  * (for example {@code DECIMAL(38, 10)}). Non-decimal types and blank dialect types are returned unchanged.
+  *
+  * <p>Fallback rules for decimal types:
+  * <ul>
+  *   <li>dialect type does not match {@code name(args)}, or the precision is missing, unparsable or
+  *       outside [{@code DecimalType.MIN_PRECISION}, {@code DecimalType.MAX_PRECISION}]:
+  *       {@code new DecimalType(DecimalType.MAX_PRECISION)};</li>
+  *   <li>precision is valid but the scale is absent, unparsable or outside
+  *       [{@code DecimalType.MIN_SCALE}, precision]: {@code new DecimalType(precision)};</li>
+  *   <li>otherwise: {@code new DecimalType(precision, scale)}.</li>
+  * </ul>
+  *
+  * @param type original flink type
+  * @param dialectType database dialect type, may be null or blank
+  * @return flink type revised according to dialect type
+  */
+ public LogicalType handleDialectSqlType(LogicalType type, String dialectType) {
+     if (StringUtils.isBlank(dialectType) || !(type instanceof DecimalType)) {
+         return type;
+     }
+     DecimalType defaultType = new DecimalType(DecimalType.MAX_PRECISION);
+     Matcher matcher = DIALECT_SQL_TYPE_PATTERN.matcher(dialectType);
+     if (!matcher.matches()) {
+         return defaultType;
+     }
+     // The pattern also admits argument lists like "", "," or ",2"; split() drops trailing empty
+     // strings, so the array may be empty or start with a blank item.
+     String[] items = matcher.group(1).split(",");
+     if (items.length == 0) {
+         LOG.info("missing decimal precision in dialect type: {}, change to: {}",
+                 dialectType, defaultType.getPrecision());
+         return defaultType;
+     }
+     int precision;
+     try {
+         precision = Integer.parseInt(items[0].trim());
+     } catch (NumberFormatException e) {
+         // Blank, space-separated or overflowing digits: do not let a malformed type break schema extraction.
+         LOG.info("unparsable decimal precision in dialect type: {}, change to: {}",
+                 dialectType, defaultType.getPrecision());
+         return defaultType;
+     }
+     if (precision < DecimalType.MIN_PRECISION || precision > DecimalType.MAX_PRECISION) {
+         LOG.info("invalid decimal precision: {}, change to: {}", precision, defaultType.getPrecision());
+         return defaultType;
+     }
+     DecimalType precisionOnlyType = new DecimalType(precision);
+     if (items.length != 2) {
+         LOG.info("Decimal has only precision {} without scale", precision);
+         return precisionOnlyType;
+     }
+     try {
+         int scale = Integer.parseInt(items[1].trim());
+         if (scale >= DecimalType.MIN_SCALE && scale <= precision) {
+             return new DecimalType(precision, scale);
+         }
+         LOG.info("invalid decimal scale: {}, change to: {}", scale, precisionOnlyType.getScale());
+     } catch (NumberFormatException e) {
+         LOG.info("unparsable decimal scale in dialect type: {}, change to: {}",
+                 dialectType, precisionOnlyType.getScale());
+     }
+     return precisionOnlyType;
+ }
+
public LogicalType sqlType2FlinkType(int jdbcType) {
Map<Integer, LogicalType> typeMap = adaptSparkEngine
? SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING