This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 0bad12d4d0 [cdc] Add debezium-bson format document and bugfix bson value convert to java value (#4935)
0bad12d4d0 is described below
commit 0bad12d4d04a6fb52bae2203964d7e70365a76fb
Author: chuangchuang <[email protected]>
AuthorDate: Wed Mar 12 15:08:05 2025 +0800
    [cdc] Add debezium-bson format document and bugfix bson value convert to java value (#4935)
---
docs/content/cdc-ingestion/debezium-bson.md | 329 +++++++++++++++++++++
docs/content/cdc-ingestion/kafka-cdc.md | 4 +
.../format/debezium/DebeziumBsonRecordParser.java | 84 +++++-
.../format/debezium/DebeziumJsonRecordParser.java | 4 +-
.../KafkaDebeziumJsonDeserializationSchema.java | 9 +-
.../action/cdc/mongodb/BsonValueConvertor.java | 46 ++-
.../debezium/DebeziumBsonRecordParserTest.java | 267 +++++++++++++++++
.../kafka/debezium-bson/table/event/event-bson.txt | 19 ++
.../debezium-bson/table/event/event-delete.txt | 23 ++
.../debezium-bson/table/event/event-insert.txt | 19 ++
.../kafka/debezium-bson/table/event/event-json.txt | 19 ++
.../debezium-bson/table/event/event-update.txt | 21 ++
12 files changed, 807 insertions(+), 37 deletions(-)
diff --git a/docs/content/cdc-ingestion/debezium-bson.md b/docs/content/cdc-ingestion/debezium-bson.md
new file mode 100644
index 0000000000..52eb0422d5
--- /dev/null
+++ b/docs/content/cdc-ingestion/debezium-bson.md
@@ -0,0 +1,329 @@
+---
+title: "Debezium BSON"
+weight: 6
+type: docs
+aliases:
+- /cdc-ingestion/debezium-bson.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Debezium BSON Format
+
+
+The debezium-bson format is one of the formats supported by <a href="{{< ref "/cdc-ingestion/kafka-cdc" >}}">Kafka CDC</a>.
+It is the format produced when MongoDB is captured through Debezium, and it is similar to the
+<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/debezium/">debezium-json</a> format.
+However, MongoDB does not have a fixed schema and the field types may differ from document to document, so the
+before/after fields in the message are JSON strings, whereas the debezium-json format requires them to be JSON objects.
+
+
+## Prepare MongoDB BSON Jar
+
+The jar can be downloaded from the [Maven repository](https://mvnrepository.com/artifact/org.mongodb/bson):
+
+```
+bson-*.jar
+```
+
+## Introduction
+
+{{< hint info >}}
+The debezium-bson format requires that insert/update/delete event messages include the full document, as well as a field representing the state of the document before the change.
+This requires setting Debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
+Before MongoDB 6.0 it was not possible to obtain 'Update Before' information, so the id field in the Kafka key is used as the 'Update Before' information instead (see the sketch below).
+{{< /hint >}}
+
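+The following is a minimal sketch of that key fallback using plain Jackson (the class and method names here are illustrative, not part of this commit; the real logic lives in DebeziumBsonRecordParser#processDeleteRecord):
+
+```java
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class KeyFallbackSketch {
+
+    /** Recovers the '_id' column from a Kafka message key such as {"id": "{\"$oid\": \"...\"}"}. */
+    public static Map<String, JsonNode> deleteRowFromKey(String kafkaKeyJson) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode key = mapper.readTree(kafkaKeyJson);
+        // The id field holds the document id as a JSON string, so it is parsed a second time.
+        JsonNode id = mapper.readTree(key.get("id").asText());
+        // The resulting row is emitted as a DELETE ('Update Before') record.
+        return Collections.singletonMap("_id", id);
+    }
+}
+```
+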
+Here is a simple example of an update operation captured from a MongoDB customers collection, in JSON format:
+
+```json
+{
+  "schema": {
+    "type": "struct",
+    "fields": [
+      {
+        "type": "string",
+        "optional": true,
+        "name": "io.debezium.data.Json",
+        "version": 1,
+        "field": "before"
+      },
+      {
+        "type": "string",
+        "optional": true,
+        "name": "io.debezium.data.Json",
+        "version": 1,
+        "field": "after"
+      },
+      ...
+    ]
+  },
+  "payload": {
+    "before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"success\"]}",
+    "after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"passion\",\"success\"]}",
+    "source": {
+      "db": "inventory",
+      "rs": "rs0",
+      "collection": "customers",
+      ...
+    },
+    "op": "u",
+    "ts_ms": 1558965515240,
+    "ts_us": 1558965515240142,
+    "ts_ns": 1558965515240142879
+  }
+}
+```
+
+This document from the MongoDB collection customers has 4 fields: _id is a BSON ObjectId, name is a string,
+create_time is a long, and tags is an array of strings. The following is the processing result in the debezium-bson format:
+
+Document Schema:
+
+| Field Name | Field Type | Key |
+|------------|------------|-------------|
+| _id | STRING | Primary Key |
+| name | STRING | |
+| create_time| STRING | |
+| tags | STRING | |
+
+Records:
+
+| RowKind | _id                      | name | create_time   | tags                  |
+|---------|--------------------------|------|---------------|-----------------------|
+| -U      | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["success"]           |
+| +U      | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["passion","success"] |
+
+
+### How it works
+Because the schema field of the event message does not carry the field information of the document, the debezium-bson format does not require event messages to have schema information. The specific steps are as follows (see the sketch after this list):
+
+- Parse the before/after fields of the event message into a BsonDocument.
+- Recursively traverse all fields of the BsonDocument and convert each BsonValue to a Java object.
+- Convert all top-level fields of before/after to string type, with _id fixed as the primary key.
+- If a top-level field of before/after is a basic type (such as Integer, Long, etc.), it is converted directly to a string; otherwise it is converted to a JSON string.
+
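+A minimal, self-contained sketch of these steps using the MongoDB BSON library (the class and method names are illustrative, not the actual Paimon implementation; DebeziumBsonRecordParser and BsonValueConvertor cover every type in the table below):
+
+```java
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class DebeziumBsonSketch {
+
+    /** Turns a before/after JSON string into the flat map of string columns described above. */
+    public static Map<String, String> toRow(String documentJson) {
+        // Step 1: parse the before/after field into a BsonDocument.
+        BsonDocument doc = BsonDocument.parse(documentJson);
+        Map<String, String> row = new LinkedHashMap<>();
+        for (Map.Entry<String, BsonValue> e : doc.entrySet()) {
+            BsonValue v = e.getValue();
+            String cell;
+            if (v.isNull()) {
+                cell = null; // BsonNull stays null
+            } else if (v.isObjectId()) {
+                cell = v.asObjectId().getValue().toHexString(); // _id becomes a hex string
+            } else if (v.isInt64()) {
+                cell = Long.toString(v.asInt64().getValue()); // basic type -> plain string
+            } else if (v.isString()) {
+                cell = v.asString().getValue();
+            } else if (v.isDocument()) {
+                cell = v.asDocument().toJson(); // non-basic type -> JSON string
+            } else {
+                // Arrays and the remaining scalar types follow the conversion table below.
+                throw new UnsupportedOperationException(v.getBsonType().toString());
+            }
+            // Steps 3/4: every top-level column is a STRING; _id is fixed as the primary key.
+            row.put(e.getKey(), cell);
+        }
+        return row;
+    }
+}
+```
+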
+Below is a list of top-level field BsonValue conversion examples:
+
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">BsonValue Type</th>
+ <th class="text-left" style="width: 40%">Json Value</th>
+ <th class="text-left" style="width: 40%">Conversion Result String</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>BsonString</h5></td>
+ <td>"hello"</td>
+ <td>"hello"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonInt32</h5></td>
+ <td>123</td>
+ <td>"123"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonInt64</h5></td>
+ <td>
+ <ul>
+ <li>1735934393769</li>
+ <li>{"$numberLong": "1735934393769"}</li>
+ </ul>
+ </td>
+ <td>"1735934393769"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonDouble</h5></td>
+ <td>
+ <ul>
+ <li>{"$numberDouble": "3.14"}</li>
+ <li>{"$numberDouble": "NaN"}</li>
+ <li>{"$numberDouble": "Infinity"}</li>
+ <li>{"$numberDouble": "-Infinity"}</li>
+ </ul>
+ </td>
+ <td>
+ <ul>
+ <li>"3.14"</li>
+ <li>"NaN"</li>
+ <li>"Infinity"</li>
+ <li>"-Infinity"</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>BsonBoolean</h5></td>
+ <td>
+ <ul>
+ <li>true</li>
+ <li>false</li>
+ </ul>
+ </td>
+ <td>
+ <ul>
+ <li>"true"</li>
+ <li>"false"</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>BsonArray</h5></td>
+ <td>[1,2,{"$numberLong": "1735934393769"}]</td>
+ <td>"[1,2,1735934393769]"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonObjectId</h5></td>
+ <td>{"$oid": "596e275826f08b2730779e1f"}</td>
+ <td>"596e275826f08b2730779e1f"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonDateTime</h5></td>
+ <td>{"$date": 1735934393769 }</td>
+ <td>"1735934393769"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonNull</h5></td>
+ <td>null</td>
+ <td>null</td>
+ </tr>
+ <tr>
+ <td><h5>BsonUndefined</h5></td>
+ <td>{"$undefined": true}</td>
+ <td>null</td>
+ </tr>
+ <tr>
+ <td><h5>BsonBinary</h5></td>
+ <td>{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"}</td>
+ <td>"uE2/4v5MSVOiJZkOo3APKQ=="</td>
+ </tr>
+ <tr>
+ <td><h5>BsonBinary(type=UUID)</h5></td>
+ <td>{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"}</td>
+ <td>"b84dbfe2-fe4c-4953-a225-990ea3700f29"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonDecimal128</h5></td>
+ <td>
+ <ul>
+ <li>{"$numberDecimal": "3.14"}</li>
+ <li>{"$numberDecimal": "NaN"}</li>
+ </ul>
+ </td>
+ <td>
+ <ul>
+ <li>"3.14"</li>
+ <li>"NaN"</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>BsonRegularExpression</h5></td>
+ <td>{"$regularExpression": {"pattern": "^pass$", "options": "i"}}</td>
+ <td>"/^pass$/i"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonSymbol</h5></td>
+ <td>{"$symbol": "symbol"}</td>
+ <td>"symbol"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonTimestamp</h5></td>
+ <td>{"$timestamp": {"t": 1736997330, "i": 2}}</td>
+ <td>"1736997330"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonMinKey</h5></td>
+ <td>{"$minKey": 1}</td>
+ <td>"BsonMinKey"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonMaxKey</h5></td>
+ <td>{"$maxKey": 1}</td>
+ <td>"BsonMaxKey"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonJavaScript</h5></td>
+ <td>{"$code": "function(){}"}</td>
+ <td>"function(){}"</td>
+ </tr>
+ <tr>
+ <td><h5>BsonJavaScriptWithScope</h5></td>
+ <td>{"$code": "function(){}", "$scope": {"name": "Anne"}}</td>
+ <td>'{"$code": "function(){}", "$scope": {"name": "Anne"}}'</td>
+ </tr>
+ <tr>
+ <td><h5>BsonDocument</h5></td>
+ <td>
+<pre>
+{
+ "decimalPi": {"$numberDecimal": "3.14"},
+ "doublePi": {"$numberDouble": "3.14"},
+ "doubleNaN": {"$numberDouble": "NaN"},
+ "decimalNaN": {"$numberDecimal": "NaN"},
+ "long": {"$numberLong": "100"},
+ "bool": true,
+ "array": [
+ {"$numberInt": "1"},
+ {"$numberLong": "2"}
+ ]
+}
+</pre>
+ </td>
+ <td>
+<pre>
+'{
+ "decimalPi":3.14,
+ "doublePi":3.14,
+ "doubleNaN":"NaN",
+ "decimalNaN":"NaN",
+ "long":100,
+ "bool":true,
+ "array":[1,2]
+}'
+</pre>
+ </td>
+ </tr>
+ </tbody>
+</table>
+
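+As a quick check, a few of the conversions above can be reproduced directly with the MongoDB BSON library; this is only an illustrative sketch, with the expected values taken from the table:
+
+```java
+import org.bson.BsonDocument;
+
+public class ConversionTableSketch {
+    public static void main(String[] args) {
+        BsonDocument doc =
+                BsonDocument.parse(
+                        "{\"oid\": {\"$oid\": \"596e275826f08b2730779e1f\"},"
+                                + " \"ts\": {\"$timestamp\": {\"t\": 1736997330, \"i\": 2}}}");
+        // BsonObjectId is rendered as its hex string: 596e275826f08b2730779e1f
+        System.out.println(doc.getObjectId("oid").getValue().toHexString());
+        // BsonTimestamp keeps only the seconds part: 1736997330
+        System.out.println(doc.getTimestamp("ts").getTime());
+    }
+}
+```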
+
+### How to use
+Use debezium-bson by adding the kafka_conf parameter **value.format=debezium-bson**. Let’s take table synchronization as an example:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ kafka_sync_table \
+ --warehouse hdfs:///path/to/warehouse \
+ --database test_db \
+ --table ods_mongodb_customers \
+ --primary_keys _id \
+ --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
+ --kafka_conf topic=customers \
+ --kafka_conf properties.group.id=123456 \
+ --kafka_conf value.format=debezium-bson \
+ --catalog_conf metastore=filesystem \
+ --table_conf bucket=4 \
+ --table_conf changelog-producer=input \
+ --table_conf sink.parallelism=4
+```
+
+
diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md
index 20ea3e6199..dfad34fad3 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -67,6 +67,10 @@ If a message in a Kafka topic is a change event captured from another database u
       <td><a href="https://docs.aws.amazon.com/dms/latest/userguide/Welcome.html">aws-dms-json</a></td>
       <td>True</td>
     </tr>
+    <tr>
+      <td><a href="{{< ref "/cdc-ingestion/debezium-bson" >}}">debezium-bson</a></td>
+      <td>True</td>
+    </tr>
   </tbody>
 </table>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 10dcd56b80..397575c8e5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -22,13 +22,17 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.TypeUtils;

 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;

 import org.bson.BsonDocument;
 import org.bson.BsonValue;
@@ -37,14 +41,23 @@ import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;

+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;

+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_BEFORE;
 import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
 import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE;
+import static org.apache.paimon.utils.JsonSerdeUtil.fromJson;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
 import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;

 /**
@@ -63,19 +76,47 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {

     private static final String FIELD_COLLECTION = "collection";
     private static final String FIELD_OBJECT_ID = "_id";
+    private static final String FIELD_KEY_ID = "id";
     private static final List<String> PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);

+    private ObjectNode keyRoot;
+
     public DebeziumBsonRecordParser(
             TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
         super(typeMapping, computedColumns);
     }

+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        String operation = getAndCheck(FIELD_TYPE).asText();
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        switch (operation) {
+            case OP_INSERT:
+            case OP_READE:
+                processRecord(getData(), RowKind.INSERT, records);
+                break;
+            case OP_UPDATE:
+                processDeleteRecord(operation, records);
+                processRecord(getData(), RowKind.INSERT, records);
+                break;
+            case OP_DELETE:
+                processDeleteRecord(operation, records);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record operation: " + operation);
+        }
+        return records;
+    }
+
     @Override
     protected void setRoot(CdcSourceRecord record) {
-        JsonNode node = (JsonNode) record.getValue();
-        if (node.has(FIELD_SCHEMA)) {
-            root = node.get(FIELD_PAYLOAD);
-        } else {
-            root = node;
+        root = (JsonNode) record.getValue();
+        if (root.has(FIELD_SCHEMA)) {
+            root = root.get(FIELD_PAYLOAD);
+        }
+
+        keyRoot = (ObjectNode) record.getKey();
+        if (!isNull(keyRoot) && keyRoot.has(FIELD_SCHEMA)) {
+            keyRoot = (ObjectNode) keyRoot.get(FIELD_PAYLOAD);
         }
     }
@@ -128,4 +169,37 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
     protected String format() {
         return "debezium-bson";
     }
+
+    public boolean checkBeforeExists() {
+        return !isNull(root.get(FIELD_BEFORE));
+    }
+
+    private void processDeleteRecord(String operation, List<RichCdcMultiplexRecord> records) {
+        if (checkBeforeExists()) {
+            processRecord(getBefore(operation), RowKind.DELETE, records);
+        } else {
+            // Before version 6.0 of MongoDB, it was not possible to obtain 'Update Before'
+            // information. Therefore, data is first deleted using the key 'id'
+            JsonNode idNode = null;
+            Preconditions.checkArgument(
+                    !isNull(keyRoot) && !isNull(idNode = keyRoot.get(FIELD_KEY_ID)),
+                    "Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.",
+                    format(),
+                    FIELD_KEY_ID,
+                    FIELD_TYPE,
+                    operation,
+                    keyRoot);
+
+            // Deserialize id from json string to JsonNode
+            Map<String, JsonNode> record =
+                    Collections.singletonMap(
+                            FIELD_OBJECT_ID, fromJson(idNode.asText(), JsonNode.class));
+
+            try {
+                processRecord(new TextNode(writeValueAsString(record)), RowKind.DELETE, records);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException("Failed to deserialize key record.", e);
+            }
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 66ddc2e1ca..6e06e3adca 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -109,11 +109,11 @@ public class DebeziumJsonRecordParser extends AbstractJsonRecordParser {
         return records;
     }

-    private JsonNode getData() {
+    protected JsonNode getData() {
         return getAndCheck(dataField());
     }

-    private JsonNode getBefore(String op) {
+    protected JsonNode getBefore(String op) {
         return getAndCheck(FIELD_BEFORE, FIELD_TYPE, op);
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
index 76211cf56d..507c9eb63c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
@@ -64,7 +64,14 @@ public class KafkaDebeziumJsonDeserializationSchema
         }

         try {
-            return new CdcSourceRecord(objectMapper.readValue(message.value(), JsonNode.class));
+            byte[] key = message.key();
+            JsonNode keyNode = null;
+            if (key != null) {
+                keyNode = objectMapper.readValue(key, JsonNode.class);
+            }
+
+            JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
+            return new CdcSourceRecord(null, keyNode, valueNode);
         } catch (Exception e) {
             LOG.error("Invalid Json:\n{}", new String(message.value()));
             throw e;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
index 16d43821bd..666ef92242 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
@@ -47,8 +47,8 @@ import org.bson.BsonValue;
 import org.bson.types.Decimal128;
 import org.bson.types.ObjectId;

-import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,15 +61,22 @@ public class BsonValueConvertor {
         return bsonTimestamp.getTime();
     }

-    private static BigDecimal convert(BsonDecimal128 bsonValue) {
+    private static Number convert(BsonDecimal128 bsonValue) {
         return convert(bsonValue.decimal128Value());
     }

-    private static BigDecimal convert(Decimal128 bsonValue) {
-        if (bsonValue.isNaN() || bsonValue.isInfinite()) {
-            return null;
+    private static Number convert(Decimal128 bsonValue) {
+        if (bsonValue.isNaN()) {
+            return Double.NaN;
+        } else if (bsonValue.isInfinite()) {
+            if (bsonValue.isNegative()) {
+                return Double.NEGATIVE_INFINITY;
+            } else {
+                return Double.POSITIVE_INFINITY;
+            }
+        } else {
+            return bsonValue.bigDecimalValue();
         }
-        return bsonValue.bigDecimalValue();
     }

     private static String convert(BsonObjectId objectId) {
@@ -84,7 +91,7 @@ public class BsonValueConvertor {
         if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
             return bsonBinary.asUuid().toString();
         } else {
-            return toHex(bsonBinary.getData());
+            return Base64.getEncoder().encodeToString(bsonBinary.getData());
         }
     }
@@ -99,11 +106,7 @@ public class BsonValueConvertor {
     }

     private static Double convert(BsonDouble bsonDouble) {
-        double value = bsonDouble.getValue();
-        if (Double.isNaN(value) || Double.isInfinite(value)) {
-            return null;
-        }
-        return value;
+        return bsonDouble.getValue();
     }

     private static String convert(BsonString string) {
@@ -136,8 +139,8 @@ public class BsonValueConvertor {
     private static Map<String, Object> convert(BsonJavaScriptWithScope javascriptWithScope) {
         return CollectionUtil.map(
-                Pair.of("code", javascriptWithScope.getCode()),
-                Pair.of("scope", convert(javascriptWithScope.getScope())));
+                Pair.of("$code", javascriptWithScope.getCode()),
+                Pair.of("$scope", convert(javascriptWithScope.getScope())));
     }

     private static String convert(BsonNull bsonNull) {
@@ -224,19 +227,4 @@ public class BsonValueConvertor {
                     "Unsupported BSON type: " + bsonValue.getBsonType());
         }
     }
-
-    public static String toHex(byte[] bytes) {
-        StringBuilder sb = new StringBuilder();
-
-        for (byte b : bytes) {
-            String s = Integer.toHexString(255 & b);
-            if (s.length() < 2) {
-                sb.append("0");
-            }
-
-            sb.append(s);
-        }
-
-        return sb.toString();
-    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
new file mode 100644
index 0000000000..78d3bcc3de
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Test for DebeziumBsonRecordParser. */
+public class DebeziumBsonRecordParserTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class);
+ private static List<CdcSourceRecord> insertList = new ArrayList<>();
+ private static List<CdcSourceRecord> updateList = new ArrayList<>();
+ private static List<CdcSourceRecord> deleteList = new ArrayList<>();
+
+ private static ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
+ private static ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
+
+ private static Map<String, String> keyEvent = new HashMap<>();
+
+ private static KafkaDeserializationSchema<CdcSourceRecord>
kafkaDeserializationSchema = null;
+
+ private static Map<String, String> beforeEvent = new HashMap<>();
+
+ private static Map<String, String> afterEvent = new HashMap<>();
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        DataFormat dataFormat = new DebeziumBsonDataFormatFactory().create();
+        kafkaDeserializationSchema = dataFormat.createKafkaDeserializer(null);
+
+        keyEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+
+        beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+        beforeEvent.put("created_at", "1736207571013");
+        beforeEvent.put("created_by", "peter");
+        beforeEvent.put("tags", "[\"pending\"]");
+        beforeEvent.put("updated_at", "1739455297970");
+
+        afterEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+        afterEvent.put("created_at", "1736207571013");
+        afterEvent.put("created_by", "peter");
+        afterEvent.put("tags", "[\"succeed\"]");
+        afterEvent.put("updated_at", "1739455397970");
+
+        String insertRes = "kafka/debezium-bson/table/event/event-insert.txt";
+        String updateRes = "kafka/debezium-bson/table/event/event-update.txt";
+        String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt";
+        String bsonPth = "kafka/debezium-bson/table/event/event-bson.txt";
+        String jsonPath = "kafka/debezium-bson/table/event/event-json.txt";
+
+        parseCdcSourceRecords(insertRes, insertList);
+
+        parseCdcSourceRecords(updateRes, updateList);
+
+        parseCdcSourceRecords(deleteRes, deleteList);
+
+        parseCdcSourceRecords(bsonPth, bsonRecords);
+
+        parseCdcSourceRecords(jsonPath, jsonRecords);
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        insertList.clear();
+        updateList.clear();
+        deleteList.clear();
+        bsonRecords.clear();
+        jsonRecords.clear();
+    }
+
+    private static void parseCdcSourceRecords(String resourcePath, List<CdcSourceRecord> records)
+            throws Exception {
+        URL url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(resourcePath);
+        List<String> line = Files.readAllLines(Paths.get(url.toURI()));
+        String key = null;
+        for (String json : line) {
+            if (StringUtils.isNullOrWhitespaceOnly(json) || !json.startsWith("{")) {
+                continue;
+            }
+            if (key == null) {
+                key = json;
+            } else {
+                // test kafka deserialization
+                records.add(deserializeKafkaSchema(key, json));
+                key = null;
+            }
+        }
+    }
+
+    @Test
+    public void extractInsertRecord() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        Assertions.assertFalse(insertList.isEmpty());
+        for (CdcSourceRecord cdcRecord : insertList) {
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assertions.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(result.kind(), RowKind.INSERT);
+            Assertions.assertEquals(beforeEvent, result.data());
+
+            String dbName = parser.getDatabaseName();
+            Assertions.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assertions.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractUpdateRecord() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        Assertions.assertFalse(updateList.isEmpty());
+        for (CdcSourceRecord cdcRecord : updateList) {
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assertions.assertEquals(records.size(), 2);
+
+            CdcRecord updateBefore = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(updateBefore.kind(), RowKind.DELETE);
+            if (parser.checkBeforeExists()) {
+                Assertions.assertEquals(beforeEvent, updateBefore.data());
+            } else {
+                Assertions.assertEquals(keyEvent, updateBefore.data());
+            }
+
+            CdcRecord updateAfter = records.get(1).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(updateAfter.kind(), RowKind.INSERT);
+            Assertions.assertEquals(afterEvent, updateAfter.data());
+
+            String dbName = parser.getDatabaseName();
+            Assertions.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assertions.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractDeleteRecord() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        Assertions.assertFalse(deleteList.isEmpty());
+        for (CdcSourceRecord cdcRecord : deleteList) {
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assertions.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(result.kind(), RowKind.DELETE);
+            if (parser.checkBeforeExists()) {
+                Assertions.assertEquals(beforeEvent, result.data());
+            } else {
+                Assertions.assertEquals(keyEvent, result.data());
+            }
+
+            String dbName = parser.getDatabaseName();
+            Assertions.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assertions.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void bsonConvertJsonTest() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+
+        Assertions.assertFalse(jsonRecords.isEmpty());
+        for (int i = 0; i < jsonRecords.size(); i++) {
+            CdcSourceRecord bsonRecord = bsonRecords.get(i);
+            CdcSourceRecord jsonRecord = jsonRecords.get(i);
+
+            JsonNode bsonTextNode =
+                    new TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue()));
+            Map<String, String> resultMap = parser.extractRowData(bsonTextNode, RowType.builder());
+
+            ObjectNode expectNode = (ObjectNode) jsonRecord.getValue();
+
+            expectNode
+                    .fields()
+                    .forEachRemaining(
+                            entry -> {
+                                String key = entry.getKey();
+                                String expectValue = null;
+                                if (!JsonSerdeUtil.isNull(entry.getValue())) {
+                                    expectValue = entry.getValue().asText();
+                                }
+                                Assertions.assertEquals(expectValue, resultMap.get(key));
+                            });
+        }
+    }
+
+    private static CdcSourceRecord deserializeKafkaSchema(String key, String value)
+            throws Exception {
+        return kafkaDeserializationSchema.deserialize(
+                new ConsumerRecord<>("topic", 0, 0, key.getBytes(), value.getBytes()));
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
new file mode 100644
index 0000000000..be744a1147
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "dummy"}
+{"id": {"$oid": "596e275826f08b2730779e1f"},"name": "Sally","age":
25,"created_time": {"$numberLong": "1735934393769"},"updated_time":
1735934393769,"deleted_time": {"$date": 1735934393769 },"register_time":
{"$timestamp": {"t": 1736997330, "i": 2}},"double_nan": {"$numberDouble":
"NaN"},"double_inf": {"$numberDouble": "Infinity"},"double_ninf":
{"$numberDouble": "-Infinity"},"double_zero": {"$numberDouble":
"0"},"boolean_true": true,"boolean_false": false,"array": ["a", "b",
"c"],"array [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt
new file mode 100644
index 0000000000..a1de2bc514
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": null,"updateDescription": null,"source": {"version":
"2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms":
1739321992000,"snapshot": "false","db": "bigdata_test","sequence":
null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection":
"sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime":
null},"op": "d","ts_ms": 1739321992890,"transaction": null}
+{"id": "\"67ab25755c0d5ac87eb8c632\""}
+{"before": null,"after": null,"updateDescription": null,"source": {"version":
"2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms":
1739321992000,"snapshot": "false","db": "bigdata_test","sequence":
null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection":
"sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime":
null},"op": "d","ts_ms": 1739321992890,"transaction": null}
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\":
{\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\":
[\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after":
null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector":
"mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot":
"false","db": "bigdata_test","sequence": null,"ts_us":
1739321992000000,"ts_ns": 1739321992000000000,"collection": " [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt
new file mode 100644
index 0000000000..ff97846015
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": "{\"_id\": {\"$oid\":
\"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\":
\"1736207571013\"},\"created_by\": \"peter\",\"tags\":
[\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","updateDescription":
null,"source": {"version": "2.7.0.Final","connector": "mongodb","name":
"mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db":
"bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns":
1739321992000000000,"collection": " [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt
new file mode 100644
index 0000000000..115ee5a91b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "dummy"}
+{"id":"596e275826f08b2730779e1f","name":"Sally","age":"25","created_time":"1735934393769","updated_time":"1735934393769","deleted_time":"1735934393769","register_time":"1736997330","double_nan":"NaN","double_inf":"Infinity","double_ninf":"-Infinity","double_zero":"0.0","boolean_true":"true","boolean_false":"false","array":"[\"a\",\"b\",\"c\"]","array_mix":"[\"1\",2,1735934393769]","decimal":"3.14","decimal_nan":"NaN","regex":"/^pass$/i","symbol":"symbol","minKey":"BsonMinKey","maxKey":"B
[...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt
new file mode 100644
index 0000000000..035ce3ff87
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": "{\"_id\": {\"$oid\":
\"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\":
\"1736207571013\"},\"created_by\": \"peter\",\"tags\":
[\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription":
null,"source": {"version": "2.7.0.Final","connector": "mongodb","name":
"mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db":
"bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns":
1739321992000000000,"collection": " [...]
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\":
{\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\":
[\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": "{\"_id\":
{\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\":
\"1736207571013\"},\"created_by\": \"peter\",\"tags\":
[\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription":
null,"source": {"version": "2.7.0.Final","connector [...]