This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4b4b5f9640 [Hotfix][MongodbCDC]Refine data format to adapt to
universal logic (#5162)
4b4b5f9640 is described below
commit 4b4b5f96404ca4ca2a8cab385c67003348bf8bd9
Author: monster <[email protected]>
AuthorDate: Thu Jul 27 10:06:10 2023 +0800
[Hotfix][MongodbCDC]Refine data format to adapt to universal logic (#5162)
Co-authored-by: chenzy15 <[email protected]>
---
docs/en/connector-v2/source/MongoDB-CDC.md | 41 ++++++++++++++++++----
.../external/IncrementalSourceStreamFetcher.java | 14 ++++----
.../cdc/mongodb/config/MongodbSourceOptions.java | 2 ++
.../cdc/mongodb/utils/MongodbRecordUtils.java | 9 +++++
.../seatunnel/cdc/mongodb/utils/ResumeToken.java | 11 ++++--
5 files changed, 61 insertions(+), 16 deletions(-)
diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md
b/docs/en/connector-v2/source/MongoDB-CDC.md
index cb7c2f32ac..d78f70110f 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -84,8 +84,8 @@ The following table lists the field data type mapping from
MongoDB BSON type to
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
-| Date | Date |
-| Timestamp | Timestamp |
+| Date | DATE |
+| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |
@@ -274,9 +274,38 @@ sink {
}
```
-## Changelog
+## Format of real-time streaming data
-- [Feature]Add MongoDB CDC Source
Connector([4923](https://github.com/apache/seatunnel/pull/4923))
-
-### next version
+```shell
+{
+ "_id" : { <BSON Object> }, // Identifier of the open change stream;
it can be assigned to the 'resumeAfter' parameter to resume
this change stream later
+ "operationType" : "<operation>", // The type of change operation
that occurred, such as: insert, delete, update, etc.
+ "fullDocument" : { <document> }, // The full document data involved in
the change operation. This field does not exist in delete operations
+ "ns" : {
+ "db" : "<database>", // The database where the change operation
occurred
+ "coll" : "<collection>" // The collection where the change operation
occurred
+ },
+ "to" : { // These fields are displayed only when the operation type is
'rename'
+ "db" : "<database>", // The new database name after the change
+ "coll" : "<collection>" // The new collection name after the change
+ },
+ "source":{
+ "ts_ms":"<timestamp>", // The timestamp when the change operation
occurred
+ "table":"<collection>", // The collection where the change
operation occurred
+ "db":"<database>", // The database where the change operation
occurred
+ "snapshot":"false" // Identify the current stage of data
synchronization
+ },
+ "documentKey" : { "_id" : <value> }, // The _id field value of the
document involved in the change operation
+ "updateDescription" : { // Description of the update operation
+ "updatedFields" : { <document> }, // The fields and values that the
update operation modified
+ "removedFields" : [ "<field>", ... ] // The names of the fields that
the update operation removed
+ },
+ "clusterTime" : <Timestamp>, // The timestamp of the Oplog log entry
corresponding to the change operation
+ "txnNumber" : <NumberLong>, // If the change operation is executed in a
multi-document transaction, this field and value are displayed, representing
the transaction number
+ "lsid" : { // Represents information related to the Session in
which the transaction is located
+ "id" : <UUID>,
+ "uid" : <BinData>
+ }
+}
+```
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index c4d5fdfd68..5257064dc1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@@ -38,6 +39,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;
+
/**
* Fetcher to fetch data from table split, the split is the incremental split
{@link
* IncrementalSplit}.
@@ -147,14 +150,11 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
- // TODO: The sourceRecord from MongoDB CDC and MySQL CDC are
inconsistent. For
- // compatibility, the getTableId method is commented out for now.
- // TableId tableId = getTableId(sourceRecord);
+ TableId tableId = getTableId(sourceRecord);
if (!taskContext.isExactlyOnce()) {
- // log.trace(
- // "The table {} is not support
exactly-once, so ignore the
- // watermark check",
- // tableId);
+ log.trace(
+ "The table {} is not support exactly-once, so ignore
the watermark check",
+ tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to
filter( Used to support
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
index dac939777f..170bef34e9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -108,6 +108,8 @@ public class MongodbSourceOptions extends SourceOptions {
+ " { \"name\": \"source\","
+ " \"type\": [{\"name\": \"source\", \"type\":
\"record\", \"fields\": ["
+ " {\"name\": \"ts_ms\", \"type\":
\"long\"},"
+ + " {\"name\": \"table\", \"type\":
[\"string\", \"null\"]},"
+ + " {\"name\": \"db\", \"type\":
[\"string\", \"null\"]},"
+ " {\"name\": \"snapshot\", \"type\":
[\"string\", \"null\"] } ]"
+ " }, \"null\" ] },"
+ " { \"name\": \"ts_ms\", \"type\": [\"long\",
\"null\"]},"
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
index 84af2f7fda..c4d51c59e4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
import org.bson.json.JsonWriterSettings;
import com.mongodb.kafka.connect.source.json.formatter.DefaultJson;
@@ -39,6 +40,7 @@ import java.util.HashMap;
import java.util.Map;
import static com.mongodb.kafka.connect.source.schema.AvroSchema.fromJson;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
@@ -46,6 +48,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OUTPUT_SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
public class MongodbRecordUtils {
@@ -117,6 +120,12 @@ public class MongodbRecordUtils {
SchemaAndValue keySchemaAndValue =
schemaAndValue.toSchemaAndValue(
fromJson(AvroSchemaDefaults.DEFAULT_AVRO_KEY_SCHEMA),
keyDocument);
+ BsonDocument source = valueDocument.get(SOURCE_FIELD).asDocument();
+ BsonValue table =
valueDocument.get(NS_FIELD).asDocument().get(COLL_FIELD);
+ BsonValue db = valueDocument.get(NS_FIELD).asDocument().get(DB_FIELD);
+ source.append(TABLE_NAME_KEY, table);
+ source.append(DB_FIELD, db);
+ valueDocument.replace(SOURCE_FIELD, source);
SchemaAndValue valueSchemaAndValue =
schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA),
valueDocument);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
index 5ee8962bc5..1ef6870c85 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
+
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
@@ -27,6 +29,8 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+
public class ResumeToken {
private static final int K_TIMESTAMP = 130;
@@ -41,14 +45,15 @@ public class ResumeToken {
} else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1)
keyStringBytes = hexToUint8Array(bsonValue.asString().getValue());
} else {
- throw new IllegalArgumentException(
- "Unknown resume token format: " + resumeToken.toJson());
+ throw new MongodbConnectorException(
+ ILLEGAL_ARGUMENT, "Unknown resume token format: " +
bsonValue);
}
ByteBuffer buffer =
ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
int kType = buffer.get() & 0xff;
if (kType != K_TIMESTAMP) {
- throw new IllegalArgumentException("Unknown keyType of timestamp:
" + kType);
+ throw new MongodbConnectorException(
+ ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " +
kType);
}
int t = buffer.getInt();