This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 820eca53d [cdc] Use CdcSourceRecord as source type instead of raw 
String (#2783)
820eca53d is described below

commit 820eca53d51b3cf4c4f32478b54f35528fc1681d
Author: Harvey Yue <[email protected]>
AuthorDate: Fri Apr 26 16:37:02 2024 +0800

    [cdc] Use CdcSourceRecord as source type instead of raw String (#2783)
    
    Co-authored-by: zhuangchong <[email protected]>
---
 .../org/apache/paimon/utils/JsonSerdeUtil.java     | 16 ++--
 paimon-e2e-tests/pom.xml                           |  2 +-
 paimon-flink/paimon-flink-cdc/pom.xml              |  4 +
 .../paimon/flink/action/cdc/CdcSourceRecord.java   | 83 ++++++++++++++++++
 .../flink/action/cdc/MessageQueueSchemaUtils.java  |  2 +-
 .../flink/action/cdc/SyncDatabaseActionBase.java   |  2 +-
 .../paimon/flink/action/cdc/SyncJobHandler.java    |  4 +-
 .../flink/action/cdc/SyncTableActionBase.java      |  2 +-
 .../action/cdc/SynchronizationActionBase.java      | 10 +--
 .../flink/action/cdc/format/RecordParser.java      | 28 ++++---
 .../cdc/format/debezium/DebeziumRecordParser.java  |  5 +-
 .../flink/action/cdc/kafka/KafkaActionUtils.java   | 48 +++++++----
 .../action/cdc/mongodb/MongoDBActionUtils.java     | 12 +--
 .../action/cdc/mongodb/MongoDBRecordParser.java    | 11 ++-
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |  3 +-
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |  3 +-
 .../flink/action/cdc/mysql/MySqlActionUtils.java   | 11 +--
 .../flink/action/cdc/mysql/MySqlRecordParser.java  |  8 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  3 +-
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  3 +-
 .../action/cdc/postgres/PostgresActionUtils.java   | 11 +--
 .../action/cdc/postgres/PostgresRecordParser.java  |  9 +-
 .../cdc/postgres/PostgresSyncTableAction.java      |  3 +-
 .../flink/action/cdc/pulsar/PulsarActionUtils.java | 31 ++++---
 .../CdcDebeziumDeserializationSchema.java          | 97 ++++++++++++++++++++++
 .../CdcJsonDeserializationSchema.java              | 74 +++++++++++++++++
 .../watermark/CdcTimestampExtractorFactory.java    | 17 ++--
 .../action/cdc/watermark/CdcWatermarkStrategy.java |  9 +-
 .../cdc/kafka/KafkaSyncTableActionITCase.java      |  6 +-
 29 files changed, 407 insertions(+), 110 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index aa493dd4a..43d16c04c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -219,24 +219,22 @@ public class JsonSerdeUtil {
     }
 
     /** Parses a JSON string and extracts a value of the specified type from 
the given path keys. */
-    public static <T> T extractValue(String json, Class<T> valueType, 
String... path)
+    public static <T> T extractValue(JsonNode jsonNode, Class<T> valueType, 
String... path)
             throws JsonProcessingException {
-        JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
         for (String key : path) {
-            currentNode = currentNode.get(key);
-            if (currentNode == null) {
+            jsonNode = jsonNode.get(key);
+            if (jsonNode == null) {
                 throw new IllegalArgumentException("Invalid path or key not 
found: " + key);
             }
         }
-        return OBJECT_MAPPER_INSTANCE.treeToValue(currentNode, valueType);
+        return OBJECT_MAPPER_INSTANCE.treeToValue(jsonNode, valueType);
     }
 
     /** Checks if a specified node exists in a JSON string. */
-    public static boolean isNodeExists(String json, String... path) throws 
JsonProcessingException {
-        JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
+    public static boolean isNodeExists(JsonNode jsonNode, String... path) {
         for (String key : path) {
-            currentNode = currentNode.get(key);
-            if (currentNode == null) {
+            jsonNode = jsonNode.get(key);
+            if (jsonNode == null) {
                 return false;
             }
         }
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index fdcdbda7a..862b39361 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -33,7 +33,7 @@ under the License.
 
     <properties>
         <flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
-        <flink.cdc.version>2.3.0</flink.cdc.version>
+        <flink.cdc.version>2.4.2</flink.cdc.version>
         
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
     </properties>
 
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml 
b/paimon-flink/paimon-flink-cdc/pom.xml
index 1b18833a4..868eac71f 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -280,6 +280,10 @@ under the License.
                         <configuration>
                             <relocations>
                                 <!-- Same as flink-sql-connector-kafka. -->
+                                <relocation>
+                                    <pattern>org.apache.kafka.connect</pattern>
+                                    
<shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka.connect</shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
new file mode 100644
index 000000000..51a14534c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A data change record from the CDC source. */
+public class CdcSourceRecord implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Nullable private final String topic;
+
+    @Nullable private final Object key;
+
+    // TODO Use generics to support more scenarios.
+    private final Object value;
+
+    public CdcSourceRecord(@Nullable String topic, @Nullable Object key, 
Object value) {
+        this.topic = topic;
+        this.key = key;
+        this.value = value;
+    }
+
+    public CdcSourceRecord(Object value) {
+        this(null, null, value);
+    }
+
+    @Nullable
+    public String getTopic() {
+        return topic;
+    }
+
+    @Nullable
+    public Object getKey() {
+        return key;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof CdcSourceRecord)) {
+            return false;
+        }
+
+        CdcSourceRecord that = (CdcSourceRecord) o;
+        return Objects.equals(topic, that.topic)
+                && Objects.equals(key, that.key)
+                && Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, key, value);
+    }
+
+    @Override
+    public String toString() {
+        return topic + ": " + key + " " + value;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index 590f63911..73e63bcd0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -89,7 +89,7 @@ public class MessageQueueSchemaUtils {
     /** Wrap the consumer for different message queues. */
     public interface ConsumerWrapper extends AutoCloseable {
 
-        List<String> getRecords(int pollTimeOutMills);
+        List<CdcSourceRecord> getRecords(int pollTimeOutMills);
 
         String topic();
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index ecfca9a17..4fa6ca76c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -110,7 +110,7 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
     }
 
     @Override
-    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
+    protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
recordParse() {
         return syncJobHandler.provideRecordParser(
                 caseSensitive, Collections.emptyList(), typeMapping, 
metadataConverters);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index 255e1ca7e..c674e560b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -180,7 +180,7 @@ public class SyncJobHandler {
         }
     }
 
-    public Source<String, ?, ?> provideSource() {
+    public Source<CdcSourceRecord, ?, ?> provideSource() {
         switch (sourceType) {
             case KAFKA:
                 return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
@@ -192,7 +192,7 @@ public class SyncJobHandler {
         }
     }
 
-    public FlatMapFunction<String, RichCdcMultiplexRecord> provideRecordParser(
+    public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
provideRecordParser(
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TypeMapping typeMapping,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index ff271ad59..f4e5bfe6a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -155,7 +155,7 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
     }
 
     @Override
-    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
+    protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
recordParse() {
         return syncJobHandler.provideRecordParser(
                 caseSensitive, computedColumns, typeMapping, 
metadataConverters);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 5c04d5707..944185347 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -132,7 +132,7 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
         return syncJobHandler.provideSource();
     }
 
-    private DataStreamSource<String> buildDataStreamSource(Object source) {
+    private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object 
source) {
         if (source instanceof Source) {
             boolean isAutomaticWatermarkCreationEnabled =
                     
tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
@@ -143,7 +143,7 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
             Options options = Options.fromMap(tableConfig);
             Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
             String watermarkAlignGroup = 
options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
-            WatermarkStrategy<String> watermarkStrategy =
+            WatermarkStrategy<CdcSourceRecord> watermarkStrategy =
                     isAutomaticWatermarkCreationEnabled
                             ? watermarkAlignGroup != null
                                     ? new 
CdcWatermarkStrategy(createExtractor(source))
@@ -158,18 +158,18 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
                 watermarkStrategy = 
watermarkStrategy.withIdleness(idleTimeout);
             }
             return env.fromSource(
-                    (Source<String, ?, ?>) source,
+                    (Source<CdcSourceRecord, ?, ?>) source,
                     watermarkStrategy,
                     syncJobHandler.provideSourceName());
         }
         if (source instanceof SourceFunction) {
             return env.addSource(
-                    (SourceFunction<String>) source, 
syncJobHandler.provideSourceName());
+                    (SourceFunction<CdcSourceRecord>) source, 
syncJobHandler.provideSourceName());
         }
         throw new UnsupportedOperationException("Unrecognized source type");
     }
 
-    protected abstract FlatMapFunction<String, RichCdcMultiplexRecord> 
recordParse();
+    protected abstract FlatMapFunction<CdcSourceRecord, 
RichCdcMultiplexRecord> recordParse();
 
     protected abstract EventParser.Factory<RichCdcMultiplexRecord> 
buildEventParserFactory();
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 4dcbb2a81..f4543759a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.format;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -26,7 +27,6 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.TypeUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -55,8 +55,10 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDupl
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
+import static org.apache.paimon.utils.JsonSerdeUtil.convertValue;
 import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
 import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
 
 /**
  * Provides a base implementation for parsing messages of various formats into 
{@link
@@ -66,7 +68,8 @@ import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
  * Subclasses are expected to provide specific implementations for extracting 
records, validating
  * message formats, and other format-specific operations.
  */
-public abstract class RecordParser implements FlatMapFunction<String, 
RichCdcMultiplexRecord> {
+public abstract class RecordParser
+        implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RecordParser.class);
 
@@ -86,7 +89,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     }
 
     @Nullable
-    public Schema buildSchema(String record) {
+    public Schema buildSchema(CdcSourceRecord record) {
         try {
             setRoot(record);
             if (isDDL()) {
@@ -103,7 +106,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
             builder.primaryKey(extractPrimaryKeys());
             return builder.build();
         } catch (Exception e) {
-            logInvalidJsonString(record);
+            logInvalidSourceRecord(record);
             throw e;
         }
     }
@@ -126,12 +129,12 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
     }
 
     @Override
-    public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) {
+    public void flatMap(CdcSourceRecord value, 
Collector<RichCdcMultiplexRecord> out) {
         try {
             setRoot(value);
             extractRecords().forEach(out::collect);
         } catch (Exception e) {
-            logInvalidJsonString(value);
+            logInvalidSourceRecord(value);
             throw e;
         }
     }
@@ -140,7 +143,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
             JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
         paimonFieldTypes.putAll(fillDefaultTypes(record));
         Map<String, Object> recordMap =
-                JsonSerdeUtil.convertValue(record, new 
TypeReference<Map<String, Object>>() {});
+                convertValue(record, new TypeReference<Map<String, Object>>() 
{});
         Map<String, String> rowData =
                 recordMap.entrySet().stream()
                         .filter(entry -> Objects.nonNull(entry.getKey()))
@@ -151,8 +154,7 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
                                             if 
(Objects.nonNull(entry.getValue())
                                                     && 
!TypeUtils.isBasicType(entry.getValue())) {
                                                 try {
-                                                    return 
JsonSerdeUtil.writeValueAsString(
-                                                            entry.getValue());
+                                                    return 
writeValueAsString(entry.getValue());
                                                 } catch 
(JsonProcessingException e) {
                                                     LOG.error("Failed to 
deserialize record.", e);
                                                     return 
Objects.toString(entry.getValue());
@@ -217,8 +219,8 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
                 new CdcRecord(rowKind, data));
     }
 
-    protected void setRoot(String record) {
-        root = JsonSerdeUtil.fromJson(record, JsonNode.class);
+    protected void setRoot(CdcSourceRecord record) {
+        root = (JsonNode) record.getValue();
     }
 
     protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
@@ -243,8 +245,8 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
         return isNull(node) ? null : node.asText();
     }
 
-    private void logInvalidJsonString(String json) {
-        LOG.info("Invalid Json:\n{}", json);
+    private void logInvalidSourceRecord(CdcSourceRecord record) {
+        LOG.error("Invalid source record:\n{}", record.toString());
     }
 
     protected void checkNotNull(JsonNode node, String key) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index 96f1bc074..471856462 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.format.debezium;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.RecordParser;
@@ -119,8 +120,8 @@ public class DebeziumRecordParser extends RecordParser {
     }
 
     @Override
-    protected void setRoot(String record) {
-        JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);
+    protected void setRoot(CdcSourceRecord record) {
+        JsonNode node = (JsonNode) record.getValue();
 
         hasSchema = false;
         if (node.has(FIELD_SCHEMA)) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 64a543beb..4f0be0ef2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -18,11 +18,12 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import 
org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;
 import org.apache.paimon.utils.StringUtils;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
@@ -34,14 +35,14 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -69,8 +70,8 @@ public class KafkaActionUtils {
     private static final String PARTITION = "partition";
     private static final String OFFSET = "offset";
 
-    public static KafkaSource<String> buildKafkaSource(Configuration 
kafkaConfig) {
-        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
+    public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration 
kafkaConfig) {
+        KafkaSourceBuilder<CdcSourceRecord> kafkaSourceBuilder = 
KafkaSource.builder();
 
         if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
             List<String> topics =
@@ -83,11 +84,9 @@ public class KafkaActionUtils {
                     
Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
         }
 
-        KafkaValueOnlyDeserializationSchemaWrapper<String> schema =
-                new KafkaValueOnlyDeserializationSchemaWrapper<>(new 
SimpleStringSchema());
-        kafkaSourceBuilder.setDeserializer(schema);
-
-        kafkaSourceBuilder.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+        kafkaSourceBuilder
+                .setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
+                .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
 
         Properties properties = createKafkaProperties(kafkaConfig);
 
@@ -252,13 +251,17 @@ public class KafkaActionUtils {
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 
kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
         props.put(ConsumerConfig.GROUP_ID_CONFIG, 
kafkaPropertiesGroupId(kafkaConfig));
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         props.put(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
 
         String topic;
         if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
@@ -310,20 +313,29 @@ public class KafkaActionUtils {
 
     private static class KafkaConsumerWrapper implements 
MessageQueueSchemaUtils.ConsumerWrapper {
 
-        private final KafkaConsumer<String, String> consumer;
+        private final KafkaConsumer<byte[], byte[]> consumer;
         private final String topic;
 
-        KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer, 
String topic) {
+        KafkaConsumerWrapper(KafkaConsumer<byte[], byte[]> kafkaConsumer, 
String topic) {
             this.consumer = kafkaConsumer;
             this.topic = topic;
         }
 
         @Override
-        public List<String> getRecords(int pollTimeOutMills) {
-            ConsumerRecords<String, String> consumerRecords =
+        public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
+            ConsumerRecords<byte[], byte[]> consumerRecords =
                     consumer.poll(Duration.ofMillis(pollTimeOutMills));
+            CdcJsonDeserializationSchema deserializationSchema = new 
CdcJsonDeserializationSchema();
             return 
StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
-                    .map(ConsumerRecord::value)
+                    .map(
+                            consumerRecord -> {
+                                try {
+                                    return deserializationSchema.deserialize(
+                                            consumerRecord.value());
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            })
                     .collect(Collectors.toList());
         }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index c1da23b0b..65d22ba33 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -18,12 +18,14 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import 
org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
+
 import com.ververica.cdc.connectors.base.options.SourceOptions;
 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
 import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
 import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -83,9 +85,9 @@ public class MongoDBActionUtils {
                     .withDescription(
                             "Determines whether to use the default MongoDB _id 
generation strategy. If set to true, the default _id generation will remove the 
outer $oid nesting. If set to false, no additional processing will be done on 
the _id field.");
 
-    public static MongoDBSource<String> buildMongodbSource(
+    public static MongoDBSource<CdcSourceRecord> buildMongodbSource(
             Configuration mongodbConfig, String tableList) {
-        MongoDBSourceBuilder<String> sourceBuilder = MongoDBSource.builder();
+        MongoDBSourceBuilder<CdcSourceRecord> sourceBuilder = 
MongoDBSource.builder();
 
         if (mongodbConfig.contains(MongoDBSourceOptions.USERNAME)
                 && mongodbConfig.contains(MongoDBSourceOptions.PASSWORD)) {
@@ -132,8 +134,8 @@ public class MongoDBActionUtils {
 
         Map<String, Object> customConverterConfigs = new HashMap<>();
         customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(false, 
customConverterConfigs);
+        CdcDebeziumDeserializationSchema schema =
+                new CdcDebeziumDeserializationSchema(false, 
customConverterConfigs);
 
         return sourceBuilder.deserializer(schema).build();
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index 1307c070a..9264e409f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
 import 
org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
@@ -33,7 +34,7 @@ import org.apache.flink.util.Collector;
 import java.util.List;
 
 /**
- * A parser for MongoDB Debezium JSON strings, converting them into a list of 
{@link
+ * A parser for MongoDB Debezium JSON records, converting them into a list of 
{@link
  * RichCdcMultiplexRecord}s.
  *
  * <p>This parser is designed to process and transform incoming MongoDB 
Debezium JSON records into a
@@ -51,7 +52,8 @@ import java.util.List;
  * <p>Note: This parser is primarily intended for use in Flink streaming 
applications that process
  * MongoDB CDC data.
  */
-public class MongoDBRecordParser implements FlatMapFunction<String, 
RichCdcMultiplexRecord> {
+public class MongoDBRecordParser
+        implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
 
     private static final String FIELD_DATABASE = "db";
     private static final String FIELD_TABLE = "coll";
@@ -72,8 +74,9 @@ public class MongoDBRecordParser implements 
FlatMapFunction<String, RichCdcMulti
     }
 
     @Override
-    public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) 
throws Exception {
-        root = OBJECT_MAPPER.readValue(value, JsonNode.class);
+    public void flatMap(CdcSourceRecord value, 
Collector<RichCdcMultiplexRecord> out)
+            throws Exception {
+        root = OBJECT_MAPPER.readValue((String) value.getValue(), 
JsonNode.class);
         String databaseName = extractString(FIELD_DATABASE);
         String collection = extractString(FIELD_TABLE);
         MongoVersionStrategy versionStrategy =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 3fcfccd07..7881aa58b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
 
@@ -58,7 +59,7 @@ public class MongoDBSyncDatabaseAction extends 
SyncDatabaseActionBase {
     }
 
     @Override
-    protected MongoDBSource<String> buildSource() {
+    protected MongoDBSource<CdcSourceRecord> buildSource() {
         return MongoDBActionUtils.buildMongodbSource(
                 cdcSourceConfig,
                 CdcActionCommonUtils.combinedModeTableList(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 32596863a..0c5b07886 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
 import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
 import org.apache.paimon.schema.Schema;
@@ -69,7 +70,7 @@ public class MongoDBSyncTableAction extends 
SyncTableActionBase {
     }
 
     @Override
-    protected MongoDBSource<String> buildSource() {
+    protected MongoDBSource<CdcSourceRecord> buildSource() {
         String tableList =
                 cdcSourceConfig.get(MongoDBSourceOptions.DATABASE)
                         + "\\."
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 6269fb291..cc9644b54 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
+import 
org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
 import org.apache.paimon.schema.Schema;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -30,7 +32,6 @@ import 
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
 import org.apache.flink.configuration.ConfigOption;
@@ -142,9 +143,9 @@ public class MySqlActionUtils {
         return mySqlSchemasInfo;
     }
 
-    public static MySqlSource<String> buildMySqlSource(
+    public static MySqlSource<CdcSourceRecord> buildMySqlSource(
             Configuration mySqlConfig, String tableList, TypeMapping 
typeMapping) {
-        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
+        MySqlSourceBuilder<CdcSourceRecord> sourceBuilder = 
MySqlSource.builder();
 
         sourceBuilder
                 .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
@@ -220,8 +221,8 @@ public class MySqlActionUtils {
 
         Map<String, Object> customConverterConfigs = new HashMap<>();
         customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(true, 
customConverterConfigs);
+        CdcDebeziumDeserializationSchema schema =
+                new CdcDebeziumDeserializationSchema(true, 
customConverterConfigs);
 
         boolean scanNewlyAddedTables = 
mySqlConfig.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 72b42b466..45dcaa3c2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
@@ -69,7 +70,7 @@ import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
  * A parser for MySql Debezium JSON strings, converting them into a list of 
{@link
  * RichCdcMultiplexRecord}s.
  */
-public class MySqlRecordParser implements FlatMapFunction<String, 
RichCdcMultiplexRecord> {
+public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, 
RichCdcMultiplexRecord> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlRecordParser.class);
 
@@ -109,8 +110,9 @@ public class MySqlRecordParser implements 
FlatMapFunction<String, RichCdcMultipl
     }
 
     @Override
-    public void flatMap(String rawEvent, Collector<RichCdcMultiplexRecord> 
out) throws Exception {
-        root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
+    public void flatMap(CdcSourceRecord rawEvent, 
Collector<RichCdcMultiplexRecord> out)
+            throws Exception {
+        root = objectMapper.readValue((String) rawEvent.getValue(), 
DebeziumEvent.class);
         currentTable = 
root.payload().source().get(AbstractSourceInfo.TABLE_NAME_KEY).asText();
         databaseName = 
root.payload().source().get(AbstractSourceInfo.DATABASE_NAME_KEY).asText();
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index da3b80c5f..80a7c8b84 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
@@ -179,7 +180,7 @@ public class MySqlSyncDatabaseAction extends 
SyncDatabaseActionBase {
     }
 
     @Override
-    protected MySqlSource<String> buildSource() {
+    protected MySqlSource<CdcSourceRecord> buildSource() {
         return MySqlActionUtils.buildMySqlSource(
                 cdcSourceConfig,
                 tableList(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 919cdab3c..de6c22a21 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
 import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
@@ -99,7 +100,7 @@ public class MySqlSyncTableAction extends 
SyncTableActionBase {
     }
 
     @Override
-    protected MySqlSource<String> buildSource() {
+    protected MySqlSource<CdcSourceRecord> buildSource() {
         String tableList =
                 String.format(
                         "(%s)\\.(%s)",
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
index 49b78db41..45d8f69cc 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.flink.action.cdc.postgres;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
+import 
org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
 import org.apache.paimon.options.OptionsUtils;
 import org.apache.paimon.schema.Schema;
 
@@ -30,7 +32,6 @@ import 
com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
 import 
com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
 import 
com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -116,9 +117,9 @@ public class PostgresActionUtils {
         return jdbcSchemasInfo;
     }
 
-    public static JdbcIncrementalSource<String> buildPostgresSource(
+    public static JdbcIncrementalSource<CdcSourceRecord> buildPostgresSource(
             Configuration postgresConfig, String[] schemaList, String[] 
tableList) {
-        PostgresSourceBuilder<String> sourceBuilder = 
PostgresIncrementalSource.builder();
+        PostgresSourceBuilder<CdcSourceRecord> sourceBuilder = 
PostgresIncrementalSource.builder();
 
         sourceBuilder
                 .hostname(postgresConfig.get(PostgresSourceOptions.HOSTNAME))
@@ -170,8 +171,8 @@ public class PostgresActionUtils {
 
         Map<String, Object> customConverterConfigs = new HashMap<>();
         customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(true, 
customConverterConfigs);
+        CdcDebeziumDeserializationSchema schema =
+                new CdcDebeziumDeserializationSchema(true, 
customConverterConfigs);
         return 
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 6a2ef96a1..de54b97ae 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc.postgres;
 
 import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
@@ -81,7 +82,8 @@ import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
  * A parser for PostgreSQL Debezium JSON strings, converting them into a list 
of {@link
  * RichCdcMultiplexRecord}s.
  */
-public class PostgresRecordParser implements FlatMapFunction<String, 
RichCdcMultiplexRecord> {
+public class PostgresRecordParser
+        implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PostgresRecordParser.class);
 
@@ -132,8 +134,9 @@ public class PostgresRecordParser implements 
FlatMapFunction<String, RichCdcMult
     }
 
     @Override
-    public void flatMap(String rawEvent, Collector<RichCdcMultiplexRecord> 
out) throws Exception {
-        root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
+    public void flatMap(CdcSourceRecord rawEvent, 
Collector<RichCdcMultiplexRecord> out)
+            throws Exception {
+        root = objectMapper.readValue((String) rawEvent.getValue(), 
DebeziumEvent.class);
 
         currentTable = 
root.payload().source().get(AbstractSourceInfo.TABLE_NAME_KEY).asText();
         databaseName = 
root.payload().source().get(AbstractSourceInfo.DATABASE_NAME_KEY).asText();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
index 6dbcf29da..e7d692b91 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.postgres;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.SyncJobHandler;
 import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
@@ -101,7 +102,7 @@ public class PostgresSyncTableAction extends 
SyncTableActionBase {
     }
 
     @Override
-    protected JdbcIncrementalSource<String> buildSource() {
+    protected JdbcIncrementalSource<CdcSourceRecord> buildSource() {
         List<JdbcSchemasInfo.JdbcSchemaInfo> pkTables = 
postgresSchemasInfo.pkTables();
         Set<String> schemaList = new HashSet<>();
         String[] tableList = new String[pkTables.size()];
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 27560b835..66bdc1847 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.flink.action.cdc.pulsar;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import 
org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -50,6 +51,7 @@ import org.apache.pulsar.common.lookup.GetTopicsResult;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -162,15 +164,15 @@ public class PulsarActionUtils {
                     .defaultValue(true)
                     .withDescription("To specify the boundedness of a 
stream.");
 
-    public static PulsarSource<String> buildPulsarSource(Configuration 
pulsarConfig) {
-        PulsarSourceBuilder<String> pulsarSourceBuilder = 
PulsarSource.builder();
+    public static PulsarSource<CdcSourceRecord> 
buildPulsarSource(Configuration pulsarConfig) {
+        PulsarSourceBuilder<CdcSourceRecord> pulsarSourceBuilder = 
PulsarSource.builder();
 
         // the minimum setup
         pulsarSourceBuilder
                 .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
                 .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
                 
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
-                .setDeserializationSchema(new SimpleStringSchema());
+                .setDeserializationSchema(new CdcJsonDeserializationSchema());
 
         
pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
         
pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
@@ -296,10 +298,10 @@ public class PulsarActionUtils {
             SourceConfiguration pulsarSourceConfiguration = new 
SourceConfiguration(pulsarConfig);
             PulsarClient pulsarClient = 
PulsarClientFactory.createClient(pulsarSourceConfiguration);
 
-            ConsumerBuilder<String> consumerBuilder =
+            ConsumerBuilder<byte[]> consumerBuilder =
                     createConsumerBuilder(
                             pulsarClient,
-                            org.apache.pulsar.client.api.Schema.STRING,
+                            org.apache.pulsar.client.api.Schema.BYTES,
                             pulsarSourceConfiguration);
 
             // The default position is Latest
@@ -325,7 +327,7 @@ public class PulsarActionUtils {
             }
 
             // Create the consumer configuration by using common utils.
-            Consumer<String> consumer = consumerBuilder.subscribe();
+            Consumer<byte[]> consumer = consumerBuilder.subscribe();
 
             return new PulsarConsumerWrapper(consumer, topic);
         } catch (PulsarClientException e) {
@@ -373,22 +375,25 @@ public class PulsarActionUtils {
 
     private static class PulsarConsumerWrapper implements 
MessageQueueSchemaUtils.ConsumerWrapper {
 
-        private final Consumer<String> consumer;
+        private final Consumer<byte[]> consumer;
         private final String topic;
 
-        PulsarConsumerWrapper(Consumer<String> consumer, String topic) {
+        PulsarConsumerWrapper(Consumer<byte[]> consumer, String topic) {
             this.consumer = consumer;
             this.topic = topic;
         }
 
         @Override
-        public List<String> getRecords(int pollTimeOutMills) {
+        public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
             try {
-                Message<String> message = consumer.receive(pollTimeOutMills, 
TimeUnit.MILLISECONDS);
+                Message<byte[]> message = consumer.receive(pollTimeOutMills, 
TimeUnit.MILLISECONDS);
+                CdcJsonDeserializationSchema deserializationSchema =
+                        new CdcJsonDeserializationSchema();
                 return message == null
                         ? Collections.emptyList()
-                        : Collections.singletonList(message.getValue());
-            } catch (PulsarClientException e) {
+                        : Collections.singletonList(
+                                
deserializationSchema.deserialize(message.getValue()));
+            } catch (IOException e) {
                 throw new RuntimeException(e);
             }
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
new file mode 100644
index 000000000..9ce39ad29
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.serialization;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * A JSON format implementation of {@link DebeziumDeserializationSchema} which 
deserializes the
+ * received {@link SourceRecord} to {@link CdcSourceRecord}.
+ */
+public class CdcDebeziumDeserializationSchema
+        implements DebeziumDeserializationSchema<CdcSourceRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient JsonConverter jsonConverter;
+
+    /**
+     * Configuration whether to enable {@link 
JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to include
+     * schema in messages.
+     */
+    private final Boolean includeSchema;
+
+    /** The custom configurations for {@link JsonConverter}. */
+    private final Map<String, Object> customConverterConfigs;
+
+    public CdcDebeziumDeserializationSchema() {
+        this(false);
+    }
+
+    public CdcDebeziumDeserializationSchema(Boolean includeSchema) {
+        this(includeSchema, null);
+    }
+
+    public CdcDebeziumDeserializationSchema(
+            Boolean includeSchema, Map<String, Object> customConverterConfigs) 
{
+        this.includeSchema = includeSchema;
+        this.customConverterConfigs = customConverterConfigs;
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<CdcSourceRecord> 
out) throws Exception {
+        if (jsonConverter == null) {
+            initializeJsonConverter();
+        }
+        byte[] bytes =
+                jsonConverter.fromConnectData(record.topic(), 
record.valueSchema(), record.value());
+        out.collect(new CdcSourceRecord(record.topic(), null, new 
String(bytes)));
+    }
+
+    /** Initialize {@link JsonConverter} with given configs. */
+    private void initializeJsonConverter() {
+        jsonConverter = new JsonConverter();
+        final HashMap<String, Object> configs = new HashMap<>(2);
+        configs.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.VALUE.getName());
+        configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
+        if (customConverterConfigs != null) {
+            configs.putAll(customConverterConfigs);
+        }
+        jsonConverter.configure(configs);
+    }
+
+    @Override
+    public TypeInformation<CdcSourceRecord> getProducedType() {
+        return getForClass(CdcSourceRecord.class);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcJsonDeserializationSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcJsonDeserializationSchema.java
new file mode 100644
index 000000000..07461cd49
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcJsonDeserializationSchema.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.serialization;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/** A simple deserialization schema for {@link CdcSourceRecord}. */
+public class CdcJsonDeserializationSchema implements 
DeserializationSchema<CdcSourceRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CdcJsonDeserializationSchema.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    public CdcJsonDeserializationSchema() {
+        objectMapper
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, 
true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+    }
+
+    @Override
+    public CdcSourceRecord deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+
+        try {
+            return new CdcSourceRecord(objectMapper.readValue(message, 
JsonNode.class));
+        } catch (Exception e) {
+            LOG.error("Invalid Json:\n{}", new String(message));
+            throw e;
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(CdcSourceRecord nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<CdcSourceRecord> getProducedType() {
+        return getForClass(CdcSourceRecord.class);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
index 286d5ffaa..2099287a6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink.action.cdc.watermark;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
 import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -65,8 +67,8 @@ public class CdcTimestampExtractorFactory implements 
Serializable {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public long extractTimestamp(String record) throws 
JsonProcessingException {
-            return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+        public long extractTimestamp(CdcSourceRecord record) throws 
JsonProcessingException {
+            return JsonSerdeUtil.extractValue((JsonNode) record.getValue(), 
Long.class, "ts_ms");
         }
     }
 
@@ -76,7 +78,9 @@ public class CdcTimestampExtractorFactory implements 
Serializable {
         private static final long serialVersionUID = 1L;
 
         @Override
-        public long extractTimestamp(String record) throws 
JsonProcessingException {
+        public long extractTimestamp(CdcSourceRecord cdcSourceRecord)
+                throws JsonProcessingException {
+            JsonNode record = (JsonNode) cdcSourceRecord.getValue();
             if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
                 // Canal json
                 return JsonSerdeUtil.extractValue(record, Long.class, "ts");
@@ -108,14 +112,15 @@ public class CdcTimestampExtractorFactory implements 
Serializable {
     public static class MysqlCdcTimestampExtractor implements 
CdcTimestampExtractor {
 
         @Override
-        public long extractTimestamp(String record) throws 
JsonProcessingException {
-            return JsonSerdeUtil.extractValue(record, Long.class, "payload", 
"ts_ms");
+        public long extractTimestamp(CdcSourceRecord record) throws 
JsonProcessingException {
+            return JsonSerdeUtil.extractValue(
+                    (JsonNode) record.getValue(), Long.class, "payload", 
"ts_ms");
         }
     }
 
     /** Interface defining the contract for CDC timestamp extraction. */
     public interface CdcTimestampExtractor extends Serializable {
 
-        long extractTimestamp(String record) throws JsonProcessingException;
+        long extractTimestamp(CdcSourceRecord record) throws 
JsonProcessingException;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
index 218d45d63..41816b0ce 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.watermark;
 
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import 
org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -32,7 +33,7 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
  * Watermark strategy for CDC sources, generating watermarks based on 
timestamps extracted from
  * records.
  */
-public class CdcWatermarkStrategy implements WatermarkStrategy<String> {
+public class CdcWatermarkStrategy implements 
WatermarkStrategy<CdcSourceRecord> {
 
     private final CdcTimestampExtractor timestampExtractor;
     private static final long serialVersionUID = 1L;
@@ -43,12 +44,12 @@ public class CdcWatermarkStrategy implements 
WatermarkStrategy<String> {
     }
 
     @Override
-    public WatermarkGenerator<String> createWatermarkGenerator(
+    public WatermarkGenerator<CdcSourceRecord> createWatermarkGenerator(
             WatermarkGeneratorSupplier.Context context) {
-        return new WatermarkGenerator<String>() {
+        return new WatermarkGenerator<CdcSourceRecord>() {
 
             @Override
-            public void onEvent(String record, long timestamp, WatermarkOutput 
output) {
+            public void onEvent(CdcSourceRecord record, long timestamp, 
WatermarkOutput output) {
                 long tMs;
                 try {
                     tMs = timestampExtractor.extractTimestamp(record);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 5785f680a..5bb63b6a8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -765,10 +765,8 @@ public class KafkaSyncTableActionITCase extends 
KafkaActionITCaseBase {
                                 + "1.000011, 2.000022, 3.000033, "
                                 + "1.000111, 2.000222, 3.000333, "
                                 + "12345.110, 12345.220, 12345.330, "
-                                // TODO fix FIXED
-                                + "1.2345678987654322E32, 
1.2345678987654322E32, 1.2345678987654322E32, "
-                                // TODO fix BIG DECIMAL
-                                + "11111, 22222, 33333, 
2222222222222222400000000000.0000000000, "
+                                + "123456789876543212345678987654321.11, 
123456789876543212345678987654321.22, 123456789876543212345678987654321.33, "
+                                + "11111, 22222, 33333, 
2222222222222222300000001111.1234567890, "
                                 + "19439, "
                                 // display value of datetime is not affected 
by timezone
                                 + "2023-03-23T14:30:05, 
2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, "

Reply via email to