This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bf96a6d94 [INLONG-8407][Sort] Optimize mongodb cdc to obtain 
documentKey (#8426)
2bf96a6d94 is described below

commit 2bf96a6d944719aafc47475eaeb0ffb5cb361bdf
Author: emhui <[email protected]>
AuthorDate: Wed Jul 5 19:08:16 2023 +0800

    [INLONG-8407][Sort] Optimize mongodb cdc to obtain documentKey (#8426)
---
 .../table/MongoDBConnectorDeserializationSchema.java     | 16 +++++-----------
 .../sort/cdc/mongodb/debezium/utils/RecordUtils.java     | 11 -----------
 2 files changed, 5 insertions(+), 22 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 6333a61ac9..e61b490f31 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -80,7 +80,6 @@ import java.util.Map;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static 
org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils.isDMLOperation;
 
 public class MongoDBConnectorDeserializationSchema
         implements
@@ -145,17 +144,12 @@ public class MongoDBConnectorDeserializationSchema
         Schema valueSchema = record.valueSchema();
 
         OperationType op = operationTypeFor(record);
-        BsonDocument documentKey = new BsonDocument();
-        BsonDocument fullDocument = new BsonDocument();
 
-        if (isDMLOperation(op)) {
-            documentKey =
-                    checkNotNull(
-                            extractBsonDocument(
-                                    value, valueSchema, 
MongoDBEnvelope.DOCUMENT_KEY_FIELD));
-            fullDocument =
-                    extractBsonDocument(value, valueSchema, 
MongoDBEnvelope.FULL_DOCUMENT_FIELD);
-        }
+        BsonDocument documentKey =
+                extractBsonDocument(
+                        value, valueSchema, 
MongoDBEnvelope.DOCUMENT_KEY_FIELD);
+        BsonDocument fullDocument =
+                extractBsonDocument(value, valueSchema, 
MongoDBEnvelope.FULL_DOCUMENT_FIELD);
 
         switch (op) {
             case INSERT:
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
index 1f0442e6f1..2f7368d2f6 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.sort.cdc.mongodb.debezium.utils;
 
 import com.google.common.collect.ImmutableMap;
-import com.mongodb.client.model.changestream.OperationType;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.relational.TableId;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -127,14 +126,4 @@ public class RecordUtils {
         return null;
     }
 
-    /**
-     * Whether the MongoDB event's operation is a dml operation.
-     */
-    public static boolean isDMLOperation(OperationType op) {
-        if (OperationType.INSERT.equals(op) || OperationType.DELETE.equals(op)
-                || OperationType.UPDATE.equals(op) || 
OperationType.REPLACE.equals(op)) {
-            return true;
-        }
-        return false;
-    }
 }

Reply via email to