This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ced05f05f [flink] Fix cannot deserialize value for Mongo cdc (#1913)
ced05f05f is described below
commit ced05f05fc7650089c285b7fd3ae2d59fd427e10
Author: monster <[email protected]>
AuthorDate: Wed Aug 30 21:48:33 2023 +0800
[flink] Fix cannot deserialize value for Mongo cdc (#1913)
---
.../org/apache/paimon/utils/JsonSerdeUtil.java | 20 +++++--
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 8 +--
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 53 ++++++++++++++++++
.../test/resources/mongodb/database/test-data-5.js | 38 +++++++++++++
.../test/resources/mongodb/database/test-data-6.js | 62 ++++++++++++++++++++++
5 files changed, 175 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 4ea553249..5d46759f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -67,9 +67,23 @@ public class JsonSerdeUtil {
.collect(
Collectors.toMap(
Map.Entry::getKey,
- entry ->
-
OBJECT_MAPPER_INSTANCE.convertValue(
- entry.getValue(),
valueType),
+ entry -> {
+ Object value = entry.getValue();
+ try {
+ if
(!(valueType.isInstance(value))) {
+ String jsonStr =
+
OBJECT_MAPPER_INSTANCE.writeValueAsString(
+ value);
+ return
OBJECT_MAPPER_INSTANCE.convertValue(
+ jsonStr, valueType);
+ }
+ return
OBJECT_MAPPER_INSTANCE.convertValue(
+ value, valueType);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(
+ "Error converting value to
JSON string", e);
+ }
+ },
(oldValue, newValue) -> oldValue,
LinkedHashMap::new));
} catch (JsonProcessingException e) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 957aed34e..5f2f8db4d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -77,7 +77,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
@Nullable private final String includingTables;
public MongoDBSyncDatabaseAction(
- Map<String, String> kafkaConfig,
+ Map<String, String> mongodbConfig,
String warehouse,
String database,
@Nullable String tablePrefix,
@@ -87,7 +87,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
Map<String, String> catalogConfig,
Map<String, String> tableConfig) {
super(warehouse, catalogConfig);
- this.mongodbConfig = Configuration.fromMap(kafkaConfig);
+ this.mongodbConfig = Configuration.fromMap(mongodbConfig);
this.database = database;
this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
@@ -131,7 +131,9 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
"MongoDB Source")
.flatMap(
new MongoDBRecordParser(
- false,
tableNameConverter, mongodbConfig)))
+ caseSensitive,
+ tableNameConverter,
+ mongodbConfig)))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 4ca4e2914..e1a2b9c9b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -125,4 +125,57 @@ public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
+
+ @Test
+ @Timeout(60)
+ public void testMongoDBNestedDataSynchronizationAndVerification() throws
Exception {
+ writeRecordsToMongoDB("test-data-5", database, "database");
+ writeRecordsToMongoDB("test-data-6", database, "database");
+ Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+ mongodbConfig.put("database", database);
+ MongoDBSyncDatabaseAction action =
+ syncDatabaseActionBuilder(mongodbConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ waitingTables("t3", "t4");
+ FileStoreTable table1 = getFileStoreTable("t3");
+ FileStoreTable table2 = getFileStoreTable("t4");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"_id", "country", "languages",
"religions"});
+ List<String> primaryKeys1 = Collections.singletonList("_id");
+ List<String> expected1 =
+ Arrays.asList(
+ "+I[610000000000000000000101, Switzerland, Italian,
{\"f\":\"v\",\"n\":null}]",
+ "+I[610000000000000000000102, Switzerland, Italian, ]",
+ "+I[610000000000000000000103, Switzerland,
[\"Italian\"], ]");
+ waitForResult(expected1, table1, rowType1, primaryKeys1);
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"_id", "kind", "etag", "pageInfo",
"items"});
+ List<String> primaryKeys2 = Collections.singletonList("_id");
+ List<String> expected2 =
+ Arrays.asList(
+ "+I[610000000000000000000101,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\",
{\"totalResults\":1,\"resultsPerPage\":1},
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\
[...]
+ "+I[610000000000000000000102,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", page,
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\
[...]
+ "+I[610000000000000000000103,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\",
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
[...]
+ waitForResult(expected2, table2, rowType2, primaryKeys2);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-5.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-5.js
new file mode 100644
index 000000000..bc32ac132
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-5.js
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t3').insertMany([
+ {
+ '_id': ObjectId('610000000000000000000101'),
+ 'country': 'Switzerland',
+ 'languages': 'Italian',
+ 'religions': {
+ 'f': 'v',
+ 'n': null
+ }
+ },
+ {
+ '_id': ObjectId('610000000000000000000102'),
+ 'country': 'Switzerland',
+ 'languages': 'Italian',
+ 'religions': ''
+ },
+ {
+ '_id': ObjectId('610000000000000000000103'),
+ 'country': 'Switzerland',
+ 'languages': ['Italian'],
+ 'religions': ''
+ }
+]);
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-6.js b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-6.js
new file mode 100644
index 000000000..3d189d58c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mongodb/database/test-data-6.js
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('t4').insertMany([
+ {
+ "_id": ObjectId("610000000000000000000101"),
+ "kind": "youtube#videoListResponse",
+ "etag":
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"",
+ "pageInfo": { "totalResults": 1, "resultsPerPage": 1 },
+ "items": [
+ {
+ "kind": "youtube#video",
+ "etag":
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"",
+ "id": "wHkPb68dxEw",
+ "statistics": { "viewCount": "9211", "likeCount": "79",
"dislikeCount": "11", "favoriteCount": "0", "commentCount": "29" },
+ "topicDetails": { "topicIds": [ "/m/02mjmr" ],
"relevantTopicIds": [ "/m/0cnfvd", "/m/01jdpf" ] }
+ }
+ ]
+ },
+ {
+ "_id": ObjectId("610000000000000000000102"),
+ "kind": "youtube#videoListResponse",
+ "etag":
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"",
+ "pageInfo": "page",
+ "items": [
+ {
+ "kind": "youtube#video",
+ "etag":
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"",
+ "id": "wHkPb68dxEw",
+ "statistics": { "viewCount": "9211", "likeCount": "79",
"dislikeCount": "11", "favoriteCount": "0", "commentCount": "29" },
+ "topicDetails": { "topicIds": [ "/m/02mjmr" ],
"relevantTopicIds": [ "/m/0cnfvd", "/m/01jdpf" ] }
+ }
+ ]
+ },
+ {
+ "_id": ObjectId("610000000000000000000103"),
+ "kind": "youtube#videoListResponse",
+ "etag":
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\"",
+ "pageInfo": { "pagehit":{ "kind": "youtube#video" }, "totalResults":
1, "resultsPerPage": 1 },
+ "items": [
+ {
+ "kind": "youtube#video",
+ "etag":
"\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\"",
+ "id": "wHkPb68dxEw",
+ "statistics": { "viewCount": "9211", "likeCount": "79",
"dislikeCount": "11", "favoriteCount": "0", "commentCount": "29" },
+ "topicDetails": { "topicIds": [ "/m/02mjmr" ],
"relevantTopicIds": [ "/m/0cnfvd", "/m/01jdpf" ] }
+ }
+ ]
+ }
+]);