This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fb8444b946 [Feature][Format] Improve
maxwell_json,canal_json,debezium_json format add ts_ms and table (#9701)
fb8444b946 is described below
commit fb8444b946512a5a8f518e25e76d346b2118d6ea
Author: dyp12 <[email protected]>
AuthorDate: Wed Aug 20 15:21:55 2025 +0800
[Feature][Format] Improve maxwell_json,canal_json,debezium_json format add
ts_ms and table (#9701)
---
docs/en/transform-v2/metadata.md | 16 ++--
docs/zh/transform-v2/metadata.md | 16 ++--
.../seatunnel/api/table/type/CommonOptions.java | 6 +-
.../seatunnel/api/table/type/SeaTunnelRow.java | 2 +
.../seatunnel/file/local/LocalFileTest.java | 69 ++++++++++----
.../e2e/connector/kafka/KafkaFormatIT.java | 96 +++++++++----------
.../json/canal/CanalJsonDeserializationSchema.java | 14 +++
.../json/canal/CanalJsonSerializationSchema.java | 26 +++++-
.../DebeziumJsonDeserializationSchema.java | 15 +++
.../json/debezium/DebeziumJsonFormatOptions.java | 2 +-
.../debezium/DebeziumJsonSerializationSchema.java | 38 +++++++-
.../maxwell/MaxWellJsonDeserializationSchema.java | 29 ++++++
.../maxwell/MaxWellJsonSerializationSchema.java | 23 ++++-
.../json/ogg/OggJsonDeserializationSchema.java | 25 ++++-
.../json/ogg/OggJsonSerializationSchema.java | 17 +++-
.../json/canal/CanalJsonSerDeSchemaTest.java | 52 +++++------
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 40 ++++----
.../json/maxwell/MaxWellJsonSerDeSchemaTest.java | 104 ++++++++++-----------
.../format/json/ogg/OggJsonSerDeSchemaTest.java | 41 ++++----
19 files changed, 414 insertions(+), 217 deletions(-)
diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md
index 536038b4d8..89c0bea651 100644
--- a/docs/en/transform-v2/metadata.md
+++ b/docs/en/transform-v2/metadata.md
@@ -7,14 +7,14 @@ Metadata transform plugin for adding metadata fields to data
## Available Metadata
-| Key | DataType | Description
|
-|:---------:|:--------:|:---------------------------------------------------------------------------------------------------|
-| Database | string | Name of the table that contain the row.
|
-| Table | string | Name of the table that contain the row.
|
-| RowKind | string | The type of operation
|
-| EventTime | Long | The time at which the connector processed the event.
|
-| Delay | Long | The difference between data extraction time and
database change time |
-| Partition | string | Contains the partition field of the corresponding
number table of the row, multiple using `,` join |
+| Key | DataType | Description
|
+|:---------:|:--------:|:---------------------------------------------------------------------------------------------------------|
+| Database | string | Name of the database that contains the row.
|
+| Table | string | Name of the table that contains the row.
|
+| RowKind | string | The type of operation
|
+| EventTime | Long | The time at which the connector processed the
event. The value is a timestamp in milliseconds |
+| Delay | Long | The difference between data extraction time and
database change time. The value is in milliseconds |
+| Partition | string | Contains the partition field of the corresponding
number table of the row, multiple using `,` join |
### note
`Delay` `EventTime` only worked on cdc series connectors for now , except
TiDB-CDC
diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md
index 2e92c1282f..72ebd320f8 100644
--- a/docs/zh/transform-v2/metadata.md
+++ b/docs/zh/transform-v2/metadata.md
@@ -7,14 +7,14 @@
## 支持的元数据
-| Key | DataType | Description |
-|:---------:|:--------:|:-----------------------:|
-| Database | string | 包含该行的数据库名 |
-| Table | string | 包含该行的数表名 |
-| RowKind | string | 行类型 |
-| EventTime | Long | |
-| Delay | Long | 数据抽取时间与数据库变更时间的差 |
-| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
+| Key | DataType | Description |
+|:---------:|:--------:|:-----------------------------:|
+| Database | string | 包含该行的数据库名 |
+| Table | string | 包含该行的数表名 |
+| RowKind | string | 行类型 |
+| EventTime | Long | 该行的对应的数据时间,统一格式是到毫秒的时间戳 |
+| Delay | Long | 数据抽取时间与数据库变更时间的差,统一格式是到毫秒的时间戳 |
+| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
### 注意事项
`Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
index 8b5b36682a..d12810cad7 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
@@ -49,9 +49,13 @@ public enum CommonOptions {
ROW_KIND("RowKind", true),
/**
* The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME
value of the row value.
+ * The value is a timestamp in milliseconds.
*/
EVENT_TIME("EventTime", true),
- /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value
of the row value. */
+ /**
+ * The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value
of the row value. The
+ * value is in milliseconds.
+ */
DELAY("Delay", true);
private final String name;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 091284b7e3..ed44437b82 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -95,6 +95,7 @@ public final class SeaTunnelRow implements Serializable {
SeaTunnelRow newRow = new SeaTunnelRow(newFields);
newRow.setRowKind(this.getRowKind());
newRow.setTableId(this.getTableId());
+ newRow.setOptions(this.getOptions());
return newRow;
}
@@ -106,6 +107,7 @@ public final class SeaTunnelRow implements Serializable {
SeaTunnelRow newRow = new SeaTunnelRow(newFields);
newRow.setRowKind(this.getRowKind());
newRow.setTableId(this.getTableId());
+ newRow.setOptions(this.getOptions());
return newRow;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
index 66ea90e701..3e07a6f11e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
@@ -48,6 +48,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
+
@DisabledOnOs(
value = OS.WINDOWS,
disabledReason =
@@ -346,24 +348,33 @@ public class LocalFileTest {
Collections.emptyList(),
"comment");
+ Map<String, Object> rowOptions = new HashMap<>();
+ rowOptions.put(EVENT_TIME.getName(), 1L);
+
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
row1.setRowKind(RowKind.INSERT);
row1.setTableId(TablePath.DEFAULT.getFullName());
+ row1.setOptions(rowOptions);
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
row2.setRowKind(RowKind.INSERT);
row2.setTableId(TablePath.DEFAULT.getFullName());
+ row2.setOptions(rowOptions);
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
row3.setRowKind(RowKind.INSERT);
row3.setTableId(TablePath.DEFAULT.getFullName());
+ row3.setOptions(rowOptions);
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L,
"A", 100});
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
+ row1UpdateBefore.setOptions(rowOptions);
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L,
"A_1", 100});
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
+ row1UpdateAfter.setOptions(rowOptions);
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B",
100});
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
row2Delete.setRowKind(RowKind.DELETE);
+ row2Delete.setOptions(rowOptions);
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
catalogTable,
@@ -379,22 +390,22 @@ public class LocalFileTest {
String dataStr = FileUtils.readFileToStr(path);
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"INSERT\"}"));
+
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"INSERT\"}"));
+
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":[{\"a\":3,\"b\":\"C\",\"c\":100}],\"type\":\"INSERT\"}"));
+
"{\"data\":[{\"a\":3,\"b\":\"C\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"DELETE\"}"));
+
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":[{\"a\":1,\"b\":\"A_1\",\"c\":100}],\"type\":\"INSERT\"}"));
+
"{\"data\":[{\"a\":1,\"b\":\"A_1\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"DELETE\"}"));
+
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
}
@Test
@@ -431,24 +442,33 @@ public class LocalFileTest {
Collections.emptyList(),
"comment");
+ Map<String, Object> rowOptions = new HashMap<>();
+ rowOptions.put(EVENT_TIME.getName(), 1L);
+
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
row1.setRowKind(RowKind.INSERT);
row1.setTableId(TablePath.DEFAULT.getFullName());
+ row1.setOptions(rowOptions);
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
row2.setRowKind(RowKind.INSERT);
row2.setTableId(TablePath.DEFAULT.getFullName());
+ row2.setOptions(rowOptions);
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
row3.setRowKind(RowKind.INSERT);
row3.setTableId(TablePath.DEFAULT.getFullName());
+ row3.setOptions(rowOptions);
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L,
"A", 100});
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
+ row1UpdateBefore.setOptions(rowOptions);
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L,
"A_1", 100});
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
+ row1UpdateAfter.setOptions(rowOptions);
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B",
100});
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
row2Delete.setRowKind(RowKind.DELETE);
+ row2Delete.setOptions(rowOptions);
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
catalogTable,
@@ -464,22 +484,22 @@ public class LocalFileTest {
String dataStr = FileUtils.readFileToStr(path);
Assertions.assertTrue(
dataStr.contains(
-
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A\",\"c\":100},\"op\":\"c\"}"));
+
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"before\":null,\"after\":{\"a\":2,\"b\":\"B\",\"c\":100},\"op\":\"c\"}"));
+
"{\"before\":null,\"after\":{\"a\":2,\"b\":\"B\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"before\":null,\"after\":{\"a\":3,\"b\":\"C\",\"c\":100},\"op\":\"c\"}"));
+
"{\"before\":null,\"after\":{\"a\":3,\"b\":\"C\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"before\":{\"a\":1,\"b\":\"A\",\"c\":100},\"after\":null,\"op\":\"d\"}"));
+
"{\"before\":{\"a\":1,\"b\":\"A\",\"c\":100},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"op\":\"c\"}"));
+
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"before\":{\"a\":2,\"b\":\"B\",\"c\":100},\"after\":null,\"op\":\"d\"}"));
+
"{\"before\":{\"a\":2,\"b\":\"B\",\"c\":100},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
}
@Test
@@ -515,25 +535,33 @@ public class LocalFileTest {
Collections.emptyMap(),
Collections.emptyList(),
"comment");
+ Map<String, Object> rowOptions = new HashMap<>();
+ rowOptions.put(EVENT_TIME.getName(), 1L);
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
row1.setRowKind(RowKind.INSERT);
row1.setTableId(TablePath.DEFAULT.getFullName());
+ row1.setOptions(rowOptions);
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
row2.setRowKind(RowKind.INSERT);
row2.setTableId(TablePath.DEFAULT.getFullName());
+ row2.setOptions(rowOptions);
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
row3.setRowKind(RowKind.INSERT);
row3.setTableId(TablePath.DEFAULT.getFullName());
+ row3.setOptions(rowOptions);
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L,
"A", 100});
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
+ row1UpdateBefore.setOptions(rowOptions);
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L,
"A_1", 100});
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
+ row1UpdateAfter.setOptions(rowOptions);
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B",
100});
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
row2Delete.setRowKind(RowKind.DELETE);
+ row2Delete.setOptions(rowOptions);
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
catalogTable,
@@ -548,17 +576,22 @@ public class LocalFileTest {
Path path =
Paths.get("/tmp/seatunnel/LocalFileTest/maxwell_json_file.maxwell_json");
String dataStr = FileUtils.readFileToStr(path);
Assertions.assertTrue(
-
dataStr.contains("{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"INSERT\"}"));
+ dataStr.contains(
+
"{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
-
dataStr.contains("{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"INSERT\"}"));
+ dataStr.contains(
+
"{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
-
dataStr.contains("{\"data\":{\"a\":3,\"b\":\"C\",\"c\":100},\"type\":\"INSERT\"}"));
+ dataStr.contains(
+
"{\"data\":{\"a\":3,\"b\":\"C\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
-
dataStr.contains("{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"DELETE\"}"));
+ dataStr.contains(
+
"{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
dataStr.contains(
-
"{\"data\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"type\":\"INSERT\"}"));
+
"{\"data\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
Assertions.assertTrue(
-
dataStr.contains("{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"DELETE\"}"));
+ dataStr.contains(
+
"{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index 36e24ec9b0..5025d676c7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -503,20 +503,20 @@ public class KafkaFormatIT extends TestSuiteBase
implements TestResource {
private void checkCanalFormat() {
List<String> expectedResult =
Arrays.asList(
-
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":1102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":\"8.1\"}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":1103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":\"0.8\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":\"0.75\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":\"0.875\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1.0\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1108,\"name\":\"jacket\",\"description\":\"water resistent
black wind breaker\",\"weight\":\"0.1\"}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":1109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"4.56\"}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"7.88\"}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":1109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"}],\"type\":\"DELETE\"}");
+
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+ "{\"data\":[{\"id\":1102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":\"8.1\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+ "{\"data\":[{\"id\":1103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":\"0.8\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+
"{\"data\":[{\"id\":1104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":\"0.75\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+
"{\"data\":[{\"id\":1105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":\"0.875\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+
"{\"data\":[{\"id\":1106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1.0\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+
"{\"data\":[{\"id\":1108,\"name\":\"jacket\",\"description\":\"water resistent
black wind
breaker\",\"weight\":\"0.1\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+ "{\"data\":[{\"id\":1109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900618}",
+
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"}],\"type\":\"DELETE\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+
"{\"data\":[{\"id\":1101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"4.56\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"}],\"type\":\"DELETE\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+
"{\"data\":[{\"id\":1107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"7.88\"}],\"type\":\"INSERT\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}",
+ "{\"data\":[{\"id\":1109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"}],\"type\":\"DELETE\",\"database\":null,\"table\":\"test-cdc_mds\",\"ts\":1697788900619}");
ArrayList<String> result = new ArrayList<>();
ArrayList<String> topics = new ArrayList<>();
@@ -562,20 +562,20 @@ public class KafkaFormatIT extends TestSuiteBase
implements TestResource {
private void checkMaxWellFormat() {
List<String> expectedResult =
Arrays.asList(
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1.0\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind
breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+ "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"},\"type\":\"DELETE\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"4.56\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.3\"},\"type\":\"DELETE\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}",
+ "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\",\"database\":null,\"table\":\"maxwell-test-cdc_mds\",\"ts\":1699253290000}");
ArrayList<String> result = new ArrayList<>();
ArrayList<String> topics = new ArrayList<>();
@@ -619,26 +619,26 @@ public class KafkaFormatIT extends TestSuiteBase
implements TestResource {
private void checkOggFormat() {
List<String> kafkaExpectedResult =
Arrays.asList(
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.140000104904175\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":\"8.100000381469727\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":\"0.800000011920929\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind breaker\",\"weight\":\"0.10000000149011612\"},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.200000762939453\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1\"},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.099999904632568\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":\"5.179999828338623\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind breaker\",\"weight\":\"0.5\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":\"5.179999828338623\"},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":\"5.170000076293945\"},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":\"5.170000076293945\"},\"type\":\"DELETE\"}");
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.140000104904175\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384406000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":\"8.100000381469727\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":\"0.800000011920929\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind
breaker\",\"weight\":\"0.10000000149011612\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+ "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.200000762939453\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":\"1\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589390787000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":\"1\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589390787000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.300000190734863\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589390899000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"5.099999904632568\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589390899000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391010000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":\"5.179999828338623\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391043000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":\"0.20000000298023224\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589391140000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind
breaker\",\"weight\":\"0.5\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391140000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":\"5.179999828338623\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589391130000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":\"5.170000076293945\"},\"type\":\"INSERT\",\"table\":\"test-ogg-source\",\"op_ts\":1589391130000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":\"5.170000076293945\"},\"type\":\"DELETE\",\"table\":\"test-ogg-source\",\"op_ts\":1589391144000}");
ArrayList<String> checkKafkaConsumerResult = new ArrayList<>();
ArrayList<String> topics = new ArrayList<>();
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index 77055a2f8e..df9d30af5a 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -57,6 +58,8 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
private static final String FIELD_TABLE = "table";
+ private static final String FIELD_TS = "ts";
+
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
@@ -137,6 +140,7 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
JsonNode dataNode = jsonNode.get(FIELD_DATA);
String op = jsonNode.get(FIELD_TYPE).asText();
+ JsonNode tsNode = jsonNode.get(FIELD_TS);
// When a null value is encountered, an exception needs to be
thrown for easy sensing
if (dataNode == null || dataNode.isNull()) {
// We'll skip the query or create or alter event data
@@ -154,6 +158,9 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
if (tablePath != null &&
!tablePath.toString().isEmpty()) {
row.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(row, tsNode.asLong());
+ }
out.collect(row);
}
break;
@@ -178,6 +185,10 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
if (tablePath != null &&
!tablePath.toString().isEmpty()) {
after.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(before, tsNode.asLong());
+ MetadataUtil.setEventTime(after, tsNode.asLong());
+ }
out.collect(before);
out.collect(after);
}
@@ -189,6 +200,9 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
if (tablePath != null &&
!tablePath.toString().isEmpty()) {
row.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(row, tsNode.asLong());
+ }
out.collect(row);
}
break;
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
index c3663ab86c..03575d052d 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
@@ -19,6 +19,7 @@
package org.apache.seatunnel.format.json.canal;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,9 +30,13 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.commons.lang3.StringUtils;
+
import java.nio.charset.Charset;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
public class CanalJsonSerializationSchema implements SerializationSchema {
@@ -48,12 +53,12 @@ public class CanalJsonSerializationSchema implements
SerializationSchema {
public CanalJsonSerializationSchema(SeaTunnelRowType rowType) {
this.jsonSerializer = new
JsonSerializationSchema(createJsonRowType(rowType));
- this.reuse = new SeaTunnelRow(2);
+ this.reuse = new SeaTunnelRow(5);
}
public CanalJsonSerializationSchema(SeaTunnelRowType rowType, Charset
charset) {
this.jsonSerializer = new
JsonSerializationSchema(createJsonRowType(rowType), charset);
- this.reuse = new SeaTunnelRow(2);
+ this.reuse = new SeaTunnelRow(5);
}
@Override
@@ -62,6 +67,15 @@ public class CanalJsonSerializationSchema implements
SerializationSchema {
String opType = rowKind2String(row.getRowKind());
reuse.setField(0, new SeaTunnelRow[] {row});
reuse.setField(1, opType);
+ if (!StringUtils.isEmpty(row.getTableId())) {
+ reuse.setField(2,
TablePath.of(row.getTableId()).getDatabaseName());
+ reuse.setField(3,
TablePath.of(row.getTableId()).getTableName());
+ }
+
+ if (row.getOptions() != null &&
row.getOptions().containsKey(EVENT_TIME.getName())) {
+ reuse.setField(4, row.getOptions().get(EVENT_TIME.getName()));
+ }
+
return jsonSerializer.serialize(reuse);
} catch (Throwable t) {
throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
@@ -88,9 +102,13 @@ public class CanalJsonSerializationSchema implements
SerializationSchema {
// but we don't need them
// and we don't need "old" , because can not support
UPDATE_BEFORE,UPDATE_AFTER
return new SeaTunnelRowType(
- new String[] {"data", "type"},
+ new String[] {"data", "type", "database", "table", "ts"},
new SeaTunnelDataType[] {
- new ArrayType<>(SeaTunnelRowType[].class, databaseSchema),
STRING_TYPE
+ new ArrayType<>(SeaTunnelRowType[].class, databaseSchema),
+ STRING_TYPE,
+ STRING_TYPE,
+ STRING_TYPE,
+ LONG_TYPE
});
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
index 47afa1a015..122af68d23 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -46,6 +47,7 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
public static final String DATA_PAYLOAD = "payload";
private static final String DATA_BEFORE = "before";
private static final String DATA_AFTER = "after";
+ private static final String DATA_TS = "ts_ms";
private static final String REPLICA_IDENTITY_EXCEPTION =
"The \"before\" field of %s operation is null, "
@@ -117,6 +119,7 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
private void parsePayload(Collector<SeaTunnelRow> out, TablePath
tablePath, JsonNode payload)
throws IOException {
String op = payload.get(OP_KEY).asText();
+ JsonNode tsNode = payload.get(DATA_TS);
switch (op) {
case OP_CREATE:
@@ -126,6 +129,9 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
if (tablePath != null) {
insert.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(insert, tsNode.asLong());
+ }
out.collect(insert);
break;
case OP_UPDATE:
@@ -138,6 +144,9 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
if (tablePath != null) {
before.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(before, tsNode.asLong());
+ }
out.collect(before);
SeaTunnelRow after =
debeziumRowConverter.parse(payload.get(DATA_AFTER));
@@ -146,6 +155,9 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
if (tablePath != null) {
after.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(after, tsNode.asLong());
+ }
out.collect(after);
break;
case OP_DELETE:
@@ -158,6 +170,9 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
if (tablePath != null) {
delete.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(delete, tsNode.asLong());
+ }
out.collect(delete);
break;
default:
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
index eb75bfd2b0..43cab50c9d 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonFormatOptions.java
@@ -25,7 +25,7 @@ import java.util.Map;
public class DebeziumJsonFormatOptions {
- public static final int GENERATE_ROW_SIZE = 3;
+ public static final int GENERATE_ROW_SIZE = 5;
public static final Option<Boolean> IGNORE_PARSE_ERRORS =
JsonFormatOptions.IGNORE_PARSE_ERRORS;
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
index c58ddbce7c..a768d29e3e 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
@@ -18,15 +18,24 @@
package org.apache.seatunnel.format.json.debezium;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.commons.lang3.StringUtils;
+
import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
import static
org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
public class DebeziumJsonSerializationSchema implements SerializationSchema {
@@ -53,18 +62,37 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema {
@Override
public byte[] serialize(SeaTunnelRow row) {
try {
+ Map<String, String> source = new HashMap<>();
+ if (!StringUtils.isEmpty(row.getTableId())) {
+ source.put("schema",
TablePath.of(row.getTableId()).getSchemaName());
+ source.put("database",
TablePath.of(row.getTableId()).getDatabaseName());
+ source.put("table",
TablePath.of(row.getTableId()).getTableName());
+ }
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
genericRow.setField(0, null);
genericRow.setField(1, row);
genericRow.setField(2, OP_INSERT);
+ genericRow.setField(3, source);
+
+ if (row.getOptions() != null
+ &&
row.getOptions().containsKey(EVENT_TIME.getName())) {
+ genericRow.setField(4,
row.getOptions().get(EVENT_TIME.getName()));
+ } else {
+ genericRow.setField(4, null);
+ }
return jsonSerializer.serialize(genericRow);
case UPDATE_BEFORE:
case DELETE:
genericRow.setField(0, row);
genericRow.setField(1, null);
genericRow.setField(2, OP_DELETE);
+ genericRow.setField(3, source);
+ if (row.getOptions() != null
+ &&
row.getOptions().containsKey(EVENT_TIME.getName())) {
+ genericRow.setField(4,
row.getOptions().get(EVENT_TIME.getName()));
+ }
return jsonSerializer.serialize(genericRow);
default:
throw new UnsupportedOperationException(
@@ -78,7 +106,13 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema {
private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType
databaseSchema) {
return new SeaTunnelRowType(
- new String[] {"before", "after", "op"},
- new SeaTunnelDataType[] {databaseSchema, databaseSchema,
STRING_TYPE});
+ new String[] {"before", "after", "op", "source", "ts_ms"},
+ new SeaTunnelDataType[] {
+ databaseSchema,
+ databaseSchema,
+ STRING_TYPE,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE),
+ LONG_TYPE
+ });
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
index a87784353e..335a443ed0 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonDeserializationSchema.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -33,6 +35,7 @@ import
org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import java.io.IOException;
+import java.util.Optional;
import java.util.regex.Pattern;
import static java.lang.String.format;
@@ -57,6 +60,8 @@ public class MaxWellJsonDeserializationSchema implements
DeserializationSchema<S
private static final String FIELD_TABLE = "table";
+ private static final String FIELD_TS = "ts";
+
private final String database;
private final String table;
@@ -110,6 +115,9 @@ public class MaxWellJsonDeserializationSchema implements
DeserializationSchema<S
if (message == null) {
return;
}
+ TablePath tablePath =
+
Optional.ofNullable(catalogTable).map(CatalogTable::getTablePath).orElse(null);
+
ObjectNode jsonNode = (ObjectNode) convertBytes(message);
if (database != null
&&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
@@ -120,9 +128,16 @@ public class MaxWellJsonDeserializationSchema implements
DeserializationSchema<S
}
JsonNode dataNode = jsonNode.get(FIELD_DATA);
String type = jsonNode.get(FIELD_TYPE).asText();
+ JsonNode tsNode = jsonNode.get(FIELD_TS);
if (OP_INSERT.equals(type)) {
SeaTunnelRow rowInsert = convertJsonNode(dataNode);
rowInsert.setRowKind(RowKind.INSERT);
+ if (tablePath != null && !tablePath.toString().isEmpty()) {
+ rowInsert.setTableId(tablePath.toString());
+ }
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(rowInsert, tsNode.asLong() * 1000);
+ }
out.collect(rowInsert);
} else if (OP_UPDATE.equals(type)) {
SeaTunnelRow rowAfter = convertJsonNode(dataNode);
@@ -142,11 +157,25 @@ public class MaxWellJsonDeserializationSchema implements
DeserializationSchema<S
rowBefore.setRowKind(RowKind.UPDATE_BEFORE);
assert rowAfter != null;
rowAfter.setRowKind(RowKind.UPDATE_AFTER);
+ if (tablePath != null && !tablePath.toString().isEmpty()) {
+ rowBefore.setTableId(tablePath.toString());
+ rowAfter.setTableId(tablePath.toString());
+ }
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(rowBefore, tsNode.asLong() * 1000);
+ MetadataUtil.setEventTime(rowAfter, tsNode.asLong() * 1000);
+ }
out.collect(rowBefore);
out.collect(rowAfter);
} else if (OP_DELETE.equals(type)) {
SeaTunnelRow rowDelete = convertJsonNode(dataNode);
rowDelete.setRowKind(RowKind.DELETE);
+ if (tablePath != null && !tablePath.toString().isEmpty()) {
+ rowDelete.setTableId(tablePath.toString());
+ }
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(rowDelete, tsNode.asLong() * 1000);
+ }
out.collect(rowDelete);
} else {
if (!ignoreParseErrors) {
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
index 1134a0527c..191512c49c 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerializationSchema.java
@@ -19,6 +19,7 @@
package org.apache.seatunnel.format.json.maxwell;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -28,9 +29,13 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.commons.lang3.StringUtils;
+
import java.nio.charset.Charset;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
public class MaxWellJsonSerializationSchema implements SerializationSchema {
@@ -47,12 +52,12 @@ public class MaxWellJsonSerializationSchema implements
SerializationSchema {
public MaxWellJsonSerializationSchema(SeaTunnelRowType rowType) {
this.jsonSerializer = new
JsonSerializationSchema(createJsonRowType(rowType));
- this.reuse = new SeaTunnelRow(2);
+ this.reuse = new SeaTunnelRow(5);
}
public MaxWellJsonSerializationSchema(SeaTunnelRowType rowType, Charset
charset) {
this.jsonSerializer = new
JsonSerializationSchema(createJsonRowType(rowType), charset);
- this.reuse = new SeaTunnelRow(2);
+ this.reuse = new SeaTunnelRow(5);
}
@Override
@@ -61,6 +66,14 @@ public class MaxWellJsonSerializationSchema implements
SerializationSchema {
String opType = rowKind2String(row.getRowKind());
reuse.setField(0, row);
reuse.setField(1, opType);
+ reuse.setField(2, row.getTableId());
+ if (!StringUtils.isEmpty(row.getTableId())) {
+ reuse.setField(2,
TablePath.of(row.getTableId()).getDatabaseName());
+ reuse.setField(3,
TablePath.of(row.getTableId()).getTableName());
+ }
+ if (row.getOptions() != null &&
row.getOptions().containsKey(EVENT_TIME.getName())) {
+ reuse.setField(4, row.getOptions().get(EVENT_TIME.getName()));
+ }
return jsonSerializer.serialize(reuse);
} catch (Throwable t) {
throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
@@ -87,7 +100,9 @@ public class MaxWellJsonSerializationSchema implements
SerializationSchema {
// but we don't need them
// and we don't need "old" , because can not support
UPDATE_BEFORE,UPDATE_AFTER
return new SeaTunnelRowType(
- new String[] {"data", "type"},
- new SeaTunnelDataType[] {databaseSchema, STRING_TYPE});
+ new String[] {"data", "type", "database", "table", "ts"},
+ new SeaTunnelDataType[] {
+ databaseSchema, STRING_TYPE, STRING_TYPE, STRING_TYPE,
LONG_TYPE
+ });
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
index 14f3b5944e..f1731e3872 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
@@ -25,17 +25,20 @@ import
org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import lombok.NonNull;
import java.io.IOException;
+import java.time.ZoneOffset;
import java.util.Optional;
import java.util.regex.Pattern;
@@ -49,6 +52,8 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
private static final String FIELD_DATABASE_TABLE = "table";
+ private static final String FIELD_TS = "op_ts";
+
private static final String DATA_BEFORE = "before"; // BEFORE
private static final String DATA_AFTER = "after"; // AFTER
@@ -151,7 +156,13 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
}
String op = jsonNode.get(FIELD_TYPE).asText().trim();
-
+ JsonNode tsNode = jsonNode.get(FIELD_TS);
+ // ogg json ts is date, eg "2020-05-13 15:40:07.000000"
+ long ts = 0;
+ if (tsNode != null) {
+ String tsDateTime = tsNode.asText();
+ ts =
DateTimeUtils.parse(tsDateTime).toEpochSecond(ZoneOffset.UTC) * 1000;
+ }
switch (op) {
case OP_INSERT:
// Gets the data for the INSERT operation
@@ -160,6 +171,9 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
if (tablePath != null) {
row.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(row, ts);
+ }
out.collect(row);
break;
case OP_UPDATE:
@@ -178,12 +192,18 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
if (tablePath != null) {
before.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(before, ts);
+ }
out.collect(before);
after.setRowKind(RowKind.UPDATE_AFTER);
if (tablePath != null) {
after.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(after, ts);
+ }
out.collect(after);
break;
case OP_DELETE:
@@ -202,6 +222,9 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
if (tablePath != null) {
beforeDelete.setTableId(tablePath.toString());
}
+ if (tsNode != null) {
+ MetadataUtil.setEventTime(beforeDelete, ts);
+ }
out.collect(beforeDelete);
break;
default:
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
index 2499736fa9..05218b6d43 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
@@ -28,7 +28,11 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
public class OggJsonSerializationSchema implements SerializationSchema {
@@ -44,7 +48,7 @@ public class OggJsonSerializationSchema implements
SerializationSchema {
public OggJsonSerializationSchema(SeaTunnelRowType rowType) {
this.jsonSerializer = new
JsonSerializationSchema(createJsonRowType(rowType));
- this.reuse = new SeaTunnelRow(2);
+ this.reuse = new SeaTunnelRow(4);
}
@Override
@@ -53,6 +57,13 @@ public class OggJsonSerializationSchema implements
SerializationSchema {
String opType = rowKind2String(row.getRowKind());
reuse.setField(0, row);
reuse.setField(1, opType);
+ if (!StringUtils.isEmpty(row.getTableId())) {
+ reuse.setField(2, row.getTableId());
+ }
+
+ if (row.getOptions() != null &&
row.getOptions().containsKey(EVENT_TIME.getName())) {
+ reuse.setField(3, row.getOptions().get(EVENT_TIME.getName()));
+ }
return jsonSerializer.serialize(reuse);
} catch (Throwable t) {
throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
@@ -79,7 +90,7 @@ public class OggJsonSerializationSchema implements
SerializationSchema {
// but we don't need them
// and we don't need "old" , because can not support
UPDATE_BEFORE,UPDATE_AFTER
return new SeaTunnelRowType(
- new String[] {"data", "type"},
- new SeaTunnelDataType[] {databaseSchema, STRING_TYPE});
+ new String[] {"data", "type", "table", "op_ts"},
+ new SeaTunnelDataType[] {databaseSchema, STRING_TYPE,
STRING_TYPE, LONG_TYPE});
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index 7bad34de3a..ad0c1ec960 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -207,32 +207,32 @@ public class CanalJsonSerDeSchemaTest {
List<String> expectedResult =
Arrays.asList(
-
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":0.75}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":0.875}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind breaker\",\"weight\":0.1}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":1.0}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.1}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind breaker\",\"weight\":0.5}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17}],\"type\":\"INSERT\"}",
-
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14}],\"type\":\"DELETE\"}",
-
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":5.17}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1}],\"type\":\"DELETE\"}",
- "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17}],\"type\":\"INSERT\"}",
- "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17}],\"type\":\"DELETE\"}",
- "{\"data\":[{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8}],\"type\":\"DELETE\"}");
+
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+ "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+ "{\"data\":[{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+
"{\"data\":[{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":0.75}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+
"{\"data\":[{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":0.875}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+
"{\"data\":[{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind
breaker\",\"weight\":0.1}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+ "{\"data\":[{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944146308}",
+
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944202218}",
+
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":1.0}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944202218}",
+
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944279665}",
+
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.1}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944279665}",
+
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288394}",
+
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288394}",
+
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288717}",
+
"{\"data\":[{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind
breaker\",\"weight\":0.5}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944288717}",
+
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337341}",
+
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337341}",
+
"{\"data\":[{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337341}",
+
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+
"{\"data\":[{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":5.17}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+ "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+ "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17}],\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944337663}",
+ "{\"data\":[{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944418418}",
+ "{\"data\":[{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8}],\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1598944418418}");
assertEquals(expectedResult, result);
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 51996b815b..8607aedbf0 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -616,26 +616,26 @@ public class DebeziumJsonSerDeSchemaTest {
expected =
Arrays.asList(
-
"{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small
2-wheel scooter\",\"weight\":3.14},\"op\":\"c\"}",
- "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz
carpenter's hammer\",\"weight\":0.75},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz
carpenter's hammer\",\"weight\":0.875},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz
carpenter's hammer\",\"weight\":1.0},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
of assorted rocks\",\"weight\":5.3},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water
resistent black wind breaker\",\"weight\":0.1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"c\"}",
-
"{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\"}",
-
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz
carpenter hammer\",\"weight\":1.0},\"op\":\"c\"}",
-
"{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\"}",
-
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
of assorted rocks\",\"weight\":5.1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water
resistent white wind breaker\",\"weight\":0.2},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
2-wheel scooter \",\"weight\":5.18},\"op\":\"c\"}",
-
"{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\"}",
-
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new
water resistent white wind breaker\",\"weight\":0.5},\"op\":\"c\"}",
-
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18},\"after\":null,\"op\":\"d\"}",
-
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
2-wheel scooter \",\"weight\":5.17},\"op\":\"c\"}",
-
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17},\"after\":null,\"op\":\"d\"}");
+
"{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small
2-wheel
scooter\",\"weight\":3.14},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606100}",
+ "{\"before\":null,\"after\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz
carpenter's
hammer\",\"weight\":0.75},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz
carpenter's
hammer\",\"weight\":0.875},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz
carpenter's
hammer\",\"weight\":1.0},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
of assorted
rocks\",\"weight\":5.3},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water
resistent black wind
breaker\",\"weight\":0.1},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":null,\"after\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589355606101}",
+
"{\"before\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589361987936}",
+
"{\"before\":null,\"after\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz
carpenter
hammer\",\"weight\":1.0},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589361987936}",
+
"{\"before\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362099505}",
+
"{\"before\":null,\"after\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box
of assorted
rocks\",\"weight\":5.1},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362099505}",
+
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water
resistent white wind
breaker\",\"weight\":0.2},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362210230}",
+
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
2-wheel scooter
\",\"weight\":5.18},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362243428}",
+
"{\"before\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362293539}",
+
"{\"before\":null,\"after\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new
water resistent white wind
breaker\",\"weight\":0.5},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362293539}",
+
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362330904}",
+
"{\"before\":null,\"after\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big
2-wheel scooter
\",\"weight\":5.17},\"op\":\"c\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362330904}",
+
"{\"before\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"\",\"database\":\"\",\"table\":\"test\"},\"ts_ms\":1589362344455}");
assertEquals(expected, actual);
}
//
--------------------------------------------------------------------------------------------
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
index f82b272cf7..9641647533 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/maxwell/MaxWellJsonSerDeSchemaTest.java
@@ -81,32 +81,32 @@ public class MaxWellJsonSerDeSchemaTest {
}
List<String> expected =
Arrays.asList(
- "SeaTunnelRow{tableId=, kind=+I, fields=[101, scooter,
Small 2-wheel scooter, 3.14]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[102, car
battery, 12V car battery, 8.1]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[103, 12-pack
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[104, hammer,
12oz carpenter's hammer, 0.75]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[105, hammer,
14oz carpenter's hammer, 0.875]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[106, hammer,
16oz carpenter's hammer, 1.0]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[108, jacket,
water resistent black wind breaker, 0.1]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[109, spare
tire, 24 inch spare tire, 22.2]}",
- "SeaTunnelRow{tableId=, kind=-U, fields=[106, hammer,
16oz carpenter's hammer, 1.0]}",
- "SeaTunnelRow{tableId=, kind=+U, fields=[106, hammer,
18oz carpenter hammer, 1.0]}",
- "SeaTunnelRow{tableId=, kind=-U, fields=[107, rocks,
box of assorted rocks, 5.3]}",
- "SeaTunnelRow{tableId=, kind=+U, fields=[107, rocks,
box of assorted rocks, 5.1]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[110, jacket,
water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=, kind=+I, fields=[111, scooter,
Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=, kind=-U, fields=[110, jacket,
water resistent white wind breaker, 0.2]}",
- "SeaTunnelRow{tableId=, kind=+U, fields=[110, jacket,
new water resistent white wind breaker, 0.5]}",
- "SeaTunnelRow{tableId=, kind=-U, fields=[111, scooter,
Big 2-wheel scooter , 5.18]}",
- "SeaTunnelRow{tableId=, kind=+U, fields=[111, scooter,
Big 2-wheel scooter , 5.17]}",
- "SeaTunnelRow{tableId=, kind=-D, fields=[111, scooter,
Big 2-wheel scooter , 5.17]}",
- "SeaTunnelRow{tableId=, kind=-U, fields=[101, scooter,
Small 2-wheel scooter, 3.14]}",
- "SeaTunnelRow{tableId=, kind=+U, fields=[101, scooter,
Small 2-wheel scooter, 5.17]}",
- "SeaTunnelRow{tableId=, kind=-U, fields=[102, car
battery, 12V car battery, 8.1]}",
- "SeaTunnelRow{tableId=, kind=+U, fields=[102, car
battery, 12V car battery, 5.17]}",
- "SeaTunnelRow{tableId=, kind=-D, fields=[102, car
battery, 12V car battery, 5.17]}",
- "SeaTunnelRow{tableId=, kind=-D, fields=[103, 12-pack
drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}");
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[102,
car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[104,
hammer, 12oz carpenter's hammer, 0.75]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[105,
hammer, 14oz carpenter's hammer, 0.875]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[108,
jacket, water resistent black wind breaker, 0.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[109,
spare tire, 24 inch spare tire, 22.2]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[106,
hammer, 16oz carpenter's hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[106,
hammer, 18oz carpenter hammer, 1.0]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[107,
rocks, box of assorted rocks, 5.3]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[107,
rocks, box of assorted rocks, 5.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+I, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[110,
jacket, water resistent white wind breaker, 0.2]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[110,
jacket, new water resistent white wind breaker, 0.5]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[111,
scooter, Big 2-wheel scooter , 5.18]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[111,
scooter, Big 2-wheel scooter , 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[101,
scooter, Small 2-wheel scooter, 3.14]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[101,
scooter, Small 2-wheel scooter, 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-U, fields=[102,
car battery, 12V car battery, 8.1]}",
+ "SeaTunnelRow{tableId=..test, kind=+U, fields=[102,
car battery, 12V car battery, 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[102,
car battery, 12V car battery, 5.17]}",
+ "SeaTunnelRow{tableId=..test, kind=-D, fields=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8]}");
List<String> actual =
collector.list.stream().map(Object::toString).collect(Collectors.toList());
assertEquals(expected, actual);
@@ -121,32 +121,32 @@ public class MaxWellJsonSerDeSchemaTest {
List<String> expectedResult =
Arrays.asList(
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":5.17},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"type\":\"DELETE\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17},\"type\":\"DELETE\"}",
- "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"type\":\"DELETE\"}");
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":0.75},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":0.875},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind
breaker\",\"weight\":0.1},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+ "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684883000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684893000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684893000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684897000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.1},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684897000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684900000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684904000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684906000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind
breaker\",\"weight\":0.5},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684906000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684912000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684912000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684914000}",
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":5.17},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17},\"type\":\"INSERT\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684928000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":5.17},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684938000}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"type\":\"DELETE\",\"database\":\"\",\"table\":\"test\",\"ts\":1596684938000}");
assertEquals(expectedResult, result);
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
index 20df0d945a..b4da3d4ae2 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
@@ -206,27 +206,26 @@ public class OggJsonSerDeSchemaTest {
List<String> expectedResult =
Arrays.asList(
-
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
- "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
-
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter \",\"weight\":5.17},\"type\":\"DELETE\"}");
-
+
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":3.14},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384406000}",
+ "{\"data\":{\"id\":102,\"name\":\"car
battery\",\"description\":\"12V car
battery\",\"weight\":8.1},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+ "{\"data\":{\"id\":103,\"name\":\"12-pack drill
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to
#3\",\"weight\":0.8},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's
hammer\",\"weight\":0.75},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's
hammer\",\"weight\":0.875},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent
black wind
breaker\",\"weight\":0.1},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+ "{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":22.2},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384407000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's
hammer\",\"weight\":1.0},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589390787000}",
+
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter
hammer\",\"weight\":1.0},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589390787000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.3},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589390899000}",
+
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":5.1},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589390899000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391010000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391043000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent
white wind
breaker\",\"weight\":0.2},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589391140000}",
+
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water
resistent white wind
breaker\",\"weight\":0.5},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391140000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.18},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589391130000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589391130000}",
+
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel
scooter
\",\"weight\":5.17},\"type\":\"DELETE\",\"table\":\"..test\",\"op_ts\":1589391144000}");
assertEquals(expectedResult, result);
}