This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b31bdbc3f [flink] Fix mongodb CDC Ingestion unknown delete (#2029)
b31bdbc3f is described below
commit b31bdbc3f4a6249ceab248a4a4058064e365f507
Author: monster <[email protected]>
AuthorDate: Wed Sep 20 21:07:21 2023 +0800
[flink] Fix mongodb CDC Ingestion unknown delete (#2029)
---
.../mongodb/strategy/Mongo4VersionStrategy.java | 70 ++++++++------------
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 12 ++--
.../cdc/mongodb/MongoDBSyncTableActionITCase.java | 77 ++++++++++++++++++++--
.../resources/mongodb/table/event/event-delete.js | 18 +++++
.../resources/mongodb/table/event/event-insert.js | 35 ++++++++++
.../resources/mongodb/table/event/event-replace.js | 19 ++++++
.../resources/mongodb/table/event/event-update.js | 19 ++++++
7 files changed, 197 insertions(+), 53 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 028af46f1..53cd8a309 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -42,10 +42,11 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
private static final String FIELD_TYPE = "operationType";
private static final String FIELD_DATA = "fullDocument";
+ private static final String FIELD_KEY = "documentKey";
private static final String OP_UPDATE = "update";
private static final String OP_INSERT = "insert";
private static final String OP_REPLACE = "replace";
-
+ private static final String OP_DELETE = "delete";
private final String databaseName;
private final String collection;
private final boolean caseSensitive;
@@ -77,7 +78,8 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
throws JsonProcessingException {
String op = root.get(FIELD_TYPE).asText();
JsonNode fullDocument = root.get(FIELD_DATA);
- return handleOperation(op, fullDocument);
+ JsonNode documentKey = root.get(FIELD_KEY);
+ return handleOperation(op, fullDocument, documentKey);
}
/**
@@ -88,18 +90,25 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
* @return A list of RichCdcMultiplexRecord based on the operation type.
* @throws JsonProcessingException If there's an error during JSON
processing.
*/
- private List<RichCdcMultiplexRecord> handleOperation(String op, JsonNode
fullDocument)
- throws JsonProcessingException {
+ private List<RichCdcMultiplexRecord> handleOperation(
+ String op, JsonNode fullDocument, JsonNode documentKey) throws
JsonProcessingException {
List<RichCdcMultiplexRecord> records = new ArrayList<>();
LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
switch (op) {
case OP_INSERT:
- records.add(handleInsert(fullDocument, paimonFieldTypes));
+ records.add(processRecord(fullDocument, paimonFieldTypes,
RowKind.INSERT));
break;
case OP_REPLACE:
case OP_UPDATE:
- records.add(handleUpdateOrReplace(fullDocument,
paimonFieldTypes));
+ // Before version 6.0 of MongoDB, it was not possible to
obtain 'Update Before'
+ // information. Therefore, data is first deleted using the
primary key '_id', and
+ // then inserted.
+ records.add(processRecord(documentKey, paimonFieldTypes,
RowKind.DELETE));
+ records.add(processRecord(fullDocument, paimonFieldTypes,
RowKind.INSERT));
+ break;
+ case OP_DELETE:
+ records.add(processRecord(documentKey, paimonFieldTypes,
RowKind.DELETE));
break;
default:
throw new UnsupportedOperationException("Unknown record type:
" + op);
@@ -108,43 +117,22 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
}
/**
- * Processes the insert operation and constructs a RichCdcMultiplexRecord.
- *
- * @param fullDocument The JsonNode representing the full MongoDB document
for insertion.
- * @param paimonFieldTypes A map to store the field types.
- * @return A RichCdcMultiplexRecord representing the insert operation.
- * @throws JsonProcessingException If there's an error during JSON
processing.
- */
- private RichCdcMultiplexRecord handleInsert(
- JsonNode fullDocument, LinkedHashMap<String, DataType>
paimonFieldTypes)
- throws JsonProcessingException {
- Map<String, String> insert =
- getExtractRow(
- fullDocument,
- paimonFieldTypes,
- caseSensitive,
- computedColumns,
- mongodbConfig);
- return new RichCdcMultiplexRecord(
- databaseName,
- collection,
- paimonFieldTypes,
- extractPrimaryKeys(),
- new CdcRecord(RowKind.INSERT, insert));
- }
-
- /**
- * Processes the update or replace operation and constructs a
RichCdcMultiplexRecord.
+ * Processes a JSON record based on the specified parameters and returns a
+ * RichCdcMultiplexRecord object.
*
- * @param fullDocument The JsonNode representing the full MongoDB document
for update/replace.
- * @param paimonFieldTypes A map to store the field types.
- * @return A RichCdcMultiplexRecord representing the update or replace
operation.
- * @throws JsonProcessingException If there's an error during JSON
processing.
+ * @param fullDocument the JSON node containing the full document to be
processed.
+ * @param paimonFieldTypes a LinkedHashMap containing the field types to
be used in the
+ * processing.
+ * @param rowKind the kind of row to be processed (e.g., insert, update,
delete).
+ * @throws JsonProcessingException if there is an error in processing the
JSON document.
+ * @return a RichCdcMultiplexRecord object that contains the processed
record information.
*/
- private RichCdcMultiplexRecord handleUpdateOrReplace(
- JsonNode fullDocument, LinkedHashMap<String, DataType>
paimonFieldTypes)
+ private RichCdcMultiplexRecord processRecord(
+ JsonNode fullDocument,
+ LinkedHashMap<String, DataType> paimonFieldTypes,
+ RowKind rowKind)
throws JsonProcessingException {
- Map<String, String> after =
+ Map<String, String> record =
getExtractRow(
fullDocument,
paimonFieldTypes,
@@ -156,6 +144,6 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
collection,
paimonFieldTypes,
extractPrimaryKeys(),
- new CdcRecord(RowKind.UPDATE_AFTER, after));
+ new CdcRecord(rowKind, record));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index deb11739a..0b55ce8ab 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -99,18 +99,18 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
expected =
Arrays.asList(
- "+U[100000000000000000000101, scooter, Small 2-wheel
scooter, 350]",
- "+U[100000000000000000000102, car battery,
High-performance car battery, 8.1]",
- "+U[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8]");
+ "+I[100000000000000000000101, scooter, Small 2-wheel
scooter, 350]",
+ "+I[100000000000000000000102, car battery,
High-performance car battery, 8.1]",
+ "+I[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8]");
waitForResult(expected, table1, rowType1, primaryKeys1);
writeRecordsToMongoDB("test-data-4", database, "database");
expected =
Arrays.asList(
- "+U[100000000000000000000101, user_1, Guangzhou,
123563291234]",
- "+U[100000000000000000000102, user_2, Beijing,
1234546591234]",
- "+U[100000000000000000000103, user_3, Nanjing,
1235567891234]");
+ "+I[100000000000000000000101, user_1, Guangzhou,
123563291234]",
+ "+I[100000000000000000000102, user_2, Beijing,
1234546591234]",
+ "+I[100000000000000000000103, user_3, Nanjing,
1235567891234]");
waitForResult(expected, table2, rowType2, primaryKeys2);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index bd4d78f04..1ed3427c0 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -89,9 +89,9 @@ public class MongoDBSyncTableActionITCase extends
MongoDBActionITCaseBase {
new String[] {"_id", "name", "description", "weight"});
expected =
Arrays.asList(
- "+U[100000000000000000000101, scooter, Small 2-wheel
scooter, 350]",
- "+U[100000000000000000000102, car battery,
High-performance car battery, 8.1]",
- "+U[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8]");
+ "+I[100000000000000000000101, scooter, Small 2-wheel
scooter, 350]",
+ "+I[100000000000000000000102, car battery,
High-performance car battery, 8.1]",
+ "+I[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8]");
waitForResult(expected, table, rowType, primaryKeys);
writeRecordsToMongoDB("inventory-3", dbName, "table");
@@ -111,9 +111,9 @@ public class MongoDBSyncTableActionITCase extends
MongoDBActionITCaseBase {
});
expected =
Arrays.asList(
- "+U[100000000000000000000102, car battery,
High-performance car battery, 8.1, NULL, 18, NULL]",
- "+U[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8, NULL, NULL, I live in Sanlitun]",
- "+U[100000000000000000000101, scooter, Small 2-wheel
scooter, 350, playing computer games, NULL, NULL]");
+ "+I[100000000000000000000102, car battery,
High-performance car battery, 8.1, NULL, 18, NULL]",
+ "+I[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8, NULL, NULL, I live in Sanlitun]",
+ "+I[100000000000000000000101, scooter, Small 2-wheel
scooter, 350, playing computer games, NULL, NULL]");
waitForResult(expected, table, rowType, primaryKeys);
}
@@ -189,4 +189,69 @@ public class MongoDBSyncTableActionITCase extends
MongoDBActionITCaseBase {
rowType,
Collections.singletonList("_id"));
}
+
+ @Test
+ @Timeout(60)
+ public void testMongoDBCDCOperations() throws Exception {
+ writeRecordsToMongoDB("event-insert", database, "table/event");
+
+ Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+ mongodbConfig.put("database", database);
+ mongodbConfig.put("collection", "event");
+
+ MongoDBSyncTableAction action =
+ syncTableActionBuilder(mongodbConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable(tableName);
+ List<String> primaryKeys = Collections.singletonList("_id");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"_id", "name", "description", "weight"});
+
+ // For the INSERT operation
+ List<String> expectedInsert =
+ Arrays.asList(
+ "+I[100000000000000000000101, scooter, Small 2-wheel
scooter, 3.14]",
+ "+I[100000000000000000000102, car battery, 12V car
battery, 8.1]",
+ "+I[100000000000000000000103, 12-pack drill bits,
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+ waitForResult(expectedInsert, table, rowType, primaryKeys);
+
+ writeRecordsToMongoDB("event-update", database, "table/event");
+
+ // For the UPDATE operation
+ List<String> expectedUpdate =
+ Arrays.asList(
+ "+I[100000000000000000000101, scooter, Updated scooter
description, 4]",
+ "+I[100000000000000000000102, car battery, 12V car
battery, 8.1]",
+ "+I[100000000000000000000103, 12-pack drill bits,
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+ waitForResult(expectedUpdate, table, rowType, primaryKeys);
+
+ writeRecordsToMongoDB("event-replace", database, "table/event");
+
+ // For the REPLACE operation
+ List<String> expectedReplace =
+ Arrays.asList(
+ "+I[100000000000000000000101, scooter, Updated scooter
description, 4]",
+ "+I[100000000000000000000102, new car battery, New 12V
car battery, 9]",
+ "+I[100000000000000000000103, 12-pack drill bits,
12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+ waitForResult(expectedReplace, table, rowType, primaryKeys);
+
+ writeRecordsToMongoDB("event-delete", database, "table/event");
+
+ // For the DELETE operation
+ List<String> expectedDelete =
+ Arrays.asList(
+ "+I[100000000000000000000101, scooter, Updated scooter
description, 4]",
+ "+I[100000000000000000000102, new car battery, New 12V
car battery, 9]");
+ waitForResult(expectedDelete, table, rowType, primaryKeys);
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-delete.js
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-delete.js
new file mode 100644
index 000000000..1f14d5d47
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-delete.js
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').deleteOne(
+ { "_id": ObjectId("100000000000000000000103") }
+);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-insert.js
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-insert.js
new file mode 100644
index 000000000..676bd862f
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-insert.js
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').insertMany([
+ {
+ "_id": ObjectId("100000000000000000000101"),
+ "name": "scooter",
+ "description": "Small 2-wheel scooter",
+ "weight": 3.14
+ },
+ {
+ "_id": ObjectId("100000000000000000000102"),
+ "name": "car battery",
+ "description": "12V car battery",
+ "weight": 8.1
+ },
+ {
+ "_id": ObjectId("100000000000000000000103"),
+ "name": "12-pack drill bits",
+ "description": "12-pack of drill bits with sizes ranging from #40 to
#3",
+ "weight": 0.8
+ }
+]);
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-replace.js
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-replace.js
new file mode 100644
index 000000000..0e8219cfd
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-replace.js
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').replaceOne(
+ { "_id": ObjectId("100000000000000000000102") },
+ { "_id": ObjectId("100000000000000000000102"), "name": "new car battery",
"description": "New 12V car battery", "weight": 9.0 }
+);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-update.js
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-update.js
new file mode 100644
index 000000000..dd92a7fbc
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/event/event-update.js
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('event').updateOne(
+ { "_id": ObjectId("100000000000000000000101") },
+ { $set: { "description": "Updated scooter description", "weight": 4.0 } }
+);
\ No newline at end of file