This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 4da02b8ae20e70966b7beb6348afbc681f701be0
Author: monster <[email protected]>
AuthorDate: Tue Jan 2 10:21:58 2024 +0800

    [flink] Fix mongo primary key field not objectId type (#2606)
---
 .../cdc/mongodb/strategy/MongoVersionStrategy.java |  4 ++-
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  | 37 +++++++++++++++++++++-
 .../mongodb/table/defaultid/defaultId-1.js         |  8 ++---
 .../defaultid/{defaultId-1.js => defaultId-2.js}   |  2 +-
 4 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index d4388d7bd..8465cedce 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -98,7 +98,9 @@ public interface MongoVersionStrategy {
         }
         JsonNode document =
                 mongodbConfig.getBoolean(DEFAULT_ID_GENERATION)
-                        ? objectNode.set(ID_FIELD, idNode.get(OID_FIELD))
+                        ? objectNode.set(
+                                ID_FIELD,
+                                idNode.get(OID_FIELD) == null ? idNode : idNode.get(OID_FIELD))
                         : objectNode;
         Map<String, String> row;
         switch (mode) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 485e9876d..81401a2ed 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -301,7 +301,7 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
 
         Map<String, String> mongodbConfig = getBasicMongoDBConfig();
         mongodbConfig.put("database", database);
-        mongodbConfig.put("collection", "defaultId");
+        mongodbConfig.put("collection", "defaultId1");
         mongodbConfig.put("default.id.generation", "false");
 
         MongoDBSyncTableAction action =
@@ -310,6 +310,41 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
+        FileStoreTable table = getFileStoreTable(tableName);
+        List<String> primaryKeys = Collections.singletonList("_id");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "name", "description", "weight"});
+
+        List<String> expectedInsert =
+                Arrays.asList(
+                        "+I[{\"$oid\":\"100000000000000000000101\"}, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[{\"$oid\":\"100000000000000000000102\"}, car battery, 12V car battery, 8.1]",
+                        "+I[{\"$oid\":\"100000000000000000000103\"}, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
+        waitForResult(expectedInsert, table, rowType, primaryKeys);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testPrimaryKeyNotObjectIdType() throws Exception {
+        writeRecordsToMongoDB("defaultId-2", database, "table/defaultid");
+
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", database);
+        mongodbConfig.put("collection", "defaultId2");
+
+        MongoDBSyncTableAction action =
+                syncTableActionBuilder(mongodbConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
         FileStoreTable table = getFileStoreTable(tableName);
         List<String> primaryKeys = Collections.singletonList("_id");
         RowType rowType =
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js
index 12e305ec5..2d9b87b17 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js
@@ -13,21 +13,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-db.getCollection('defaultId').insertMany([
+db.getCollection('defaultId1').insertMany([
     {
-        "_id": "100000000000000000000101",
+        "_id": ObjectId("100000000000000000000101"),
         "name": "scooter",
         "description": "Small 2-wheel scooter",
         "weight": 3.14
     },
     {
-        "_id": "100000000000000000000102",
+        "_id": ObjectId("100000000000000000000102"),
         "name": "car battery",
         "description": "12V car battery",
         "weight": 8.1
     },
     {
-        "_id": "100000000000000000000103",
+        "_id": ObjectId("100000000000000000000103"),
         "name": "12-pack drill bits",
         "description": "12-pack of drill bits with sizes ranging from #40 to #3",
         "weight": 0.8
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js
similarity index 96%
copy from paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js
copy to paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js
index 12e305ec5..c1acf8fe2 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-db.getCollection('defaultId').insertMany([
+db.getCollection('defaultId2').insertMany([
     {
         "_id": "100000000000000000000101",
         "name": "scooter",

Reply via email to