This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fcf30bdec [cdc]Fix performance issue in CanalRecordParser (#3572)
fcf30bdec is described below

commit fcf30bdecb022f808462aa442aef91e0144b286a
Author: MOBIN <[email protected]>
AuthorDate: Mon Jun 24 17:03:24 2024 +0800

    [cdc]Fix performance issue in CanalRecordParser (#3572)
---
 .../action/cdc/format/canal/CanalRecordParser.java |  33 +++--
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     | 133 ++++++++-------------
 2 files changed, 66 insertions(+), 100 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 76671739a..aae4aab82 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.RecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
@@ -31,6 +32,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,13 +141,6 @@ public class CanalRecordParser extends RecordParser {
                 schema, new TypeReference<LinkedHashMap<String, String>>() {});
     }
 
-    private void toPaimonFieldTypes(
-            LinkedHashMap<String, String> originalFieldTypes, RowType.Builder rowTypeBuilder) {
-        originalFieldTypes.forEach(
-                (name, type) ->
-                        rowTypeBuilder.field(name, MySqlTypeUtils.toDataType(type, typeMapping)));
-    }
-
     @Override
     protected String primaryField() {
         return "pkNames";
@@ -164,13 +159,19 @@ public class CanalRecordParser extends RecordParser {
         Map<String, String> rowData = new HashMap<>();
 
         if (originalFieldTypes != null) {
-            toPaimonFieldTypes(originalFieldTypes, rowTypeBuilder);
-            for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
-                String fieldName = entry.getKey();
-                String originalType = originalFieldTypes.get(fieldName);
-                String newValue =
-                        transformValue(Objects.toString(entry.getValue(), null), originalType);
-                rowData.put(fieldName, newValue);
+            for (Map.Entry<String, String> e : originalFieldTypes.entrySet()) {
+                String originalName = e.getKey();
+                String originalType = e.getValue();
+                Tuple3<String, Integer, Integer> typeInfo =
+                        MySqlTypeUtils.getTypeInfo(originalType);
+                DataType paimonDataType =
+                        MySqlTypeUtils.toDataType(
+                                typeInfo.f0, typeInfo.f1, typeInfo.f2, typeMapping);
+                rowTypeBuilder.field(originalName, paimonDataType);
+
+                String filedValue = Objects.toString(recordMap.get(originalName), null);
+                String newValue = transformValue(filedValue, typeInfo.f0, originalType);
+                rowData.put(originalName, newValue);
             }
         } else {
             fillDefaultTypes(record, rowTypeBuilder);
@@ -194,13 +195,11 @@ public class CanalRecordParser extends RecordParser {
                 .collect(Collectors.toMap(newData::get, oldData::get));
     }
 
-    private String transformValue(@Nullable String oldValue, String mySqlType) {
+    private String transformValue(@Nullable String oldValue, String shortType, String mySqlType) {
         if (oldValue == null) {
             return null;
         }
 
-        String shortType = MySqlTypeUtils.getShortType(mySqlType);
-
         if (MySqlTypeUtils.isSetType(shortType)) {
             return CanalFieldParser.convertSet(oldValue, mySqlType);
         }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 2bc916671..ed5a3afd4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -31,13 +31,12 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
 import com.esri.core.geometry.ogc.OGCGeometry;
+import org.apache.flink.api.java.tuple.Tuple3;
 
 import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -140,28 +139,39 @@ public class MySqlTypeUtils {
     private static final String RIGHT_BRACKETS = ")";
     private static final String COMMA = ",";
 
-    private static final List<String> HAVE_SCALE_LIST =
-            Arrays.asList(DECIMAL, NUMERIC, DOUBLE, REAL, FIXED);
-    private static final List<String> MAP_TO_DECIMAL_TYPES =
-            Arrays.asList(
-                    NUMERIC,
-                    NUMERIC_UNSIGNED,
-                    NUMERIC_UNSIGNED_ZEROFILL,
-                    FIXED,
-                    FIXED_UNSIGNED,
-                    FIXED_UNSIGNED_ZEROFILL,
-                    DECIMAL,
-                    DECIMAL_UNSIGNED,
-                    DECIMAL_UNSIGNED_ZEROFILL);
-
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    public static DataType toDataType(String mysqlFullType, TypeMapping typeMapping) {
-        return toDataType(
-                MySqlTypeUtils.getShortType(mysqlFullType),
-                MySqlTypeUtils.getPrecision(mysqlFullType),
-                MySqlTypeUtils.getScale(mysqlFullType),
-                typeMapping);
+    public static Tuple3<String, Integer, Integer> getTypeInfo(String typeName) {
+        int leftBracketIndex = typeName.indexOf(LEFT_BRACKETS);
+        String shortType;
+        int length = 0;
+        int scale = 0;
+        if (leftBracketIndex != -1) {
+            int rightBracketIndex = typeName.indexOf(RIGHT_BRACKETS);
+            shortType =
+                    typeName.substring(0, leftBracketIndex).trim().toUpperCase()
+                            + typeName.substring(rightBracketIndex + 1).toUpperCase();
+
+            String insideBrackets =
+                    typeName.substring(leftBracketIndex + 1, rightBracketIndex).trim();
+            int commaIndex = insideBrackets.indexOf(COMMA);
+
+            if (commaIndex != -1 && !isEnumType(shortType) && !isSetType(shortType)) {
+                length = Integer.parseInt(insideBrackets.substring(0, commaIndex).trim());
+                scale = Integer.parseInt(insideBrackets.substring(commaIndex + 1).trim());
+            } else if (!isEnumType(shortType) && !isSetType(shortType)) {
+                length = Integer.parseInt(insideBrackets);
+            }
+        } else {
+            shortType = typeName.toUpperCase();
+            if (isDecimalType(shortType)) {
+                // when missing precision and scale of the decimal, we
+                // use the max precision and scale to avoid parse error
+                length = 38;
+                scale = 18;
+            }
+        }
+        return Tuple3.of(shortType, length, scale);
     }
 
     public static DataType toDataType(
@@ -354,71 +364,28 @@ public class MySqlTypeUtils {
         return objectWriter.writeValueAsString(geometryInfo);
     }
 
-    public static boolean isScaleType(String typeName) {
-        return HAVE_SCALE_LIST.stream()
-                .anyMatch(type -> getShortType(typeName).toUpperCase().startsWith(type));
-    }
-
-    public static boolean isEnumType(String typeName) {
-        return typeName.toUpperCase().startsWith(ENUM);
-    }
-
-    public static boolean isSetType(String typeName) {
-        return typeName.toUpperCase().startsWith(SET);
+    public static boolean isEnumType(String shortType) {
+        return shortType.equals(ENUM);
     }
 
-    private static boolean isDecimalType(String typeName) {
-        return MAP_TO_DECIMAL_TYPES.stream()
-                .anyMatch(type -> getShortType(typeName).toUpperCase().startsWith(type));
+    public static boolean isSetType(String shortType) {
+        return shortType.equals(SET);
     }
 
-    /* Get type after the brackets are removed.*/
-    public static String getShortType(String typeName) {
-
-        if (typeName.contains(LEFT_BRACKETS) && typeName.contains(RIGHT_BRACKETS)) {
-            return typeName.substring(0, typeName.indexOf(LEFT_BRACKETS)).trim()
-                    + typeName.substring(typeName.indexOf(RIGHT_BRACKETS) + 1);
-        } else {
-            return typeName;
-        }
-    }
-
-    public static int getPrecision(String typeName) {
-        if (typeName.contains(LEFT_BRACKETS)
-                && typeName.contains(RIGHT_BRACKETS)
-                && isScaleType(typeName)) {
-            return Integer.parseInt(
-                    typeName.substring(typeName.indexOf(LEFT_BRACKETS) + 1, typeName.indexOf(COMMA))
-                            .trim());
-        } else if ((typeName.contains(LEFT_BRACKETS)
-                && typeName.contains(RIGHT_BRACKETS)
-                && !isScaleType(typeName)
-                && !isEnumType(typeName)
-                && !isSetType(typeName))) {
-            return Integer.parseInt(
-                    typeName.substring(
-                                    typeName.indexOf(LEFT_BRACKETS) + 1,
-                                    typeName.indexOf(RIGHT_BRACKETS))
-                            .trim());
-        } else {
-            // when missing precision of the decimal, we
-            // use the max precision to avoid parse error
-            return isDecimalType(typeName) ? 38 : 0;
-        }
-    }
-
-    public static int getScale(String typeName) {
-        if (typeName.contains(LEFT_BRACKETS)
-                && typeName.contains(RIGHT_BRACKETS)
-                && isScaleType(typeName)) {
-            return Integer.parseInt(
-                    typeName.substring(
-                                    typeName.indexOf(COMMA) + 1, typeName.indexOf(RIGHT_BRACKETS))
-                            .trim());
-        } else {
-            // When missing scale of the decimal, we
-            // use the max scale to avoid parse error
-            return isDecimalType(typeName) ? 18 : 0;
+    private static boolean isDecimalType(String shortType) {
+        switch (shortType) {
+            case NUMERIC:
+            case NUMERIC_UNSIGNED:
+            case NUMERIC_UNSIGNED_ZEROFILL:
+            case FIXED:
+            case FIXED_UNSIGNED:
+            case FIXED_UNSIGNED_ZEROFILL:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+            case DECIMAL_UNSIGNED_ZEROFILL:
+                return true;
+            default:
+                return false;
         }
     }
 

Reply via email to