This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 83fec2521 [flink] Improve EventParser initializes mysqlFiled every record (#1281)
83fec2521 is described below

commit 83fec2521416296267e61becbf41a60e339d61e0
Author: HZY <[email protected]>
AuthorDate: Mon Jun 12 16:35:46 2023 +0800

    [flink] Improve EventParser initializes mysqlFiled every record (#1281)
---
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 66 ++++++++++------------
 1 file changed, 30 insertions(+), 36 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index e814e9701..54240098c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -66,9 +66,8 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
 
+    private JsonNode root;
     private JsonNode payload;
-    private Map<String, String> mySqlFieldTypes;
-    private Map<String, String> fieldClassNames;
 
     public MySqlDebeziumJsonEventParser(
             ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> computedColumns) {
@@ -94,18 +93,8 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     @Override
     public void setRawEvent(String rawEvent) {
         try {
-            JsonNode root = objectMapper.readValue(rawEvent, JsonNode.class);
-            JsonNode schema =
-                    Preconditions.checkNotNull(
-                            root.get("schema"),
-                            "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. "
-                                    + "Please make sure that `includeSchema` is true "
-                                    + "in the JsonDebeziumDeserializationSchema you created");
+            root = objectMapper.readValue(rawEvent, JsonNode.class);
             payload = root.get("payload");
-
-            if (!isSchemaChange()) {
-                updateFieldTypes(schema);
-            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -117,29 +106,6 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
         return tableNameConverter.convert(tableName);
     }
 
-    private void updateFieldTypes(JsonNode schema) {
-        mySqlFieldTypes = new HashMap<>();
-        fieldClassNames = new HashMap<>();
-        JsonNode arrayNode = schema.get("fields");
-        for (int i = 0; i < arrayNode.size(); i++) {
-            JsonNode elementNode = arrayNode.get(i);
-            String field = elementNode.get("field").asText();
-            if ("before".equals(field) || "after".equals(field)) {
-                JsonNode innerArrayNode = elementNode.get("fields");
-                for (int j = 0; j < innerArrayNode.size(); j++) {
-                    JsonNode innerElementNode = innerArrayNode.get(j);
-                    String fieldName = innerElementNode.get("field").asText();
-                    String fieldType = innerElementNode.get("type").asText();
-                    mySqlFieldTypes.put(fieldName, fieldType);
-                    if (innerElementNode.get("name") != null) {
-                        String className = innerElementNode.get("name").asText();
-                        fieldClassNames.put(fieldName, className);
-                    }
-                }
-            }
-        }
-    }
-
     private boolean isSchemaChange() {
         return payload.get("op") == null;
     }
@@ -219,6 +185,34 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     }
 
     private Map<String, String> extractRow(JsonNode recordRow) {
+        JsonNode schema =
+                Preconditions.checkNotNull(
+                        root.get("schema"),
+                        "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. "
+                                + "Please make sure that `includeSchema` is true "
+                                + "in the JsonDebeziumDeserializationSchema you created");
+
+        Map<String, String> mySqlFieldTypes = new HashMap<>();
+        Map<String, String> fieldClassNames = new HashMap<>();
+        JsonNode arrayNode = schema.get("fields");
+        for (int i = 0; i < arrayNode.size(); i++) {
+            JsonNode elementNode = arrayNode.get(i);
+            String field = elementNode.get("field").asText();
+            if ("before".equals(field) || "after".equals(field)) {
+                JsonNode innerArrayNode = elementNode.get("fields");
+                for (int j = 0; j < innerArrayNode.size(); j++) {
+                    JsonNode innerElementNode = innerArrayNode.get(j);
+                    String fieldName = innerElementNode.get("field").asText();
+                    String fieldType = innerElementNode.get("type").asText();
+                    mySqlFieldTypes.put(fieldName, fieldType);
+                    if (innerElementNode.get("name") != null) {
+                        String className = innerElementNode.get("name").asText();
+                        fieldClassNames.put(fieldName, className);
+                    }
+                }
+            }
+        }
+
         // the geometry, point type can not be converted to string, so we convert it to Object
         // first.
         Map<String, Object> jsonMap =

Reply via email to