This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2bf96a6d94 [INLONG-8407][Sort] Optimize mongodb cdc to obtain
documentKey (#8426)
2bf96a6d94 is described below
commit 2bf96a6d944719aafc47475eaeb0ffb5cb361bdf
Author: emhui <[email protected]>
AuthorDate: Wed Jul 5 19:08:16 2023 +0800
[INLONG-8407][Sort] Optimize mongodb cdc to obtain documentKey (#8426)
---
.../table/MongoDBConnectorDeserializationSchema.java | 16 +++++-----------
.../sort/cdc/mongodb/debezium/utils/RecordUtils.java | 11 -----------
2 files changed, 5 insertions(+), 22 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 6333a61ac9..e61b490f31 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -80,7 +80,6 @@ import java.util.Map;
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static
org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils.isDMLOperation;
public class MongoDBConnectorDeserializationSchema
implements
@@ -145,17 +144,12 @@ public class MongoDBConnectorDeserializationSchema
Schema valueSchema = record.valueSchema();
OperationType op = operationTypeFor(record);
- BsonDocument documentKey = new BsonDocument();
- BsonDocument fullDocument = new BsonDocument();
- if (isDMLOperation(op)) {
- documentKey =
- checkNotNull(
- extractBsonDocument(
- value, valueSchema,
MongoDBEnvelope.DOCUMENT_KEY_FIELD));
- fullDocument =
- extractBsonDocument(value, valueSchema,
MongoDBEnvelope.FULL_DOCUMENT_FIELD);
- }
+ BsonDocument documentKey =
+ extractBsonDocument(
+ value, valueSchema,
MongoDBEnvelope.DOCUMENT_KEY_FIELD);
+ BsonDocument fullDocument =
+ extractBsonDocument(value, valueSchema,
MongoDBEnvelope.FULL_DOCUMENT_FIELD);
switch (op) {
case INSERT:
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
index 1f0442e6f1..2f7368d2f6 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sort.cdc.mongodb.debezium.utils;
import com.google.common.collect.ImmutableMap;
-import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.TableId;
import org.apache.flink.table.types.logical.ArrayType;
@@ -127,14 +126,4 @@ public class RecordUtils {
return null;
}
- /**
- * Whether the MongoDB event's operation is a dml operation.
- */
- public static boolean isDMLOperation(OperationType op) {
- if (OperationType.INSERT.equals(op) || OperationType.DELETE.equals(op)
- || OperationType.UPDATE.equals(op) ||
OperationType.REPLACE.equals(op)) {
- return true;
- }
- return false;
- }
}