This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b31bdbc3f [flink] Fix mongodb CDC Ingestion unknown delete (#2029)
b31bdbc3f is described below

commit b31bdbc3f4a6249ceab248a4a4058064e365f507
Author: monster <[email protected]>
AuthorDate: Wed Sep 20 21:07:21 2023 +0800

    [flink] Fix mongodb CDC Ingestion unknown delete (#2029)
---
 .../mongodb/strategy/Mongo4VersionStrategy.java    | 70 ++++++++------------
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   | 12 ++--
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  | 77 ++++++++++++++++++++--
 .../resources/mongodb/table/event/event-delete.js  | 18 +++++
 .../resources/mongodb/table/event/event-insert.js  | 35 ++++++++++
 .../resources/mongodb/table/event/event-replace.js | 19 ++++++
 .../resources/mongodb/table/event/event-update.js  | 19 ++++++
 7 files changed, 197 insertions(+), 53 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 028af46f1..53cd8a309 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -42,10 +42,11 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
 
     private static final String FIELD_TYPE = "operationType";
     private static final String FIELD_DATA = "fullDocument";
+    private static final String FIELD_KEY = "documentKey";
     private static final String OP_UPDATE = "update";
     private static final String OP_INSERT = "insert";
     private static final String OP_REPLACE = "replace";
-
+    private static final String OP_DELETE = "delete";
     private final String databaseName;
     private final String collection;
     private final boolean caseSensitive;
@@ -77,7 +78,8 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
             throws JsonProcessingException {
         String op = root.get(FIELD_TYPE).asText();
         JsonNode fullDocument = root.get(FIELD_DATA);
-        return handleOperation(op, fullDocument);
+        JsonNode documentKey = root.get(FIELD_KEY);
+        return handleOperation(op, fullDocument, documentKey);
     }
 
     /**
@@ -88,18 +90,25 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
      * @return A list of RichCdcMultiplexRecord based on the operation type.
      * @throws JsonProcessingException If there's an error during JSON 
processing.
      */
-    private List<RichCdcMultiplexRecord> handleOperation(String op, JsonNode 
fullDocument)
-            throws JsonProcessingException {
+    private List<RichCdcMultiplexRecord> handleOperation(
+            String op, JsonNode fullDocument, JsonNode documentKey) throws 
JsonProcessingException {
         List<RichCdcMultiplexRecord> records = new ArrayList<>();
         LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
 
         switch (op) {
             case OP_INSERT:
-                records.add(handleInsert(fullDocument, paimonFieldTypes));
+                records.add(processRecord(fullDocument, paimonFieldTypes, 
RowKind.INSERT));
                 break;
             case OP_REPLACE:
             case OP_UPDATE:
-                records.add(handleUpdateOrReplace(fullDocument, 
paimonFieldTypes));
+                // Before version 6.0 of MongoDB, it was not possible to 
obtain 'Update Before'
+                // information. Therefore, data is first deleted using the 
primary key '_id', and
+                // then inserted.
+                records.add(processRecord(documentKey, paimonFieldTypes, 
RowKind.DELETE));
+                records.add(processRecord(fullDocument, paimonFieldTypes, 
RowKind.INSERT));
+                break;
+            case OP_DELETE:
+                records.add(processRecord(documentKey, paimonFieldTypes, 
RowKind.DELETE));
                 break;
             default:
                 throw new UnsupportedOperationException("Unknown record type: 
" + op);
@@ -108,43 +117,22 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
     }
 
     /**
-     * Processes the insert operation and constructs a RichCdcMultiplexRecord.
-     *
-     * @param fullDocument The JsonNode representing the full MongoDB document 
for insertion.
-     * @param paimonFieldTypes A map to store the field types.
-     * @return A RichCdcMultiplexRecord representing the insert operation.
-     * @throws JsonProcessingException If there's an error during JSON 
processing.
-     */
-    private RichCdcMultiplexRecord handleInsert(
-            JsonNode fullDocument, LinkedHashMap<String, DataType> 
paimonFieldTypes)
-            throws JsonProcessingException {
-        Map<String, String> insert =
-                getExtractRow(
-                        fullDocument,
-                        paimonFieldTypes,
-                        caseSensitive,
-                        computedColumns,
-                        mongodbConfig);
-        return new RichCdcMultiplexRecord(
-                databaseName,
-                collection,
-                paimonFieldTypes,
-                extractPrimaryKeys(),
-                new CdcRecord(RowKind.INSERT, insert));
-    }
-
-    /**
-     * Processes the update or replace operation and constructs a 
RichCdcMultiplexRecord.
+     * Processes a JSON record based on the specified parameters and returns a
+     * RichCdcMultiplexRecord object.
      *
-     * @param fullDocument The JsonNode representing the full MongoDB document 
for update/replace.
-     * @param paimonFieldTypes A map to store the field types.
-     * @return A RichCdcMultiplexRecord representing the update or replace 
operation.
-     * @throws JsonProcessingException If there's an error during JSON 
processing.
+     * @param fullDocument the JSON node containing the full document to be 
processed.
+     * @param paimonFieldTypes a LinkedHashMap containing the field types to 
be used in the
+     *     processing.
+     * @param rowKind the kind of row to be processed (e.g., insert, update, 
delete).
+     * @throws JsonProcessingException if there is an error in processing the 
JSON document.
+     * @return a RichCdcMultiplexRecord object that contains the processed 
record information.
      */
-    private RichCdcMultiplexRecord handleUpdateOrReplace(
-            JsonNode fullDocument, LinkedHashMap<String, DataType> 
paimonFieldTypes)
+    private RichCdcMultiplexRecord processRecord(
+            JsonNode fullDocument,
+            LinkedHashMap<String, DataType> paimonFieldTypes,
+            RowKind rowKind)
             throws JsonProcessingException {
-        Map<String, String> after =
+        Map<String, String> record =
                 getExtractRow(
                         fullDocument,
                         paimonFieldTypes,
@@ -156,6 +144,6 @@ public class Mongo4VersionStrategy implements MongoVersionStrategy {
                 collection,
                 paimonFieldTypes,
                 extractPrimaryKeys(),
-                new CdcRecord(RowKind.UPDATE_AFTER, after));
+                new CdcRecord(rowKind, record));
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index deb11739a..0b55ce8ab 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -99,18 +99,18 @@ public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
 
         expected =
                 Arrays.asList(
-                        "+U[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350]",
-                        "+U[100000000000000000000102, car battery, 
High-performance car battery, 8.1]",
-                        "+U[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
+                        "+I[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350]",
+                        "+I[100000000000000000000102, car battery, 
High-performance car battery, 8.1]",
+                        "+I[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
         waitForResult(expected, table1, rowType1, primaryKeys1);
 
         writeRecordsToMongoDB("test-data-4", database, "database");
 
         expected =
                 Arrays.asList(
-                        "+U[100000000000000000000101, user_1, Guangzhou, 
123563291234]",
-                        "+U[100000000000000000000102, user_2, Beijing, 
1234546591234]",
-                        "+U[100000000000000000000103, user_3, Nanjing, 
1235567891234]");
+                        "+I[100000000000000000000101, user_1, Guangzhou, 
123563291234]",
+                        "+I[100000000000000000000102, user_2, Beijing, 
1234546591234]",
+                        "+I[100000000000000000000103, user_3, Nanjing, 
1235567891234]");
         waitForResult(expected, table2, rowType2, primaryKeys2);
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index bd4d78f04..1ed3427c0 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -89,9 +89,9 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
                         new String[] {"_id", "name", "description", "weight"});
         expected =
                 Arrays.asList(
-                        "+U[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350]",
-                        "+U[100000000000000000000102, car battery, 
High-performance car battery, 8.1]",
-                        "+U[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
+                        "+I[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350]",
+                        "+I[100000000000000000000102, car battery, 
High-performance car battery, 8.1]",
+                        "+I[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8]");
         waitForResult(expected, table, rowType, primaryKeys);
 
         writeRecordsToMongoDB("inventory-3", dbName, "table");
