This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ced05f05f [flink] Fix cannot deserialize value for Mongo cdc (#1913)
ced05f05f is described below

commit ced05f05fc7650089c285b7fd3ae2d59fd427e10
Author: monster <[email protected]>
AuthorDate: Wed Aug 30 21:48:33 2023 +0800

    [flink] Fix cannot deserialize value for Mongo cdc (#1913)
---
 .../org/apache/paimon/utils/JsonSerdeUtil.java     | 20 +++++--
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |  8 +--
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   | 53 ++++++++++++++++++
 .../test/resources/mongodb/database/test-data-5.js | 38 +++++++++++++
 .../test/resources/mongodb/database/test-data-6.js | 62 ++++++++++++++++++++++
 5 files changed, 175 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 4ea553249..5d46759f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -67,9 +67,23 @@ public class JsonSerdeUtil {
                     .collect(
                             Collectors.toMap(
                                     Map.Entry::getKey,
-                                    entry ->
-                                            
OBJECT_MAPPER_INSTANCE.convertValue(
-                                                    entry.getValue(), 
valueType),
+                                    entry -> {
+                                        Object value = entry.getValue();
+                                        try {
+                                            if 
(!(valueType.isInstance(value))) {
+                                                String jsonStr =
+                                                        
OBJECT_MAPPER_INSTANCE.writeValueAsString(
+                                                                value);
+                                                return 
OBJECT_MAPPER_INSTANCE.convertValue(
+                                                        jsonStr, valueType);
+                                            }
+                                            return 
OBJECT_MAPPER_INSTANCE.convertValue(
+                                                    value, valueType);
+                                        } catch (JsonProcessingException e) {
+                                            throw new RuntimeException(
+                                                    "Error converting value to 
JSON string", e);
+                                        }
+                                    },
                                     (oldValue, newValue) -> oldValue,
                                     LinkedHashMap::new));
         } catch (JsonProcessingException e) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 957aed34e..5f2f8db4d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -77,7 +77,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
     @Nullable private final String includingTables;
 
     public MongoDBSyncDatabaseAction(
-            Map<String, String> kafkaConfig,
+            Map<String, String> mongodbConfig,
             String warehouse,
             String database,
             @Nullable String tablePrefix,
@@ -87,7 +87,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
             Map<String, String> catalogConfig,
             Map<String, String> tableConfig) {
         super(warehouse, catalogConfig);
-        this.mongodbConfig = Configuration.fromMap(kafkaConfig);
+        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
         this.database = database;
         this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
         this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
@@ -131,7 +131,9 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
                                                 "MongoDB Source")
                                         .flatMap(
                                                 new MongoDBRecordParser(
-                                                        false, 
tableNameConverter, mongodbConfig)))
+                                                        caseSensitive,
+                                                        tableNameConverter,
+                                                        mongodbConfig)))
                         .withParserFactory(parserFactory)
                         .withCatalogLoader(catalogLoader())
                         .withDatabase(database)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 4ca4e2914..e1a2b9c9b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -125,4 +125,57 @@ public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
         assertThat(action.tableConfig())
                 
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testMongoDBNestedDataSynchronizationAndVerification() throws 
Exception {
+        writeRecordsToMongoDB("test-data-5", database, "database");
+        writeRecordsToMongoDB("test-data-6", database, "database");
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", database);
+        MongoDBSyncDatabaseAction action =
+                syncDatabaseActionBuilder(mongodbConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t3", "t4");
+        FileStoreTable table1 = getFileStoreTable("t3");
+        FileStoreTable table2 = getFileStoreTable("t4");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "country", "languages", 
"religions"});
+        List<String> primaryKeys1 = Collections.singletonList("_id");
+        List<String> expected1 =
+                Arrays.asList(
+                        "+I[610000000000000000000101, Switzerland, Italian, 
{\"f\":\"v\",\"n\":null}]",
+                        "+I[610000000000000000000102, Switzerland, Italian, ]",
+                        "+I[610000000000000000000103, Switzerland, 
[\"Italian\"], ]");
+        waitForResult(expected1, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "kind", "etag", "pageInfo", 
"items"});
+        List<String> primaryKeys2 = Collections.singletonList("_id");
+        List<String> expected2 =
+                Arrays.asList(
+                        "+I[610000000000000000000101, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", 
{\"totalResults\":1,\"resultsPerPage\":1}, 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\
 [...]
+                        "+I[610000000000000000000102, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", page, 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\
 [...]
+                        "+I[610000000000000000000103, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", 
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
 [...]
+        waitForResult(expected2, table2, rowType2, primaryKeys2);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-5.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-5.js
new file mode 100644
index 000000000..bc32ac132
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-5.js
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t3').insertMany([
+    {
+        '_id': ObjectId('610000000000000000000101'),
+        'country': 'Switzerland',
+        'languages': 'Italian',
+        'religions': {
+            'f': 'v',
+            'n': null
+        }
+    },
+    {
+        '_id': ObjectId('610000000000000000000102'),
+        'country': 'Switzerland',
+        'languages': 'Italian',
+        'religions': ''
+    },
+    {
+        '_id': ObjectId('610000000000000000000103'),
+        'country': 'Switzerland',
+        'languages': ['Italian'],
+        'religions': ''
+    }
+]);
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-6.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-6.js
new file mode 100644
index 000000000..3d189d58c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-6.js
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t4').insertMany([
+    {
+        "_id": ObjectId("610000000000000000000101"),
+        "kind": "youtube#videoListResponse",
+        "etag": 
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"",
+        "pageInfo": { "totalResults": 1, "resultsPerPage": 1 },
+        "items": [
+            {
+                "kind": "youtube#video",
+                "etag": 
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"",
+                "id": "wHkPb68dxEw",
+                "statistics": { "viewCount": "9211", "likeCount": "79", 
"dislikeCount": "11", "favoriteCount": "0", "commentCount": "29" },
+                "topicDetails": { "topicIds": [ "/m/02mjmr" ], 
"relevantTopicIds": [ "/m/0cnfvd", "/m/01jdpf" ] }
+            }
+        ]
+    },
+    {
+        "_id": ObjectId("610000000000000000000102"),
+        "kind": "youtube#videoListResponse",
+        "etag": 
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"",
+        "pageInfo": "page",
+        "items": [
+            {
+                "kind": "youtube#video",
+                "etag": 
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"",
+                "id": "wHkPb68dxEw",
+                "statistics": { "viewCount": "9211", "likeCount": "79", 
"dislikeCount": "11", "favoriteCount": "0", "commentCount": "29" },
+                "topicDetails": { "topicIds": [ "/m/02mjmr" ], 
"relevantTopicIds": [ "/m/0cnfvd", "/m/01jdpf" ] }
+            }
+        ]
+    },
+    {
+        "_id": ObjectId("610000000000000000000103"),
+        "kind": "youtube#videoListResponse",
+        "etag": 
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"",
+        "pageInfo": { "pagehit":{ "kind": "youtube#video" }, "totalResults": 
1, "resultsPerPage": 1 },
+        "items": [
+            {
+                "kind": "youtube#video",
+                "etag": 
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"",
+                "id": "wHkPb68dxEw",
+                "statistics": { "viewCount": "9211", "likeCount": "79", 
"dislikeCount": "11", "favoriteCount": "0", "commentCount": "29" },
+                "topicDetails": { "topicIds": [ "/m/02mjmr" ], 
"relevantTopicIds": [ "/m/0cnfvd", "/m/01jdpf" ] }
+            }
+        ]
+    }
+]);

Reply via email to