This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4b4b5f9640 [Hotfix][MongodbCDC]Refine data format to adapt to 
universal logic (#5162)
4b4b5f9640 is described below

commit 4b4b5f96404ca4ca2a8cab385c67003348bf8bd9
Author: monster <[email protected]>
AuthorDate: Thu Jul 27 10:06:10 2023 +0800

    [Hotfix][MongodbCDC]Refine data format to adapt to universal logic (#5162)
    
    Co-authored-by: chenzy15 <[email protected]>
---
 docs/en/connector-v2/source/MongoDB-CDC.md         | 41 ++++++++++++++++++----
 .../external/IncrementalSourceStreamFetcher.java   | 14 ++++----
 .../cdc/mongodb/config/MongodbSourceOptions.java   |  2 ++
 .../cdc/mongodb/utils/MongodbRecordUtils.java      |  9 +++++
 .../seatunnel/cdc/mongodb/utils/ResumeToken.java   | 11 ++++--
 5 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md 
b/docs/en/connector-v2/source/MongoDB-CDC.md
index cb7c2f32ac..d78f70110f 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -84,8 +84,8 @@ The following table lists the field data type mapping from 
MongoDB BSON type to
 | Int64             | BIGINT              |
 | Double            | DOUBLE              |
 | Decimal128        | DECIMAL             |
-| Date              | Date                |
-| Timestamp         | Timestamp           |
+| Date              | DATE                |
+| Timestamp         | TIMESTAMP           |
 | Object            | ROW                 |
 | Array             | ARRAY               |
 
@@ -274,9 +274,38 @@ sink {
 }
 ```
 
-## Changelog
+## Format of real-time streaming data
 
-- [Feature]Add MongoDB CDC Source 
Connector([4923](https://github.com/apache/seatunnel/pull/4923))
-
-### next version
+```shell
+{
+   _id : { <BSON Object> },        // Identifier of the open change stream, 
can be assigned to the 'resumeAfter' parameter for subsequent resumption of 
this change stream
+   "operationType" : "<operation>",        // The type of change operation 
that occurred, such as: insert, delete, update, etc.
+   "fullDocument" : { <document> },      // The full document data involved in 
the change operation. This field does not exist in delete operations
+   "ns" : {   
+      "db" : "<database>",         // The database where the change operation 
occurred
+      "coll" : "<collection>"     // The collection where the change operation 
occurred
+   },
+   "to" : {   // These fields are displayed only when the operation type is 
'rename'
+      "db" : "<database>",         // The new database name after the change
+      "coll" : "<collection>"     // The new collection name after the change
+   },
+   "source":{
+        "ts_ms":"<timestamp>",     // The timestamp when the change operation 
occurred
+        "table":"<collection>",    // The collection where the change operation occurred
+        "db":"<database>",         // The database where the change operation 
occurred
+        "snapshot":"false"         // Indicates the current stage of data synchronization
+    },
+   "documentKey" : { "_id" : <value> },  // The _id field value of the 
document involved in the change operation
+   "updateDescription" : {    // Description of the update operation
+      "updatedFields" : { <document> },  // The fields and values that the 
update operation modified
+      "removedFields" : [ "<field>", ... ]     // The fields that the update operation removed
+   },
+   "clusterTime" : <Timestamp>,     // The timestamp of the Oplog log entry 
corresponding to the change operation
+   "txnNumber" : <NumberLong>,    // If the change operation is executed in a 
multi-document transaction, this field and value are displayed, representing 
the transaction number
+   "lsid" : {          // Represents information related to the Session in 
which the transaction is located
+      "id" : <UUID>,  
+      "uid" : <BinData> 
+   }
+}
+```
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index c4d5fdfd68..5257064dc1 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
@@ -38,6 +39,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;
+
 /**
  * Fetcher to fetch data from table split, the split is the incremental split 
{@link
  * IncrementalSplit}.
@@ -147,14 +150,11 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
     private boolean shouldEmit(SourceRecord sourceRecord) {
         if (taskContext.isDataChangeRecord(sourceRecord)) {
             Offset position = taskContext.getStreamOffset(sourceRecord);
-            // TODO: The sourceRecord from MongoDB CDC and MySQL CDC are 
inconsistent. For
-            // compatibility, the getTableId method is commented out for now.
-            // TableId tableId = getTableId(sourceRecord);
+            TableId tableId = getTableId(sourceRecord);
             if (!taskContext.isExactlyOnce()) {
-                //                log.trace(
-                //                        "The table {} is not support 
exactly-once, so ignore the
-                // watermark check",
-                //                        tableId);
+                log.trace(
+                        "The table {} is not support exactly-once, so ignore 
the watermark check",
+                        tableId);
                 return position.isAfter(splitStartWatermark);
             }
             // TODO only the table who captured snapshot splits need to 
filter( Used to support
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
index dac939777f..170bef34e9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -108,6 +108,8 @@ public class MongodbSourceOptions extends SourceOptions {
                     + "    { \"name\": \"source\","
                     + "      \"type\": [{\"name\": \"source\", \"type\": 
\"record\", \"fields\": ["
                     + "                {\"name\": \"ts_ms\", \"type\": 
\"long\"},"
+                    + "                {\"name\": \"table\", \"type\": 
[\"string\", \"null\"]},"
+                    + "                {\"name\": \"db\", \"type\": 
[\"string\", \"null\"]},"
                     + "                {\"name\": \"snapshot\", \"type\": 
[\"string\", \"null\"] } ]"
                     + "               }, \"null\" ] },"
                     + "    { \"name\": \"ts_ms\", \"type\": [\"long\", 
\"null\"]},"
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
index 84af2f7fda..c4d51c59e4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import org.bson.BsonDocument;
 import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
 import org.bson.json.JsonWriterSettings;
 
 import com.mongodb.kafka.connect.source.json.formatter.DefaultJson;
@@ -39,6 +40,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static com.mongodb.kafka.connect.source.schema.AvroSchema.fromJson;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static 
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
@@ -46,6 +48,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OUTPUT_SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
 
 public class MongodbRecordUtils {
 
@@ -117,6 +120,12 @@ public class MongodbRecordUtils {
         SchemaAndValue keySchemaAndValue =
                 schemaAndValue.toSchemaAndValue(
                         fromJson(AvroSchemaDefaults.DEFAULT_AVRO_KEY_SCHEMA), 
keyDocument);
+        BsonDocument source = valueDocument.get(SOURCE_FIELD).asDocument();
+        BsonValue table = 
valueDocument.get(NS_FIELD).asDocument().get(COLL_FIELD);
+        BsonValue db = valueDocument.get(NS_FIELD).asDocument().get(DB_FIELD);
+        source.append(TABLE_NAME_KEY, table);
+        source.append(DB_FIELD, db);
+        valueDocument.replace(SOURCE_FIELD, source);
         SchemaAndValue valueSchemaAndValue =
                 schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), 
valueDocument);
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
index 5ee8962bc5..1ef6870c85 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;
 
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
+
 import org.bson.BsonDocument;
 import org.bson.BsonTimestamp;
 import org.bson.BsonValue;
@@ -27,6 +29,8 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Objects;
 
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+
 public class ResumeToken {
 
     private static final int K_TIMESTAMP = 130;
@@ -41,14 +45,15 @@ public class ResumeToken {
         } else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1)
             keyStringBytes = hexToUint8Array(bsonValue.asString().getValue());
         } else {
-            throw new IllegalArgumentException(
-                    "Unknown resume token format: " + resumeToken.toJson());
+            throw new MongodbConnectorException(
+                    ILLEGAL_ARGUMENT, "Unknown resume token format: " + 
bsonValue);
         }
 
         ByteBuffer buffer = 
ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
         int kType = buffer.get() & 0xff;
         if (kType != K_TIMESTAMP) {
-            throw new IllegalArgumentException("Unknown keyType of timestamp: 
" + kType);
+            throw new MongodbConnectorException(
+                    ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " + 
kType);
         }
 
         int t = buffer.getInt();

Reply via email to