This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 83fec2521 [flink] Improve EventParser initializes mysqlFiled every
record (#1281)
83fec2521 is described below
commit 83fec2521416296267e61becbf41a60e339d61e0
Author: HZY <[email protected]>
AuthorDate: Mon Jun 12 16:35:46 2023 +0800
[flink] Improve EventParser initializes mysqlFiled every record (#1281)
---
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 66 ++++++++++------------
1 file changed, 30 insertions(+), 36 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index e814e9701..54240098c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -66,9 +66,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
+ private JsonNode root;
private JsonNode payload;
- private Map<String, String> mySqlFieldTypes;
- private Map<String, String> fieldClassNames;
public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn>
computedColumns) {
@@ -94,18 +93,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public void setRawEvent(String rawEvent) {
try {
- JsonNode root = objectMapper.readValue(rawEvent, JsonNode.class);
- JsonNode schema =
- Preconditions.checkNotNull(
- root.get("schema"),
- "MySqlDebeziumJsonEventParser only supports
debezium JSON with schema. "
- + "Please make sure that `includeSchema`
is true "
- + "in the
JsonDebeziumDeserializationSchema you created");
+ root = objectMapper.readValue(rawEvent, JsonNode.class);
payload = root.get("payload");
-
- if (!isSchemaChange()) {
- updateFieldTypes(schema);
- }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -117,29 +106,6 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
return tableNameConverter.convert(tableName);
}
- private void updateFieldTypes(JsonNode schema) {
- mySqlFieldTypes = new HashMap<>();
- fieldClassNames = new HashMap<>();
- JsonNode arrayNode = schema.get("fields");
- for (int i = 0; i < arrayNode.size(); i++) {
- JsonNode elementNode = arrayNode.get(i);
- String field = elementNode.get("field").asText();
- if ("before".equals(field) || "after".equals(field)) {
- JsonNode innerArrayNode = elementNode.get("fields");
- for (int j = 0; j < innerArrayNode.size(); j++) {
- JsonNode innerElementNode = innerArrayNode.get(j);
- String fieldName = innerElementNode.get("field").asText();
- String fieldType = innerElementNode.get("type").asText();
- mySqlFieldTypes.put(fieldName, fieldType);
- if (innerElementNode.get("name") != null) {
- String className =
innerElementNode.get("name").asText();
- fieldClassNames.put(fieldName, className);
- }
- }
- }
- }
- }
-
private boolean isSchemaChange() {
return payload.get("op") == null;
}
@@ -219,6 +185,34 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
private Map<String, String> extractRow(JsonNode recordRow) {
+ JsonNode schema =
+ Preconditions.checkNotNull(
+ root.get("schema"),
+ "MySqlDebeziumJsonEventParser only supports debezium
JSON with schema. "
+ + "Please make sure that `includeSchema` is
true "
+ + "in the JsonDebeziumDeserializationSchema
you created");
+
+ Map<String, String> mySqlFieldTypes = new HashMap<>();
+ Map<String, String> fieldClassNames = new HashMap<>();
+ JsonNode arrayNode = schema.get("fields");
+ for (int i = 0; i < arrayNode.size(); i++) {
+ JsonNode elementNode = arrayNode.get(i);
+ String field = elementNode.get("field").asText();
+ if ("before".equals(field) || "after".equals(field)) {
+ JsonNode innerArrayNode = elementNode.get("fields");
+ for (int j = 0; j < innerArrayNode.size(); j++) {
+ JsonNode innerElementNode = innerArrayNode.get(j);
+ String fieldName = innerElementNode.get("field").asText();
+ String fieldType = innerElementNode.get("type").asText();
+ mySqlFieldTypes.put(fieldName, fieldType);
+ if (innerElementNode.get("name") != null) {
+ String className =
innerElementNode.get("name").asText();
+ fieldClassNames.put(fieldName, className);
+ }
+ }
+ }
+ }
+
// the geometry, point type can not be converted to string, so we
convert it to Object
// first.
Map<String, Object> jsonMap =