@@ -111,9 +111,9 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
                         });
         expected =
                 Arrays.asList(
-                        "+U[100000000000000000000102, car battery, 
High-performance car battery, 8.1, NULL, 18, NULL]",
-                        "+U[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8, NULL, NULL, I live in Sanlitun]",
-                        "+U[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350, playing computer games, NULL, NULL]");
+                        "+I[100000000000000000000102, car battery, 
High-performance car battery, 8.1, NULL, 18, NULL]",
+                        "+I[100000000000000000000103, 12-pack drill bits, Set 
of 12 professional-grade drill bits, 0.8, NULL, NULL, I live in Sanlitun]",
+                        "+I[100000000000000000000101, scooter, Small 2-wheel 
scooter, 350, playing computer games, NULL, NULL]");
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
@@ -189,4 +189,69 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
                 rowType,
                 Collections.singletonList("_id"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testMongoDBCDCOperations() throws Exception {
+        writeRecordsToMongoDB("event-insert", database, "table/event");
+
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", database);
+        mongodbConfig.put("collection", "event");
+
+        MongoDBSyncTableAction action =
+                syncTableActionBuilder(mongodbConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+        List<String> primaryKeys = Collections.singletonList("_id");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description", "weight"});
+
+        // For the INSERT operation
+        List<String> expectedInsert =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Small 2-wheel 
scooter, 3.14]",
+                        "+I[100000000000000000000102, car battery, 12V car 
battery, 8.1]",
+                        "+I[100000000000000000000103, 12-pack drill bits, 
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+        waitForResult(expectedInsert, table, rowType, primaryKeys);
+
+        writeRecordsToMongoDB("event-update", database, "table/event");
+
+        // For the UPDATE operation
+        List<String> expectedUpdate =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Updated scooter 
description, 4]",
+                        "+I[100000000000000000000102, car battery, 12V car 
battery, 8.1]",
+                        "+I[100000000000000000000103, 12-pack drill bits, 
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+        waitForResult(expectedUpdate, table, rowType, primaryKeys);
+
+        writeRecordsToMongoDB("event-replace", database, "table/event");
+
+        // For the REPLACE operation
+        List<String> expectedReplace =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Updated scooter 
description, 4]",
+                        "+I[100000000000000000000102, new car battery, New 12V 
car battery, 9]",
+                        "+I[100000000000000000000103, 12-pack drill bits, 
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+        waitForResult(expectedReplace, table, rowType, primaryKeys);
+
+        writeRecordsToMongoDB("event-delete", database, "table/event");
+
+        // For the DELETE operation
+        List<String> expectedDelete =
+                Arrays.asList(
+                        "+I[100000000000000000000101, scooter, Updated scooter 
description, 4]",
+                        "+I[100000000000000000000102, new car battery, New 12V 
car battery, 9]");
+        waitForResult(expectedDelete, table, rowType, primaryKeys);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-delete.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-delete.js
new file mode 100644
index 000000000..1f14d5d47
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-delete.js
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').deleteOne(
+    { "_id": ObjectId("100000000000000000000103") }
+);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-insert.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-insert.js
new file mode 100644
index 000000000..676bd862f
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-insert.js
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').insertMany([
+    {
+        "_id": ObjectId("100000000000000000000101"),
+        "name": "scooter",
+        "description": "Small 2-wheel scooter",
+        "weight": 3.14
+    },
+    {
+        "_id": ObjectId("100000000000000000000102"),
+        "name": "car battery",
+        "description": "12V car battery",
+        "weight": 8.1
+    },
+    {
+        "_id": ObjectId("100000000000000000000103"),
+        "name": "12-pack drill bits",
+        "description": "12-pack of drill bits with sizes ranging from #40 to 
#3",
+        "weight": 0.8
+    }
+]);
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-replace.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-replace.js
new file mode 100644
index 000000000..0e8219cfd
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-replace.js
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').replaceOne(
+    { "_id": ObjectId("100000000000000000000102") },
+    { "_id": ObjectId("100000000000000000000102"), "name": "new car battery", 
"description": "New 12V car battery", "weight": 9.0 }
+);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-update.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-update.js
new file mode 100644
index 000000000..dd92a7fbc
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-update.js
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').updateOne(
+    { "_id": ObjectId("100000000000000000000101") },
+    { $set: { "description": "Updated scooter description", "weight": 4.0 } }
+);
\ No newline at end of file

Reply via email to