This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ead1c5fd8c [BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase (#5184)
ead1c5fd8c is described below

commit ead1c5fd8c3fe3454012cfcdd2eacf997567b1f8
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Aug 15 10:04:06 2023 +0800

    [BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase (#5184)
    
    * [BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase
---
 .../MongoDBConnectorDeserializationSchema.java     | 12 +----
 .../source/fetch/MongodbFetchTaskContext.java      | 56 +++++++++++++++++++++-
 .../cdc/mongodb/utils/MongodbRecordUtils.java      | 38 ++++++++++++++-
 3 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 75f3564c6c..6f36f4be83 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -65,6 +65,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
 public class MongoDBConnectorDeserializationSchema
@@ -154,17 +155,6 @@ public class MongoDBConnectorDeserializationSchema
         return (SeaTunnelRow) physicalConverter.convert(document);
     }
 
-    private BsonDocument extractBsonDocument(
-            Struct value, @Nonnull Schema valueSchema, String fieldName) {
-        if (valueSchema.field(fieldName) != null) {
-            String docString = value.getString(fieldName);
-            if (docString != null) {
-                return BsonDocument.parse(docString);
-            }
-        }
-        return null;
-    }
-
     // 
-------------------------------------------------------------------------------------
     // Runtime Converters
     // 
-------------------------------------------------------------------------------------
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
index d9aee5ef97..fa0931a807 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
@@ -27,10 +27,13 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.Chang
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
 
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
 import org.bson.BsonType;
 import org.bson.BsonValue;
 
@@ -50,12 +53,21 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils.compareBsonValue;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.buildSourceRecord;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
@@ -172,9 +184,27 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
 
             switch (OperationType.fromString(operationType)) {
                 case INSERT:
+                    outputBuffer.put(key, changeRecord);
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    outputBuffer.put(key, changeRecord);
+                    Schema valueSchema = changeRecord.valueSchema();
+                    BsonDocument fullDocument =
+                            extractBsonDocument(value, valueSchema, 
FULL_DOCUMENT);
+                    if (fullDocument == null) {
+                        break;
+                    }
+                    BsonDocument valueDocument = 
normalizeSnapshotDocument(fullDocument, value);
+                    SourceRecord record =
+                            buildSourceRecord(
+                                    changeRecord.sourcePartition(),
+                                    changeRecord.sourceOffset(),
+                                    changeRecord.topic(),
+                                    changeRecord.kafkaPartition(),
+                                    changeRecord.keySchema(),
+                                    changeRecord.key(),
+                                    valueDocument);
+                    outputBuffer.put(key, record);
                     break;
                 case DELETE:
                     outputBuffer.remove(key);
@@ -202,6 +232,30 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
                 .collect(Collectors.toList());
     }
 
+    private BsonDocument normalizeSnapshotDocument(
+            @Nonnull final BsonDocument fullDocument, Struct value) {
+        return new BsonDocument()
+                .append(ID_FIELD, new 
BsonString(value.getString(DOCUMENT_KEY)))
+                .append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT))
+                .append(
+                        NS_FIELD,
+                        new BsonDocument(
+                                        DB_FIELD,
+                                        new BsonString(
+                                                
value.getStruct(NS_FIELD).getString(DB_FIELD)))
+                                .append(
+                                        COLL_FIELD,
+                                        new BsonString(
+                                                
value.getStruct(NS_FIELD).getString(COLL_FIELD))))
+                .append(DOCUMENT_KEY, new 
BsonString(value.getString(DOCUMENT_KEY)))
+                .append(FULL_DOCUMENT, fullDocument)
+                .append(TS_MS_FIELD, new 
BsonInt64(value.getInt64(TS_MS_FIELD)))
+                .append(
+                        SOURCE_FIELD,
+                        new BsonDocument(SNAPSHOT_FIELD, new 
BsonString(SNAPSHOT_TRUE))
+                                .append(TS_MS_FIELD, new BsonInt64(0L)));
+    }
+
     @Override
     public void close() {
         Runtime.getRuntime()
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
index c4d51c59e4..1e9ab57722 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -66,7 +67,18 @@ public class MongodbRecordUtils {
 
     public static BsonDocument getDocumentKey(@Nonnull SourceRecord 
sourceRecord) {
         Struct value = (Struct) sourceRecord.value();
-        return BsonDocument.parse(value.getString(DOCUMENT_KEY));
+        return extractBsonDocument(value, sourceRecord.valueSchema(), 
DOCUMENT_KEY);
+    }
+
+    public static BsonDocument extractBsonDocument(
+            Struct value, @Nonnull Schema valueSchema, String fieldName) {
+        if (valueSchema.field(fieldName) != null) {
+            String docString = value.getString(fieldName);
+            if (docString != null) {
+                return BsonDocument.parse(docString);
+            }
+        }
+        return null;
     }
 
     public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, 
String key) {
@@ -139,6 +151,30 @@ public class MongodbRecordUtils {
                 valueSchemaAndValue.value());
     }
 
+    public static @Nonnull SourceRecord buildSourceRecord(
+            Map<String, ?> sourcePartition,
+            Map<String, ?> sourceOffset,
+            String topicName,
+            Integer partition,
+            Schema keySchema,
+            Object key,
+            BsonDocument valueDocument) {
+        BsonValueToSchemaAndValue schemaAndValue =
+                new BsonValueToSchemaAndValue(new 
DefaultJson().getJsonWriterSettings());
+        SchemaAndValue valueSchemaAndValue =
+                schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), 
valueDocument);
+
+        return new SourceRecord(
+                sourcePartition,
+                sourceOffset,
+                topicName,
+                partition,
+                keySchema,
+                key,
+                valueSchemaAndValue.schema(),
+                valueSchemaAndValue.value());
+    }
+
     public static @Nonnull Map<String, String> createSourceOffsetMap(
             @Nonnull BsonDocument idDocument, boolean isSnapshotRecord) {
         Map<String, String> sourceOffset = new HashMap<>();

Reply via email to