This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ead1c5fd8c [BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase (#5184)
ead1c5fd8c is described below
commit ead1c5fd8c3fe3454012cfcdd2eacf997567b1f8
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Aug 15 10:04:06 2023 +0800
[BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase (#5184)
* [BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase
---
.../MongoDBConnectorDeserializationSchema.java | 12 +----
.../source/fetch/MongodbFetchTaskContext.java | 56 +++++++++++++++++++++-
.../cdc/mongodb/utils/MongodbRecordUtils.java | 38 ++++++++++++++-
3 files changed, 93 insertions(+), 13 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 75f3564c6c..6f36f4be83 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -65,6 +65,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
public class MongoDBConnectorDeserializationSchema
@@ -154,17 +155,6 @@ public class MongoDBConnectorDeserializationSchema
return (SeaTunnelRow) physicalConverter.convert(document);
}
- private BsonDocument extractBsonDocument(
- Struct value, @Nonnull Schema valueSchema, String fieldName) {
- if (valueSchema.field(fieldName) != null) {
- String docString = value.getString(fieldName);
- if (docString != null) {
- return BsonDocument.parse(docString);
- }
- }
- return null;
- }
-
//
-------------------------------------------------------------------------------------
// Runtime Converters
//
-------------------------------------------------------------------------------------
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
index d9aee5ef97..fa0931a807 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
@@ -27,10 +27,13 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.Chang
import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.BsonValue;
@@ -50,12 +53,21 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils.compareBsonValue;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.buildSourceRecord;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
@@ -172,9 +184,27 @@ public class MongodbFetchTaskContext implements
FetchTask.Context {
switch (OperationType.fromString(operationType)) {
case INSERT:
+ outputBuffer.put(key, changeRecord);
+ break;
case UPDATE:
case REPLACE:
- outputBuffer.put(key, changeRecord);
+ Schema valueSchema = changeRecord.valueSchema();
+ BsonDocument fullDocument =
+ extractBsonDocument(value, valueSchema,
FULL_DOCUMENT);
+ if (fullDocument == null) {
+ break;
+ }
+ BsonDocument valueDocument =
normalizeSnapshotDocument(fullDocument, value);
+ SourceRecord record =
+ buildSourceRecord(
+ changeRecord.sourcePartition(),
+ changeRecord.sourceOffset(),
+ changeRecord.topic(),
+ changeRecord.kafkaPartition(),
+ changeRecord.keySchema(),
+ changeRecord.key(),
+ valueDocument);
+ outputBuffer.put(key, record);
break;
case DELETE:
outputBuffer.remove(key);
@@ -202,6 +232,30 @@ public class MongodbFetchTaskContext implements
FetchTask.Context {
.collect(Collectors.toList());
}
+ private BsonDocument normalizeSnapshotDocument(
+ @Nonnull final BsonDocument fullDocument, Struct value) {
+ return new BsonDocument()
+ .append(ID_FIELD, new
BsonString(value.getString(DOCUMENT_KEY)))
+ .append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT))
+ .append(
+ NS_FIELD,
+ new BsonDocument(
+ DB_FIELD,
+ new BsonString(
+
value.getStruct(NS_FIELD).getString(DB_FIELD)))
+ .append(
+ COLL_FIELD,
+ new BsonString(
+
value.getStruct(NS_FIELD).getString(COLL_FIELD))))
+ .append(DOCUMENT_KEY, new
BsonString(value.getString(DOCUMENT_KEY)))
+ .append(FULL_DOCUMENT, fullDocument)
+ .append(TS_MS_FIELD, new
BsonInt64(value.getInt64(TS_MS_FIELD)))
+ .append(
+ SOURCE_FIELD,
+ new BsonDocument(SNAPSHOT_FIELD, new
BsonString(SNAPSHOT_TRUE))
+ .append(TS_MS_FIELD, new BsonInt64(0L)));
+ }
+
@Override
public void close() {
Runtime.getRuntime()
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
index c4d51c59e4..1e9ab57722 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -66,7 +67,18 @@ public class MongodbRecordUtils {
public static BsonDocument getDocumentKey(@Nonnull SourceRecord
sourceRecord) {
Struct value = (Struct) sourceRecord.value();
- return BsonDocument.parse(value.getString(DOCUMENT_KEY));
+ return extractBsonDocument(value, sourceRecord.valueSchema(),
DOCUMENT_KEY);
+ }
+
+ public static BsonDocument extractBsonDocument(
+ Struct value, @Nonnull Schema valueSchema, String fieldName) {
+ if (valueSchema.field(fieldName) != null) {
+ String docString = value.getString(fieldName);
+ if (docString != null) {
+ return BsonDocument.parse(docString);
+ }
+ }
+ return null;
}
public static String getOffsetValue(@Nonnull SourceRecord sourceRecord,
String key) {
@@ -139,6 +151,30 @@ public class MongodbRecordUtils {
valueSchemaAndValue.value());
}
+ public static @Nonnull SourceRecord buildSourceRecord(
+ Map<String, ?> sourcePartition,
+ Map<String, ?> sourceOffset,
+ String topicName,
+ Integer partition,
+ Schema keySchema,
+ Object key,
+ BsonDocument valueDocument) {
+ BsonValueToSchemaAndValue schemaAndValue =
+ new BsonValueToSchemaAndValue(new
DefaultJson().getJsonWriterSettings());
+ SchemaAndValue valueSchemaAndValue =
+ schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA),
valueDocument);
+
+ return new SourceRecord(
+ sourcePartition,
+ sourceOffset,
+ topicName,
+ partition,
+ keySchema,
+ key,
+ valueSchemaAndValue.schema(),
+ valueSchemaAndValue.value());
+ }
+
public static @Nonnull Map<String, String> createSourceOffsetMap(
@Nonnull BsonDocument idDocument, boolean isSnapshotRecord) {
Map<String, String> sourceOffset = new HashMap<>();