This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 4da02b8ae20e70966b7beb6348afbc681f701be0 Author: monster <[email protected]> AuthorDate: Tue Jan 2 10:21:58 2024 +0800 [flink] Fix mongo primary key field not objectId type (#2606) --- .../cdc/mongodb/strategy/MongoVersionStrategy.java | 4 ++- .../cdc/mongodb/MongoDBSyncTableActionITCase.java | 37 +++++++++++++++++++++- .../mongodb/table/defaultid/defaultId-1.js | 8 ++--- .../defaultid/{defaultId-1.js => defaultId-2.js} | 2 +- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java index d4388d7bd..8465cedce 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java @@ -98,7 +98,9 @@ public interface MongoVersionStrategy { } JsonNode document = mongodbConfig.getBoolean(DEFAULT_ID_GENERATION) - ? objectNode.set(ID_FIELD, idNode.get(OID_FIELD)) + ? objectNode.set( + ID_FIELD, + idNode.get(OID_FIELD) == null ? 
idNode : idNode.get(OID_FIELD)) : objectNode; Map<String, String> row; switch (mode) { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index 485e9876d..81401a2ed 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -301,7 +301,7 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase { Map<String, String> mongodbConfig = getBasicMongoDBConfig(); mongodbConfig.put("database", database); - mongodbConfig.put("collection", "defaultId"); + mongodbConfig.put("collection", "defaultId1"); mongodbConfig.put("default.id.generation", "false"); MongoDBSyncTableAction action = @@ -310,6 +310,41 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase { .build(); runActionWithDefaultEnv(action); + FileStoreTable table = getFileStoreTable(tableName); + List<String> primaryKeys = Collections.singletonList("_id"); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"_id", "name", "description", "weight"}); + + List<String> expectedInsert = + Arrays.asList( + "+I[{\"$oid\":\"100000000000000000000101\"}, scooter, Small 2-wheel scooter, 3.14]", + "+I[{\"$oid\":\"100000000000000000000102\"}, car battery, 12V car battery, 8.1]", + "+I[{\"$oid\":\"100000000000000000000103\"}, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]"); + waitForResult(expectedInsert, table, rowType, primaryKeys); + } + + @Test + @Timeout(60) + public void testPrimaryKeyNotObjectIdType() throws Exception { + 
writeRecordsToMongoDB("defaultId-2", database, "table/defaultid"); + + Map<String, String> mongodbConfig = getBasicMongoDBConfig(); + mongodbConfig.put("database", database); + mongodbConfig.put("collection", "defaultId2"); + + MongoDBSyncTableAction action = + syncTableActionBuilder(mongodbConfig) + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + FileStoreTable table = getFileStoreTable(tableName); List<String> primaryKeys = Collections.singletonList("_id"); RowType rowType = diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js index 12e305ec5..2d9b87b17 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js @@ -13,21 +13,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-db.getCollection('defaultId').insertMany([ +db.getCollection('defaultId1').insertMany([ { - "_id": "100000000000000000000101", + "_id": ObjectId("100000000000000000000101"), "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14 }, { - "_id": "100000000000000000000102", + "_id": ObjectId("100000000000000000000102"), "name": "car battery", "description": "12V car battery", "weight": 8.1 }, { - "_id": "100000000000000000000103", + "_id": ObjectId("100000000000000000000103"), "name": "12-pack drill bits", "description": "12-pack of drill bits with sizes ranging from #40 to #3", "weight": 0.8 diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js similarity index 96% copy from paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js copy to paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js index 12e305ec5..c1acf8fe2 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -db.getCollection('defaultId').insertMany([ +db.getCollection('defaultId2').insertMany([ { "_id": "100000000000000000000101", "name": "scooter",
