This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 970cae5f6 [cdc] Fix flink cdc source record watermark timestamp extractor. (#3640)
970cae5f6 is described below

commit 970cae5f6577db70ccd299ad713985c3dc8a1487
Author: HunterXHunter <[email protected]>
AuthorDate: Tue Jul 16 09:44:27 2024 +0800

    [cdc] Fix flink cdc source record watermark timestamp extractor. (#3640)
---
 .../org/apache/paimon/utils/JsonSerdeUtil.java     | 12 +++-
 .../watermark/CdcTimestampExtractorFactory.java    | 28 ++++++--
 .../action/cdc/watermark/CdcWatermarkStrategy.java |  8 ++-
 .../cdc/CdcDebeziumTimestampExtractorITCase.java   | 79 ++++++++++++++++++++++
 4 files changed, 119 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 43d16c04c..a919d83c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -219,17 +219,27 @@ public class JsonSerdeUtil {
     }
 
     /** Parses a JSON string and extracts a value of the specified type from 
the given path keys. */
-    public static <T> T extractValue(JsonNode jsonNode, Class<T> valueType, 
String... path)
+    public static <T> T extractValueOrDefault(
+            JsonNode jsonNode, Class<T> valueType, T defaultValue, String... 
path)
             throws JsonProcessingException {
         for (String key : path) {
             jsonNode = jsonNode.get(key);
             if (jsonNode == null) {
+                if (defaultValue != null) {
+                    return defaultValue;
+                }
                 throw new IllegalArgumentException("Invalid path or key not 
found: " + key);
             }
         }
         return OBJECT_MAPPER_INSTANCE.treeToValue(jsonNode, valueType);
     }
 
+    /** Parses a JSON string and extracts a value of the specified type from 
the given path keys. */
+    public static <T> T extractValue(JsonNode jsonNode, Class<T> valueType, 
String... path)
+            throws JsonProcessingException {
+        return extractValueOrDefault(jsonNode, valueType, null, path);
+    }
+
     /** Checks if a specified node exists in a JSON string. */
     public static boolean isNodeExists(JsonNode jsonNode, String... path) {
         for (String key : path) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
index 2099287a6..e59d124a7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
@@ -22,7 +22,9 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -62,13 +64,15 @@ public class CdcTimestampExtractorFactory implements Serializable {
     }
 
     /** Timestamp extractor for MongoDB sources in CDC applications. */
-    public static class MongoDBCdcTimestampExtractor implements 
CdcTimestampExtractor {
+    public static class MongoDBCdcTimestampExtractor extends 
CdcDebeziumTimestampExtractor {
 
         private static final long serialVersionUID = 1L;
 
         @Override
         public long extractTimestamp(CdcSourceRecord record) throws 
JsonProcessingException {
-            return JsonSerdeUtil.extractValue((JsonNode) record.getValue(), 
Long.class, "ts_ms");
+            JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), 
JsonNode.class);
+            // If the record is a schema-change event return Long.MIN_VALUE as 
result.
+            return JsonSerdeUtil.extractValueOrDefault(json, Long.class, 
Long.MIN_VALUE, "ts_ms");
         }
     }
 
@@ -109,12 +113,26 @@ public class CdcTimestampExtractorFactory implements Serializable {
     }
 
     /** Timestamp extractor for MySQL sources in CDC applications. */
-    public static class MysqlCdcTimestampExtractor implements 
CdcTimestampExtractor {
+    public static class MysqlCdcTimestampExtractor extends 
CdcDebeziumTimestampExtractor {
 
         @Override
         public long extractTimestamp(CdcSourceRecord record) throws 
JsonProcessingException {
-            return JsonSerdeUtil.extractValue(
-                    (JsonNode) record.getValue(), Long.class, "payload", 
"ts_ms");
+            JsonNode json = JsonSerdeUtil.fromJson((String) record.getValue(), 
JsonNode.class);
+
+            return JsonSerdeUtil.extractValueOrDefault(
+                    json, Long.class, Long.MIN_VALUE, "payload", "ts_ms");
+        }
+    }
+
+    /** Timestamp extractor for Cdc debezium deserialization. */
+    public abstract static class CdcDebeziumTimestampExtractor implements 
CdcTimestampExtractor {
+
+        protected final ObjectMapper objectMapper = new ObjectMapper();
+
+        public CdcDebeziumTimestampExtractor() {
+            objectMapper
+                    
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                    
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
index 41816b0ce..964699a47 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
@@ -56,8 +56,12 @@ public class CdcWatermarkStrategy implements WatermarkStrategy<CdcSourceRecord>
                 } catch (JsonProcessingException e) {
                     throw new RuntimeException(e);
                 }
-                currentMaxTimestamp = Math.max(currentMaxTimestamp, tMs);
-                output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
+                // If the record is a schema-change event ts_ms would be null, 
just ignore the
+                // record.
+                if (tMs != Long.MIN_VALUE) {
+                    currentMaxTimestamp = Math.max(currentMaxTimestamp, tMs);
+                    output.emitWatermark(new Watermark(currentMaxTimestamp - 
1));
+                }
             }
 
             @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java
new file mode 100644
index 000000000..ea5bcfe12
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcDebeziumTimestampExtractorITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.DebeziumEventTest;
+import 
org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link 
CdcTimestampExtractorFactory.CdcDebeziumTimestampExtractor}. */
+public class CdcDebeziumTimestampExtractorITCase {
+
+    private ObjectMapper objectMapper;
+
+    @BeforeEach
+    public void before() {
+        objectMapper = new ObjectMapper();
+        objectMapper
+                .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, 
true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+    }
+
+    @Test
+    public void testMysqlCdcTimestampExtractor() throws Exception {
+        CdcTimestampExtractorFactory.MysqlCdcTimestampExtractor extractor =
+                new CdcTimestampExtractorFactory.MysqlCdcTimestampExtractor();
+
+        JsonNode data = objectMapper.readValue("{\"payload\" : {\"ts_ms\": 
1}}", JsonNode.class);
+        CdcSourceRecord record = new CdcSourceRecord(data.toString());
+        assertThat(extractor.extractTimestamp(record)).isEqualTo(1L);
+
+        // If the record is a schema-change event `ts_ms` would be null, just 
ignore the record.
+        final URL url =
+                DebeziumEventTest.class
+                        .getClassLoader()
+                        .getResource("mysql/debezium-event-change.json");
+        assertThat(url).isNotNull();
+        JsonNode schemaChangeEvent = objectMapper.readValue(url, 
JsonNode.class);
+        record = new CdcSourceRecord(schemaChangeEvent.toString());
+        
assertThat(extractor.extractTimestamp(record)).isEqualTo(Long.MIN_VALUE);
+    }
+
+    @Test
+    public void testMongodbCdcTimestampExtractor() throws Exception {
+        CdcTimestampExtractorFactory.MongoDBCdcTimestampExtractor extractor =
+                new 
CdcTimestampExtractorFactory.MongoDBCdcTimestampExtractor();
+
+        JsonNode data = objectMapper.readValue("{\"ts_ms\": 1}", 
JsonNode.class);
+        CdcSourceRecord record = new CdcSourceRecord(data.toString());
+        assertThat(extractor.extractTimestamp(record)).isEqualTo(1L);
+    }
+}

Reply via email to