This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fcf30bdec [cdc]Fix performance issue in CanalRecordParser (#3572)
fcf30bdec is described below
commit fcf30bdecb022f808462aa442aef91e0144b286a
Author: MOBIN <[email protected]>
AuthorDate: Mon Jun 24 17:03:24 2024 +0800
[cdc]Fix performance issue in CanalRecordParser (#3572)
---
.../action/cdc/format/canal/CanalRecordParser.java | 33 +++--
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 133 ++++++++-------------
2 files changed, 66 insertions(+), 100 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 76671739a..aae4aab82 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
@@ -31,6 +32,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,13 +141,6 @@ public class CanalRecordParser extends RecordParser {
schema, new TypeReference<LinkedHashMap<String, String>>() {});
}
- private void toPaimonFieldTypes(
- LinkedHashMap<String, String> originalFieldTypes, RowType.Builder
rowTypeBuilder) {
- originalFieldTypes.forEach(
- (name, type) ->
- rowTypeBuilder.field(name,
MySqlTypeUtils.toDataType(type, typeMapping)));
- }
-
@Override
protected String primaryField() {
return "pkNames";
@@ -164,13 +159,19 @@ public class CanalRecordParser extends RecordParser {
Map<String, String> rowData = new HashMap<>();
if (originalFieldTypes != null) {
- toPaimonFieldTypes(originalFieldTypes, rowTypeBuilder);
- for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
- String fieldName = entry.getKey();
- String originalType = originalFieldTypes.get(fieldName);
- String newValue =
- transformValue(Objects.toString(entry.getValue(),
null), originalType);
- rowData.put(fieldName, newValue);
+ for (Map.Entry<String, String> e : originalFieldTypes.entrySet()) {
+ String originalName = e.getKey();
+ String originalType = e.getValue();
+ Tuple3<String, Integer, Integer> typeInfo =
+ MySqlTypeUtils.getTypeInfo(originalType);
+ DataType paimonDataType =
+ MySqlTypeUtils.toDataType(
+ typeInfo.f0, typeInfo.f1, typeInfo.f2,
typeMapping);
+ rowTypeBuilder.field(originalName, paimonDataType);
+
+ String filedValue =
Objects.toString(recordMap.get(originalName), null);
+ String newValue = transformValue(filedValue, typeInfo.f0,
originalType);
+ rowData.put(originalName, newValue);
}
} else {
fillDefaultTypes(record, rowTypeBuilder);
@@ -194,13 +195,11 @@ public class CanalRecordParser extends RecordParser {
.collect(Collectors.toMap(newData::get, oldData::get));
}
- private String transformValue(@Nullable String oldValue, String mySqlType)
{
+ private String transformValue(@Nullable String oldValue, String shortType,
String mySqlType) {
if (oldValue == null) {
return null;
}
- String shortType = MySqlTypeUtils.getShortType(mySqlType);
-
if (MySqlTypeUtils.isSetType(shortType)) {
return CanalFieldParser.convertSet(oldValue, mySqlType);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 2bc916671..ed5a3afd4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -31,13 +31,12 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import com.esri.core.geometry.ogc.OGCGeometry;
+import org.apache.flink.api.java.tuple.Tuple3;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -140,28 +139,39 @@ public class MySqlTypeUtils {
private static final String RIGHT_BRACKETS = ")";
private static final String COMMA = ",";
- private static final List<String> HAVE_SCALE_LIST =
- Arrays.asList(DECIMAL, NUMERIC, DOUBLE, REAL, FIXED);
- private static final List<String> MAP_TO_DECIMAL_TYPES =
- Arrays.asList(
- NUMERIC,
- NUMERIC_UNSIGNED,
- NUMERIC_UNSIGNED_ZEROFILL,
- FIXED,
- FIXED_UNSIGNED,
- FIXED_UNSIGNED_ZEROFILL,
- DECIMAL,
- DECIMAL_UNSIGNED,
- DECIMAL_UNSIGNED_ZEROFILL);
-
private static final ObjectMapper objectMapper = new ObjectMapper();
- public static DataType toDataType(String mysqlFullType, TypeMapping
typeMapping) {
- return toDataType(
- MySqlTypeUtils.getShortType(mysqlFullType),
- MySqlTypeUtils.getPrecision(mysqlFullType),
- MySqlTypeUtils.getScale(mysqlFullType),
- typeMapping);
+ public static Tuple3<String, Integer, Integer> getTypeInfo(String
typeName) {
+ int leftBracketIndex = typeName.indexOf(LEFT_BRACKETS);
+ String shortType;
+ int length = 0;
+ int scale = 0;
+ if (leftBracketIndex != -1) {
+ int rightBracketIndex = typeName.indexOf(RIGHT_BRACKETS);
+ shortType =
+ typeName.substring(0,
leftBracketIndex).trim().toUpperCase()
+ + typeName.substring(rightBracketIndex +
1).toUpperCase();
+
+ String insideBrackets =
+ typeName.substring(leftBracketIndex + 1,
rightBracketIndex).trim();
+ int commaIndex = insideBrackets.indexOf(COMMA);
+
+ if (commaIndex != -1 && !isEnumType(shortType) &&
!isSetType(shortType)) {
+ length = Integer.parseInt(insideBrackets.substring(0,
commaIndex).trim());
+ scale = Integer.parseInt(insideBrackets.substring(commaIndex +
1).trim());
+ } else if (!isEnumType(shortType) && !isSetType(shortType)) {
+ length = Integer.parseInt(insideBrackets);
+ }
+ } else {
+ shortType = typeName.toUpperCase();
+ if (isDecimalType(shortType)) {
+ // when missing precision and scale of the decimal, we
+ // use the max precision and scale to avoid parse error
+ length = 38;
+ scale = 18;
+ }
+ }
+ return Tuple3.of(shortType, length, scale);
}
public static DataType toDataType(
@@ -354,71 +364,28 @@ public class MySqlTypeUtils {
return objectWriter.writeValueAsString(geometryInfo);
}
- public static boolean isScaleType(String typeName) {
- return HAVE_SCALE_LIST.stream()
- .anyMatch(type ->
getShortType(typeName).toUpperCase().startsWith(type));
- }
-
- public static boolean isEnumType(String typeName) {
- return typeName.toUpperCase().startsWith(ENUM);
- }
-
- public static boolean isSetType(String typeName) {
- return typeName.toUpperCase().startsWith(SET);
+ public static boolean isEnumType(String shortType) {
+ return shortType.equals(ENUM);
}
- private static boolean isDecimalType(String typeName) {
- return MAP_TO_DECIMAL_TYPES.stream()
- .anyMatch(type ->
getShortType(typeName).toUpperCase().startsWith(type));
+ public static boolean isSetType(String shortType) {
+ return shortType.equals(SET);
}
- /* Get type after the brackets are removed.*/
- public static String getShortType(String typeName) {
-
- if (typeName.contains(LEFT_BRACKETS) &&
typeName.contains(RIGHT_BRACKETS)) {
- return typeName.substring(0,
typeName.indexOf(LEFT_BRACKETS)).trim()
- + typeName.substring(typeName.indexOf(RIGHT_BRACKETS) + 1);
- } else {
- return typeName;
- }
- }
-
- public static int getPrecision(String typeName) {
- if (typeName.contains(LEFT_BRACKETS)
- && typeName.contains(RIGHT_BRACKETS)
- && isScaleType(typeName)) {
- return Integer.parseInt(
- typeName.substring(typeName.indexOf(LEFT_BRACKETS) + 1,
typeName.indexOf(COMMA))
- .trim());
- } else if ((typeName.contains(LEFT_BRACKETS)
- && typeName.contains(RIGHT_BRACKETS)
- && !isScaleType(typeName)
- && !isEnumType(typeName)
- && !isSetType(typeName))) {
- return Integer.parseInt(
- typeName.substring(
- typeName.indexOf(LEFT_BRACKETS) + 1,
- typeName.indexOf(RIGHT_BRACKETS))
- .trim());
- } else {
- // when missing precision of the decimal, we
- // use the max precision to avoid parse error
- return isDecimalType(typeName) ? 38 : 0;
- }
- }
-
- public static int getScale(String typeName) {
- if (typeName.contains(LEFT_BRACKETS)
- && typeName.contains(RIGHT_BRACKETS)
- && isScaleType(typeName)) {
- return Integer.parseInt(
- typeName.substring(
- typeName.indexOf(COMMA) + 1,
typeName.indexOf(RIGHT_BRACKETS))
- .trim());
- } else {
- // When missing scale of the decimal, we
- // use the max scale to avoid parse error
- return isDecimalType(typeName) ? 18 : 0;
+ private static boolean isDecimalType(String shortType) {
+ switch (shortType) {
+ case NUMERIC:
+ case NUMERIC_UNSIGNED:
+ case NUMERIC_UNSIGNED_ZEROFILL:
+ case FIXED:
+ case FIXED_UNSIGNED:
+ case FIXED_UNSIGNED_ZEROFILL:
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ case DECIMAL_UNSIGNED_ZEROFILL:
+ return true;
+ default:
+ return false;
}
}