This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fb8444b946 [Feature][Format] Improve 
maxwell_json,canal_json,debezium_json format add ts_ms and table (#9701)
fb8444b946 is described below

commit fb8444b946512a5a8f518e25e76d346b2118d6ea
Author: dyp12 <[email protected]>
AuthorDate: Wed Aug 20 15:21:55 2025 +0800

    [Feature][Format] Improve maxwell_json,canal_json,debezium_json format add 
ts_ms and table (#9701)
---
 docs/en/transform-v2/metadata.md                   |  16 ++--
 docs/zh/transform-v2/metadata.md                   |  16 ++--
 .../seatunnel/api/table/type/CommonOptions.java    |   6 +-
 .../seatunnel/api/table/type/SeaTunnelRow.java     |   2 +
 .../seatunnel/file/local/LocalFileTest.java        |  69 ++++++++++----
 .../e2e/connector/kafka/KafkaFormatIT.java         |  96 +++++++++----------
 .../json/canal/CanalJsonDeserializationSchema.java |  14 +++
 .../json/canal/CanalJsonSerializationSchema.java   |  26 +++++-
 .../DebeziumJsonDeserializationSchema.java         |  15 +++
 .../json/debezium/DebeziumJsonFormatOptions.java   |   2 +-
 .../debezium/DebeziumJsonSerializationSchema.java  |  38 +++++++-
 .../maxwell/MaxWellJsonDeserializationSchema.java  |  29 ++++++
 .../maxwell/MaxWellJsonSerializationSchema.java    |  23 ++++-
 .../json/ogg/OggJsonDeserializationSchema.java     |  25 ++++-
 .../json/ogg/OggJsonSerializationSchema.java       |  17 +++-
 .../json/canal/CanalJsonSerDeSchemaTest.java       |  52 +++++------
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java |  40 ++++----
 .../json/maxwell/MaxWellJsonSerDeSchemaTest.java   | 104 ++++++++++-----------
 .../format/json/ogg/OggJsonSerDeSchemaTest.java    |  41 ++++----
 19 files changed, 414 insertions(+), 217 deletions(-)

diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index 536038b4d8..89c0bea651 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -7,14 +7,14 @@ Metadata transform plugin for adding metadata fields to data
 
 ## Available Metadata
 
-|    Key    | DataType | Description                                           
                                             |
-|:---------:|:--------:|:---------------------------------------------------------------------------------------------------|
-| Database  |  string  | Name of the table that contain the row.               
                                             |
-|   Table   |  string  | Name of the table that contain the row.               
                                             |
-|  RowKind  |  string  | The type of operation                                 
                                             |
-| EventTime |   Long   | The time at which the connector processed the event.  
                                             |
-|   Delay   |   Long   | The difference between data extraction time and 
database change time                               |
-| Partition |  string  | Contains the partition field of the corresponding 
number table of the row, multiple using `,` join |
+|    Key    | DataType | Description                                           
                                                   |
+|:---------:|:--------:|:---------------------------------------------------------------------------------------------------------|
+| Database  |  string  | Name of the database that contains the row.           
                                                   |
+|   Table   |  string  | Name of the table that contains the row.              
                                                   |
+|  RowKind  |  string  | The type of operation                                 
                                                   |
+| EventTime |   Long   | The time at which the connector processed the 
event. The value is in milliseconds.                      |
+|   Delay   |   Long   | The difference between data extraction time and 
database change time. The value is in milliseconds.     |
+| Partition |  string  | Contains the partition field of the corresponding 
number table of the row, multiple using `,` join       |
 
 ### note
     `Delay` `EventTime` only worked on cdc series connectors for now , except 
TiDB-CDC
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index 2e92c1282f..72ebd320f8 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -7,14 +7,14 @@
 
 ## 支持的元数据
 
-|    Key    | DataType |       Description       |
-|:---------:|:--------:|:-----------------------:|
-| Database  |  string  |        包含该行的数据库名        |
-|   Table   |  string  |        包含该行的数表名         |
-|  RowKind  |  string  |           行类型           |
-| EventTime |   Long   |                         |
-|   Delay   |   Long   |    数据抽取时间与数据库变更时间的差     |
-| Partition |  string  | 包含该行对应数表的分区字段,多个使用`,`连接 |
+|    Key    | DataType |          Description          |
+|:---------:|:--------:|:-----------------------------:|
+| Database  |  string  |           包含该行的数据库名           |
+|   Table   |  string  |           包含该行的数表名            |
+|  RowKind  |  string  |              行类型              |
+| EventTime |   Long   |    该行的对应的数据时间,统一格式是到毫秒的时间戳    |
+|   Delay   |   Long   | 数据抽取时间与数据库变更时间的差,统一格式是到毫秒的时间戳 |
+| Partition |  string  |    包含该行对应数表的分区字段,多个使用`,`连接    |
 
 ### 注意事项
     `Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
index 8b5b36682a..d12810cad7 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
@@ -49,9 +49,13 @@ public enum CommonOptions {
     ROW_KIND("RowKind", true),
     /**
      * The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME 
value of the row value.
+     * The value is expressed in milliseconds.
      */
     EVENT_TIME("EventTime", true),
-    /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value 
of the row value. */
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value 
of the row value. The
+     * value is expressed in milliseconds.
+     */
     DELAY("Delay", true);
 
     private final String name;
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 091284b7e3..ed44437b82 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -95,6 +95,7 @@ public final class SeaTunnelRow implements Serializable {
         SeaTunnelRow newRow = new SeaTunnelRow(newFields);
         newRow.setRowKind(this.getRowKind());
         newRow.setTableId(this.getTableId());
+        newRow.setOptions(this.getOptions());
         return newRow;
     }
 
@@ -106,6 +107,7 @@ public final class SeaTunnelRow implements Serializable {
         SeaTunnelRow newRow = new SeaTunnelRow(newFields);
         newRow.setRowKind(this.getRowKind());
         newRow.setTableId(this.getTableId());
+        newRow.setOptions(this.getOptions());
         return newRow;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
index 66ea90e701..3e07a6f11e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
@@ -48,6 +48,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
+
 @DisabledOnOs(
         value = OS.WINDOWS,
         disabledReason =
@@ -346,24 +348,33 @@ public class LocalFileTest {
                         Collections.emptyList(),
                         "comment");
 
+        Map<String, Object> rowOptions = new HashMap<>();
+        rowOptions.put(EVENT_TIME.getName(), 1L);
+
         SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
         row1.setRowKind(RowKind.INSERT);
         row1.setTableId(TablePath.DEFAULT.getFullName());
+        row1.setOptions(rowOptions);
         SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
         row2.setRowKind(RowKind.INSERT);
         row2.setTableId(TablePath.DEFAULT.getFullName());
+        row2.setOptions(rowOptions);
         SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
         row3.setRowKind(RowKind.INSERT);
         row3.setTableId(TablePath.DEFAULT.getFullName());
+        row3.setOptions(rowOptions);
         SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, 
"A", 100});
         row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
         row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
+        row1UpdateBefore.setOptions(rowOptions);
         SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, 
"A_1", 100});
         row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
         row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
+        row1UpdateAfter.setOptions(rowOptions);
         SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 
100});
         row2Delete.setTableId(TablePath.DEFAULT.getFullName());
         row2Delete.setRowKind(RowKind.DELETE);
+        row2Delete.setOptions(rowOptions);
 
         SinkFlowTestUtils.runBatchWithCheckpointDisabled(
                 catalogTable,
@@ -379,22 +390,22 @@ public class LocalFileTest {
         String dataStr = FileUtils.readFileToStr(path);
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"INSERT\"}"));
+                        
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"INSERT\"}"));
+                        
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":[{\"a\":3,\"b\":\"C\",\"c\":100}],\"type\":\"INSERT\"}"));
+                        
"{\"data\":[{\"a\":3,\"b\":\"C\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"DELETE\"}"));
+                        
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":[{\"a\":1,\"b\":\"A_1\",\"c\":100}],\"type\":\"INSERT\"}"));
+                        
"{\"data\":[{\"a\":1,\"b\":\"A_1\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"DELETE\"}"));
+                        
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
     }
 
     @Test
@@ -431,24 +442,33 @@ public class LocalFileTest {
                         Collections.emptyList(),
                         "comment");
 
+        Map<String, Object> rowOptions = new HashMap<>();
+        rowOptions.put(EVENT_TIME.getName(), 1L);
+
         SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
         row1.setRowKind(RowKind.INSERT);
         row1.setTableId(TablePath.DEFAULT.getFullName());
+        row1.setOptions(rowOptions);
         SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
         row2.setRowKind(RowKind.INSERT);
         row2.setTableId(TablePath.DEFAULT.getFullName());
+        row2.setOptions(rowOptions);
         SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
         row3.setRowKind(RowKind.INSERT);
         row3.setTableId(TablePath.DEFAULT.getFullName());
+        row3.setOptions(rowOptions);
         SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, 
"A", 100});
         row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
         row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
+        row1UpdateBefore.setOptions(rowOptions);
         SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, 
"A_1", 100});
         row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
         row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
+        row1UpdateAfter.setOptions(rowOptions);
         SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 
100});
         row2Delete.setTableId(TablePath.DEFAULT.getFullName());
         row2Delete.setRowKind(RowKind.DELETE);
+        row2Delete.setOptions(rowOptions);
 
         SinkFlowTestUtils.runBatchWithCheckpointDisabled(
                 catalogTable,
@@ -464,22 +484,22 @@ public class LocalFileTest {
         String dataStr = FileUtils.readFileToStr(path);
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A\",\"c\":100},\"op\":\"c\"}"));
+                        
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"before\":null,\"after\":{\"a\":2,\"b\":\"B\",\"c\":100},\"op\":\"c\"}"));
+                        
"{\"before\":null,\"after\":{\"a\":2,\"b\":\"B\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"before\":null,\"after\":{\"a\":3,\"b\":\"C\",\"c\":100},\"op\":\"c\"}"));
+                        
"{\"before\":null,\"after\":{\"a\":3,\"b\":\"C\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"before\":{\"a\":1,\"b\":\"A\",\"c\":100},\"after\":null,\"op\":\"d\"}"));
+                        
"{\"before\":{\"a\":1,\"b\":\"A\",\"c\":100},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"op\":\"c\"}"));
+                        
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"before\":{\"a\":2,\"b\":\"B\",\"c\":100},\"after\":null,\"op\":\"d\"}"));
+                        
"{\"before\":{\"a\":2,\"b\":\"B\",\"c\":100},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
     }
 
     @Test
@@ -515,25 +535,33 @@ public class LocalFileTest {
                         Collections.emptyMap(),
                         Collections.emptyList(),
                         "comment");
+        Map<String, Object> rowOptions = new HashMap<>();
+        rowOptions.put(EVENT_TIME.getName(), 1L);
 
         SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
         row1.setRowKind(RowKind.INSERT);
         row1.setTableId(TablePath.DEFAULT.getFullName());
+        row1.setOptions(rowOptions);
         SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
         row2.setRowKind(RowKind.INSERT);
         row2.setTableId(TablePath.DEFAULT.getFullName());
+        row2.setOptions(rowOptions);
         SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
         row3.setRowKind(RowKind.INSERT);
         row3.setTableId(TablePath.DEFAULT.getFullName());
+        row3.setOptions(rowOptions);
         SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, 
"A", 100});
         row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
         row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
+        row1UpdateBefore.setOptions(rowOptions);
         SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, 
"A_1", 100});
         row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
         row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
+        row1UpdateAfter.setOptions(rowOptions);
         SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 
100});
         row2Delete.setTableId(TablePath.DEFAULT.getFullName());
         row2Delete.setRowKind(RowKind.DELETE);
+        row2Delete.setOptions(rowOptions);
 
         SinkFlowTestUtils.runBatchWithCheckpointDisabled(
                 catalogTable,
@@ -548,17 +576,22 @@ public class LocalFileTest {
         Path path = 
Paths.get("/tmp/seatunnel/LocalFileTest/maxwell_json_file.maxwell_json");
         String dataStr = FileUtils.readFileToStr(path);
         Assertions.assertTrue(
-                
dataStr.contains("{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"INSERT\"}"));
+                dataStr.contains(
+                        
"{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
-                
dataStr.contains("{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"INSERT\"}"));
+                dataStr.contains(
+                        
"{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
-                
dataStr.contains("{\"data\":{\"a\":3,\"b\":\"C\",\"c\":100},\"type\":\"INSERT\"}"));
+                dataStr.contains(
+                        
"{\"data\":{\"a\":3,\"b\":\"C\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
-                
dataStr.contains("{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"DELETE\"}"));
+                dataStr.contains(
+                        
"{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
                 dataStr.contains(
-                        
"{\"data\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"type\":\"INSERT\"}"));
+                        
"{\"data\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
         Assertions.assertTrue(
-                
dataStr.contains("{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"DELETE\"}"));
+                dataStr.contains(
+                        
"{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index 36e24ec9b0..5025d676c7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -503,20 +503,20 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
     private void checkCanalFormat() {
         List<String> expectedResult =
                 Arrays.asList(
-                        
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":1102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":1103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1.0\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":\"0.1\"}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":1109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"4.56\"}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"7.88\"}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":1109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"}],\"type\":\"DELETE\"}");
+                        
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        "{\"data\":[{\"id\":1102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        "{\"data\":[{\"id\":1103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        
"{\"data\":[{\"id\":1104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        
"{\"data\":[{\"id\":1105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        
"{\"data\":[{\"id\":1106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1.0\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        
"{\"data\":[{\"id\":1108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind 
breaker\",\"weight\":\"0.1\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        "{\"data\":[{\"id\":1109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+                        
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"}],\"type\":\"DELETE\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+                        
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"4.56\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+                        
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"}],\"type\":\"DELETE\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+                        
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"7.88\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+                        "{\"data\":[{\"id\":1109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"}],\"type\":\"DELETE\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}");
 
         ArrayList<String> result = new ArrayList<>();
         ArrayList<String> topics = new ArrayList<>();
@@ -562,20 +562,20 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
     private void checkMaxWellFormat() {
         List<String> expectedResult =
                 Arrays.asList(
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind 
breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}");
 
         ArrayList<String> result = new ArrayList<>();
         ArrayList<String> topics = new ArrayList<>();
@@ -619,26 +619,26 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
     private void checkOggFormat() {
         List<String> kafkaExpectedResult =
                 Arrays.asList(
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.140000104904175\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.100000381469727\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.800000011920929\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":\"0.10000000149011612\"},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.200000762939453\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1\"},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.099999904632568\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":\"5.179999828338623\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind breaker\",\"weight\":\"0.5\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":\"5.179999828338623\"},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":\"5.170000076293945\"},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":\"5.170000076293945\"},\"type\":\"DELETE\"}");
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.140000104904175\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384406000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.100000381469727\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.800000011920929\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind 
breaker\",\"weight\":\"0.10000000149011612\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.200000762939453\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":\"1\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589390787000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589390787000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589390899000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.099999904632568\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589390899000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391010000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":\"5.179999828338623\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391043000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589391140000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind 
breaker\",\"weight\":\"0.5\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391140000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":\"5.179999828338623\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589391130000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":\"5.170000076293945\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391130000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":\"5.170000076293945\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589391144000}");
 
         ArrayList<String> checkKafkaConsumerResult = new ArrayList<>();
         ArrayList<String> topics = new ArrayList<>();
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index 77055a2f8e..df9d30af5a 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -57,6 +58,8 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
 
     private static final String FIELD_TABLE = "table";
 
+    private static final String FIELD_TS = "ts";
+
     private static final String OP_INSERT = "INSERT";
 
     private static final String OP_UPDATE = "UPDATE";
@@ -137,6 +140,7 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
 
             JsonNode dataNode = jsonNode.get(FIELD_DATA);
             String op = jsonNode.get(FIELD_TYPE).asText();
+            JsonNode tsNode = jsonNode.get(FIELD_TS);
             // When a null value is encountered, an exception needs to be 
thrown for easy sensing
             if (dataNode == null || dataNode.isNull()) {
                 // We'll skip the query or create or alter event data
@@ -154,6 +158,9 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
                         if (tablePath != null && 
!tablePath.toString().isEmpty()) {
                             row.setTableId(tablePath.toString());
                         }
+                        if (tsNode != null) {
+                            MetadataUtil.setEventTime(row, tsNode.asLong());
+                        }
                         out.collect(row);
                     }
                     break;
@@ -178,6 +185,10 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
                         if (tablePath != null && 
!tablePath.toString().isEmpty()) {
                             after.setTableId(tablePath.toString());
                         }
+                        if (tsNode != null) {
+                            MetadataUtil.setEventTime(before, tsNode.asLong());
+                            MetadataUtil.setEventTime(after, tsNode.asLong());
+                        }
                         out.collect(before);
                         out.collect(after);
                     }
@@ -189,6 +200,9 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
                         if (tablePath != null && 
!tablePath.toString().isEmpty()) {
                             row.setTableId(tablePath.toString());
                         }
+                        if (tsNode != null) {
+                            MetadataUtil.setEventTime(row, tsNode.asLong());
+                        }
                         out.collect(row);
                     }
                     break;
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
index c3663ab86c..03575d052d 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
@@ -19,6 +19,7 @@
 package org.apache.seatunnel.format.json.canal;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,9 +30,13 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.nio.charset.Charset;
 
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
 
 public class CanalJsonSerializationSchema implements SerializationSchema {
 
@@ -48,12 +53,12 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema {
 
     public CanalJsonSerializationSchema(SeaTunnelRowType rowType) {
         this.jsonSerializer = new 
JsonSerializationSchema(createJsonRowType(rowType));
-        this.reuse = new SeaTunnelRow(2);
+        this.reuse = new SeaTunnelRow(5);
     }
 
     public CanalJsonSerializationSchema(SeaTunnelRowType rowType, Charset 
charset) {
         this.jsonSerializer = new 
JsonSerializationSchema(createJsonRowType(rowType), charset);
-        this.reuse = new SeaTunnelRow(2);
+        this.reuse = new SeaTunnelRow(5);
     }
 
     @Override
@@ -62,6 +67,15 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema {
             String opType = rowKind2String(row.getRowKind());
             reuse.setField(0, new SeaTunnelRow[] {row});
             reuse.setField(1, opType);
+            if (!StringUtils.isEmpty(row.getTableId())) {
+                reuse.setField(2, 
TablePath.of(row.getTableId()).getDatabaseName());
+                reuse.setField(3, 
TablePath.of(row.getTableId()).getTableName());
+            }
+
+            if (row.getOptions() != null && 
row.getOptions().containsKey(EVENT_TIME.getName())) {
+                reuse.setField(4, row.getOptions().get(EVENT_TIME.getName()));
+            }
+
             return jsonSerializer.serialize(reuse);
         } catch (Throwable t) {
             throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
@@ -88,9 +102,13 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema {
         // but we don't need them
         // and we don't need "old" , because can not support 
UPDATE_BEFORE,UPDATE_AFTER
         return new SeaTunnelRowType(
-                new String[] {"data", "type"},
+                new String[] {"data", "type", "database", "table", "ts"},
                 new SeaTunnelDataType[] {
-                    new ArrayType<>(SeaTunnelRowType[].class, databaseSchema), 
STRING_TYPE
+                    new ArrayType<>(SeaTunnelRowType[].class, databaseSchema),
+                    STRING_TYPE,
+                    STRING_TYPE,
+                    STRING_TYPE,
+                    LONG_TYPE
                 });
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
index 47afa1a015..122af68d23 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -46,6 +47,7 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
     public static final String DATA_PAYLOAD = "payload";
     private static final String DATA_BEFORE = "before";
     private static final String DATA_AFTER = "after";
+    private static final String DATA_TS = "ts_ms";
 
     private static final String REPLICA_IDENTITY_EXCEPTION =
             "The \"before\" field of %s operation is null, "
@@ -117,6 +119,7 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
     private void parsePayload(Collector<SeaTunnelRow> out, TablePath 
tablePath, JsonNode payload)
             throws IOException {
         String op = payload.get(OP_KEY).asText();
+        JsonNode tsNode = payload.get(DATA_TS);
 
         switch (op) {
             case OP_CREATE:
@@ -126,6 +129,9 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
                 if (tablePath != null) {
                     insert.setTableId(tablePath.toString());
                 }
+                if (tsNode != null) {
+                    MetadataUtil.setEventTime(insert, tsNode.asLong());
+                }
                 out.collect(insert);
                 break;
             case OP_UPDATE:
@@ -138,6 +144,9 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
                 if (tablePath != null) {
                     before.setTableId(tablePath.toString());
                 }
+                if (tsNode != null) {
+                    MetadataUtil.setEventTime(before, tsNode.asLong());
+                }
                 out.collect(before);
 
                 SeaTunnelRow after = 
debeziumRowConverter.parse(payload.get(DATA_AFTER));
@@ -146,6 +155,9 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
                 if (tablePath != null) {
                     after.setTableId(tablePath.toString());
                 }
+                if (tsNode != null) {
+                    MetadataUtil.setEventTime(after, tsNode.asLong());
+                }
                 out.collect(after);
                 break;
             case OP_DELETE:
@@ -158,6 +170,9 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
                 if (tablePath != null) {
                     delete.setTableId(tablePath.toString());
                 }
+                if (tsNode != null) {
+                    MetadataUtil.setEventTime(delete, tsNode.asLong());
+                }
                 out.collect(delete);
                 break;
             default:
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
index eb75bfd2b0..43cab50c9d 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
@@ -25,7 +25,7 @@ import java.util.Map;
 
 public class DebeziumJsonFormatOptions {
 
-    public static final int GENERATE_ROW_SIZE = 3;
+    public static final int GENERATE_ROW_SIZE = 5;
 
     public static final Option<Boolean> IGNORE_PARSE_ERRORS = 
JsonFormatOptions.IGNORE_PARSE_ERRORS;
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
index c58ddbce7c..a768d29e3e 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
@@ -18,15 +18,24 @@
 package org.apache.seatunnel.format.json.debezium;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
 
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
 import static 
org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
 
 public class DebeziumJsonSerializationSchema implements SerializationSchema {
@@ -53,18 +62,37 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema {
     @Override
     public byte[] serialize(SeaTunnelRow row) {
         try {
+            Map<String, String> source = new HashMap<>();
+            if (!StringUtils.isEmpty(row.getTableId())) {
+                source.put("schema", 
TablePath.of(row.getTableId()).getSchemaName());
+                source.put("database", 
TablePath.of(row.getTableId()).getDatabaseName());
+                source.put("table", 
TablePath.of(row.getTableId()).getTableName());
+            }
             switch (row.getRowKind()) {
                 case INSERT:
                 case UPDATE_AFTER:
                     genericRow.setField(0, null);
                     genericRow.setField(1, row);
                     genericRow.setField(2, OP_INSERT);
+                    genericRow.setField(3, source);
+
+                    if (row.getOptions() != null
+                            && 
row.getOptions().containsKey(EVENT_TIME.getName())) {
+                        genericRow.setField(4, 
row.getOptions().get(EVENT_TIME.getName()));
+                    } else {
+                        genericRow.setField(4, null);
+                    }
                     return jsonSerializer.serialize(genericRow);
                 case UPDATE_BEFORE:
                 case DELETE:
                     genericRow.setField(0, row);
                     genericRow.setField(1, null);
                     genericRow.setField(2, OP_DELETE);
+                    genericRow.setField(3, source);
+                    if (row.getOptions() != null
+                            && 
row.getOptions().containsKey(EVENT_TIME.getName())) {
+                        genericRow.setField(4, 
row.getOptions().get(EVENT_TIME.getName()));
+                    }
                     return jsonSerializer.serialize(genericRow);
                 default:
                     throw new UnsupportedOperationException(
@@ -78,7 +106,13 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema {
 
     private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType 
databaseSchema) {
         return new SeaTunnelRowType(
-                new String[] {"before", "after", "op"},
-                new SeaTunnelDataType[] {databaseSchema, databaseSchema, 
STRING_TYPE});
+                new String[] {"before", "after", "op", "source", "ts_ms"},
+                new SeaTunnelDataType[] {
+                    databaseSchema,
+                    databaseSchema,
+                    STRING_TYPE,
+                    new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE),
+                    LONG_TYPE
+                });
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
index a87784353e..335a443ed0 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -33,6 +35,7 @@ import 
org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
 import static java.lang.String.format;
@@ -57,6 +60,8 @@ public class MaxWellJsonDeserializationSchema implements 
DeserializationSchema<S
 
     private static final String FIELD_TABLE = "table";
 
+    private static final String FIELD_TS = "ts";
+
     private final String database;
 
     private final String table;
@@ -110,6 +115,9 @@ public class MaxWellJsonDeserializationSchema implements 
DeserializationSchema<S
         if (message == null) {
             return;
         }
+        TablePath tablePath =
+                
Optional.ofNullable(catalogTable).map(CatalogTable::getTablePath).orElse(null);
+
         ObjectNode jsonNode = (ObjectNode) convertBytes(message);
         if (database != null
                 && 
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
@@ -120,9 +128,16 @@ public class MaxWellJsonDeserializationSchema implements 
DeserializationSchema<S
         }
         JsonNode dataNode = jsonNode.get(FIELD_DATA);
         String type = jsonNode.get(FIELD_TYPE).asText();
+        JsonNode tsNode = jsonNode.get(FIELD_TS);
         if (OP_INSERT.equals(type)) {
             SeaTunnelRow rowInsert = convertJsonNode(dataNode);
             rowInsert.setRowKind(RowKind.INSERT);
+            if (tablePath != null && !tablePath.toString().isEmpty()) {
+                rowInsert.setTableId(tablePath.toString());
+            }
+            if (tsNode != null) {
+                MetadataUtil.setEventTime(rowInsert, tsNode.asLong() * 1000);
+            }
             out.collect(rowInsert);
         } else if (OP_UPDATE.equals(type)) {
             SeaTunnelRow rowAfter = convertJsonNode(dataNode);
@@ -142,11 +157,25 @@ public class MaxWellJsonDeserializationSchema implements 
DeserializationSchema<S
             rowBefore.setRowKind(RowKind.UPDATE_BEFORE);
             assert rowAfter != null;
             rowAfter.setRowKind(RowKind.UPDATE_AFTER);
+            if (tablePath != null && !tablePath.toString().isEmpty()) {
+                rowBefore.setTableId(tablePath.toString());
+                rowAfter.setTableId(tablePath.toString());
+            }
+            if (tsNode != null) {
+                MetadataUtil.setEventTime(rowBefore, tsNode.asLong() * 1000);
+                MetadataUtil.setEventTime(rowAfter, tsNode.asLong() * 1000);
+            }
             out.collect(rowBefore);
             out.collect(rowAfter);
         } else if (OP_DELETE.equals(type)) {
             SeaTunnelRow rowDelete = convertJsonNode(dataNode);
             rowDelete.setRowKind(RowKind.DELETE);
+            if (tablePath != null && !tablePath.toString().isEmpty()) {
+                rowDelete.setTableId(tablePath.toString());
+            }
+            if (tsNode != null) {
+                MetadataUtil.setEventTime(rowDelete, tsNode.asLong() * 1000);
+            }
             out.collect(rowDelete);
         } else {
             if (!ignoreParseErrors) {
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
index 1134a0527c..191512c49c 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
@@ -19,6 +19,7 @@
 package org.apache.seatunnel.format.json.maxwell;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,9 +29,13 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.nio.charset.Charset;
 
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
 
 public class MaxWellJsonSerializationSchema implements SerializationSchema {
 
@@ -47,12 +52,12 @@ public class MaxWellJsonSerializationSchema implements 
SerializationSchema {
 
     public MaxWellJsonSerializationSchema(SeaTunnelRowType rowType) {
         this.jsonSerializer = new 
JsonSerializationSchema(createJsonRowType(rowType));
-        this.reuse = new SeaTunnelRow(2);
+        this.reuse = new SeaTunnelRow(5);
     }
 
     public MaxWellJsonSerializationSchema(SeaTunnelRowType rowType, Charset 
charset) {
         this.jsonSerializer = new 
JsonSerializationSchema(createJsonRowType(rowType), charset);
-        this.reuse = new SeaTunnelRow(2);
+        this.reuse = new SeaTunnelRow(5);
     }
 
     @Override
@@ -61,6 +66,14 @@ public class MaxWellJsonSerializationSchema implements 
SerializationSchema {
             String opType = rowKind2String(row.getRowKind());
             reuse.setField(0, row);
             reuse.setField(1, opType);
+            reuse.setField(2, row.getTableId());
+            if (!StringUtils.isEmpty(row.getTableId())) {
+                reuse.setField(2, 
TablePath.of(row.getTableId()).getDatabaseName());
+                reuse.setField(3, 
TablePath.of(row.getTableId()).getTableName());
+            }
+            if (row.getOptions() != null && 
row.getOptions().containsKey(EVENT_TIME.getName())) {
+                reuse.setField(4, row.getOptions().get(EVENT_TIME.getName()));
+            }
             return jsonSerializer.serialize(reuse);
         } catch (Throwable t) {
             throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
@@ -87,7 +100,9 @@ public class MaxWellJsonSerializationSchema implements 
SerializationSchema {
         // but we don't need them
         // and we don't need "old" , because can not support 
UPDATE_BEFORE,UPDATE_AFTER
         return new SeaTunnelRowType(
-                new String[] {"data", "type"},
-                new SeaTunnelDataType[] {databaseSchema, STRING_TYPE});
+                new String[] {"data", "type", "database", "table", "ts"},
+                new SeaTunnelDataType[] {
+                    databaseSchema, STRING_TYPE, STRING_TYPE, STRING_TYPE, 
LONG_TYPE
+                });
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
index 14f3b5944e..f1731e3872 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
@@ -25,17 +25,20 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
 import lombok.NonNull;
 
 import java.io.IOException;
+import java.time.ZoneOffset;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
@@ -49,6 +52,8 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
 
     private static final String FIELD_DATABASE_TABLE = "table";
 
+    private static final String FIELD_TS = "op_ts";
+
     private static final String DATA_BEFORE = "before"; // BEFORE
 
     private static final String DATA_AFTER = "after"; // AFTER
@@ -151,7 +156,13 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
             }
 
             String op = jsonNode.get(FIELD_TYPE).asText().trim();
-
+            JsonNode tsNode = jsonNode.get(FIELD_TS);
+            // OGG emits op_ts as a date-time string, e.g. "2020-05-13 15:40:07.000000"
+            long ts = 0;
+            if (tsNode != null) {
+                String tsDateTime = tsNode.asText();
+                ts = DateTimeUtils.parse(tsDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
+            }
             switch (op) {
                 case OP_INSERT:
                     // Gets the data for the INSERT operation
@@ -160,6 +171,9 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
                     if (tablePath != null) {
                         row.setTableId(tablePath.toString());
                     }
+                    if (tsNode != null) {
+                        MetadataUtil.setEventTime(row, ts);
+                    }
                     out.collect(row);
                     break;
                 case OP_UPDATE:
@@ -178,12 +192,18 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
                     if (tablePath != null) {
                         before.setTableId(tablePath.toString());
                     }
+                    if (tsNode != null) {
+                        MetadataUtil.setEventTime(before, ts);
+                    }
                     out.collect(before);
 
                     after.setRowKind(RowKind.UPDATE_AFTER);
                     if (tablePath != null) {
                         after.setTableId(tablePath.toString());
                     }
+                    if (tsNode != null) {
+                        MetadataUtil.setEventTime(after, ts);
+                    }
                     out.collect(after);
                     break;
                 case OP_DELETE:
@@ -202,6 +222,9 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
                     if (tablePath != null) {
                         beforeDelete.setTableId(tablePath.toString());
                     }
+                    if (tsNode != null) {
+                        MetadataUtil.setEventTime(beforeDelete, ts);
+                    }
                     out.collect(beforeDelete);
                     break;
                 default:
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
index 2499736fa9..05218b6d43 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
@@ -28,7 +28,11 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
 
 public class OggJsonSerializationSchema implements SerializationSchema {
 
@@ -44,7 +48,7 @@ public class OggJsonSerializationSchema implements 
SerializationSchema {
 
     public OggJsonSerializationSchema(SeaTunnelRowType rowType) {
         this.jsonSerializer = new 
JsonSerializationSchema(createJsonRowType(rowType));
-        this.reuse = new SeaTunnelRow(2);
+        this.reuse = new SeaTunnelRow(4);
     }
 
     @Override
@@ -53,6 +57,13 @@ public class OggJsonSerializationSchema implements 
SerializationSchema {
             String opType = rowKind2String(row.getRowKind());
             reuse.setField(0, row);
             reuse.setField(1, opType);
+            // reuse is shared across calls, so every metadata slot is overwritten for each row
+            String tableId = row.getTableId();
+            reuse.setField(2, StringUtils.isEmpty(tableId) ? null : tableId);
+
+            Object eventTime =
+                    row.getOptions() == null ? null : row.getOptions().get(EVENT_TIME.getName());
+            reuse.setField(3, eventTime);
             return jsonSerializer.serialize(reuse);
         } catch (Throwable t) {
             throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
@@ -79,7 +90,7 @@ public class OggJsonSerializationSchema implements 
SerializationSchema {
         // but we don't need them
         // and we don't need "old" , because can not support 
UPDATE_BEFORE,UPDATE_AFTER
         return new SeaTunnelRowType(
-                new String[] {"data", "type"},
-                new SeaTunnelDataType[] {databaseSchema, STRING_TYPE});
+                new String[] {"data", "type", "table", "op_ts"},
+                new SeaTunnelDataType[] {databaseSchema, STRING_TYPE, 
STRING_TYPE, LONG_TYPE});
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index 7bad34de3a..ad0c1ec960 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -207,32 +207,32 @@ public class CanalJsonSerDeSchemaTest {
 
         List<String> expectedResult =
                 Arrays.asList(
-                        
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":0.1}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind breaker\",\"weight\":0.5}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17}],\"type\":\"INSERT\"}",
-                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14}],\"type\":\"DELETE\"}",
-                        
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":5.17}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1}],\"type\":\"DELETE\"}",
-                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17}],\"type\":\"INSERT\"}",
-                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17}],\"type\":\"DELETE\"}",
-                        "{\"data\":[{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8}],\"type\":\"DELETE\"}");
+                        
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        "{\"data\":[{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        
"{\"data\":[{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        
"{\"data\":[{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        
"{\"data\":[{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind 
breaker\",\"weight\":0.1}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        "{\"data\":[{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+                        
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944202218}",
+                        
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944202218}",
+                        
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944279665}",
+                        
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944279665}",
+                        
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288394}",
+                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288394}",
+                        
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288717}",
+                        
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind 
breaker\",\"weight\":0.5}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288717}",
+                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337341}",
+                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337341}",
+                        
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337341}",
+                        
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+                        
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":5.17}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+                        "{\"data\":[{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944418418}",
+                        "{\"data\":[{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944418418}");
         assertEquals(expectedResult, result);
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 51996b815b..8607aedbf0 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -616,26 +616,26 @@ public class DebeziumJsonSerDeSchemaTest {
 
         expected =
                 Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small
 2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz
 carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz
 carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz
 carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
 of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water
 resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
-                        
"{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
-                        
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz
 carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
-                        
"{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
-                        
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
 of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water
 resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
 2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
-                        
"{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
-                        
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new
 water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
-                        
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
-                        
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
 2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
-                        
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+                        
"{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small
 2-wheel 
scooter\",\"weight\":3.14},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606100}",
+                        "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz
 carpenter's 
hammer\",\"weight\":0.75},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz
 carpenter's 
hammer\",\"weight\":0.875},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz
 carpenter's 
hammer\",\"weight\":1.0},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
 of assorted 
rocks\",\"weight\":5.3},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water
 resistent black wind 
breaker\",\"weight\":0.1},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+                        
"{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589361987936}",
+                        
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz
 carpenter 
hammer\",\"weight\":1.0},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589361987936}",
+                        
"{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362099505}",
+                        
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
 of assorted 
rocks\",\"weight\":5.1},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362099505}",
+                        
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water
 resistent white wind 
breaker\",\"weight\":0.2},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362210230}",
+                        
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
 2-wheel scooter 
\",\"weight\":5.18},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362243428}",
+                        
"{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362293539}",
+                        
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new
 water resistent white wind 
breaker\",\"weight\":0.5},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362293539}",
+                        
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362330904}",
+                        
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
 2-wheel scooter 
\",\"weight\":5.17},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362330904}",
+                        
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362344455}");
         assertEquals(expected, actual);
     }
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
index f82b272cf7..9641647533 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
@@ -81,32 +81,32 @@ public class MaxWellJsonSerDeSchemaTest {
         }
         List<String> expected =
                 Arrays.asList(
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter, 
Small 2-wheel scooter, 3.14]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[102, car 
battery, 12V car battery, 8.1]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack 
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer, 
12oz carpenter's hammer, 0.75]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer, 
14oz carpenter's hammer, 0.875]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer, 
16oz carpenter's hammer, 1.0]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks, 
box of assorted rocks, 5.3]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket, 
water resistent black wind breaker, 0.1]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare 
tire, 24 inch spare tire, 22.2]}",
-                        "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer, 
16oz carpenter's hammer, 1.0]}",
-                        "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer, 
18oz carpenter hammer, 1.0]}",
-                        "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks, 
box of assorted rocks, 5.3]}",
-                        "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks, 
box of assorted rocks, 5.1]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket, 
water resistent white wind breaker, 0.2]}",
-                        "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter, 
Big 2-wheel scooter , 5.18]}",
-                        "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket, 
water resistent white wind breaker, 0.2]}",
-                        "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket, 
new water resistent white wind breaker, 0.5]}",
-                        "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter, 
Big 2-wheel scooter , 5.18]}",
-                        "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter, 
Big 2-wheel scooter , 5.17]}",
-                        "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter, 
Big 2-wheel scooter , 5.17]}",
-                        "SeaTunnelRow{tableId=, kind=-U, fields=[101, scooter, 
Small 2-wheel scooter, 3.14]}",
-                        "SeaTunnelRow{tableId=, kind=+U, fields=[101, scooter, 
Small 2-wheel scooter, 5.17]}",
-                        "SeaTunnelRow{tableId=, kind=-U, fields=[102, car 
battery, 12V car battery, 8.1]}",
-                        "SeaTunnelRow{tableId=, kind=+U, fields=[102, car 
battery, 12V car battery, 5.17]}",
-                        "SeaTunnelRow{tableId=, kind=-D, fields=[102, car 
battery, 12V car battery, 5.17]}",
-                        "SeaTunnelRow{tableId=, kind=-D, fields=[103, 12-pack 
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}");
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[101, 
scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[102, 
car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[104, 
hammer, 12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[105, 
hammer, 14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[106, 
hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[107, 
rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[108, 
jacket, water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[109, 
spare tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[106, 
hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[106, 
hammer, 18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[107, 
rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[107, 
rocks, box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[110, 
jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[111, 
scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[110, 
jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[110, 
jacket, new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[111, 
scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[111, 
scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=..test, kind=-D, fields=[111, 
scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[101, 
scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[101, 
scooter, Small 2-wheel scooter, 5.17]}",
+                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[102, 
car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[102, 
car battery, 12V car battery, 5.17]}",
+                        "SeaTunnelRow{tableId=..test, kind=-D, fields=[102, 
car battery, 12V car battery, 5.17]}",
+                        "SeaTunnelRow{tableId=..test, kind=-D, fields=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8]}");
         List<String> actual =
                 
collector.list.stream().map(Object::toString).collect(Collectors.toList());
         assertEquals(expected, actual);
@@ -121,32 +121,32 @@ public class MaxWellJsonSerDeSchemaTest {
 
         List<String> expectedResult =
                 Arrays.asList(
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":5.17},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"DELETE\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17},\"type\":\"DELETE\"}",
-                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"DELETE\"}");
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind 
breaker\",\"weight\":0.1},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684893000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684893000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684897000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684897000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684900000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684904000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684906000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind 
breaker\",\"weight\":0.5},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684906000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684912000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684912000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684914000}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":5.17},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":5.17},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684938000}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684938000}");
         assertEquals(expectedResult, result);
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
index 20df0d945a..b4da3d4ae2 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
@@ -206,27 +206,26 @@ public class OggJsonSerDeSchemaTest {
 
         List<String> expectedResult =
                 Arrays.asList(
-                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
-                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
-                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter \",\"weight\":5.17},\"type\":\"DELETE\"}");
-
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":3.14},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384406000}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":8.1},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":0.8},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":0.75},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":0.875},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind 
breaker\",\"weight\":0.1},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":22.2},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's 
hammer\",\"weight\":1.0},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589390787000}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter 
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589390787000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.3},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589390899000}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":5.1},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589390899000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391010000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391043000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent 
white wind 
breaker\",\"weight\":0.2},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589391140000}",
+                        
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water 
resistent white wind 
breaker\",\"weight\":0.5},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391140000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.18},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589391130000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391130000}",
+                        
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel 
scooter 
\",\"weight\":5.17},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589391144000}");
         assertEquals(expectedResult, result);
     }
 

Reply via email to