This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 114c260db [flink] Fix exception related to field size after adding a new table (#1919)
114c260db is described below

commit 114c260dbc60a5a1c8ce5832e280af7578bca72e
Author: monster <[email protected]>
AuthorDate: Wed Sep 6 15:50:37 2023 +0800

    [flink] Fix exception related to field size after adding a new table (#1919)
---
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  |  2 +-
 .../cdc/kafka/formats/canal/CanalRecordParser.java |  8 ++-
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |  2 +-
 .../cdc/mongodb/strategy/MongoVersionStrategy.java |  8 ++-
 .../cdc/RichCdcMultiplexRecordSchemaBuilder.java   | 10 +++-
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   | 68 ++++++++++++++++++++++
 .../mysql/TestCaseInsensitiveCatalogFactory.java   | 26 +++++++++
 7 files changed, 114 insertions(+), 10 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 9065137b1..f44ce1b01 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -159,7 +159,7 @@ public class KafkaSyncDatabaseAction extends ActionBase {
                 format.createParser(
                         caseSensitive, tableNameConverter, typeMapping, 
Collections.emptyList());
         RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
-                new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+                new RichCdcMultiplexRecordSchemaBuilder(tableConfig, 
caseSensitive);
         Pattern includingPattern = Pattern.compile(includingTables);
         Pattern excludingPattern =
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 2cc3dd43e..e7128f9e1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -91,7 +91,7 @@ public class CanalRecordParser extends RecordParser {
         List<String> primaryKeys = extractPrimaryKeys();
         LinkedHashMap<String, String> mySqlFieldTypes = 
extractFieldTypesFromMySqlType();
         LinkedHashMap<String, DataType> paimonFieldTypes =
-                convertToPaimonFieldTypes(mySqlFieldTypes);
+                convertToPaimonFieldTypes(mySqlFieldTypes, caseSensitive);
 
         String type = extractString(FIELD_TYPE);
         ArrayNode data = JsonSerdeUtil.getNodeAs(root, FIELD_DATA, 
ArrayNode.class);
@@ -316,11 +316,13 @@ public class CanalRecordParser extends RecordParser {
     }
 
     private LinkedHashMap<String, DataType> convertToPaimonFieldTypes(
-            Map<String, String> mySqlFieldTypes) {
+            Map<String, String> mySqlFieldTypes, boolean caseSensitive) {
         LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
         mySqlFieldTypes.forEach(
                 (name, type) ->
-                        paimonFieldTypes.put(name, 
MySqlTypeUtils.toDataType(type, typeMapping)));
+                        paimonFieldTypes.put(
+                                caseSensitive ? name : name.toLowerCase(),
+                                MySqlTypeUtils.toDataType(type, typeMapping)));
         return paimonFieldTypes;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 8ec535f19..cf35089ef 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -120,7 +120,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
 
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
         RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
-                new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+                new RichCdcMultiplexRecordSchemaBuilder(tableConfig, 
caseSensitive);
         Pattern includingPattern = this.includingPattern;
         Pattern excludingPattern = this.excludingPattern;
         parserFactory =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 609e698c8..8582ec348 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -103,7 +103,7 @@ public interface MongoVersionStrategy {
                                 paimonFieldTypes);
                 break;
             case DYNAMIC:
-                row = parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes);
+                row = parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes, caseSensitive);
                 break;
             default:
                 throw new RuntimeException("Unsupported extraction mode: " + 
mode);
@@ -119,10 +119,12 @@ public interface MongoVersionStrategy {
      * @return A map containing the parsed key-value pairs from the JSON 
string.
      */
     default Map<String, String> parseAndTypeJsonRow(
-            String evaluate, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
+            String evaluate,
+            LinkedHashMap<String, DataType> paimonFieldTypes,
+            boolean caseSensitive) {
         Map<String, String> parsedMap = JsonSerdeUtil.parseJsonMap(evaluate, 
String.class);
         for (String column : parsedMap.keySet()) {
-            paimonFieldTypes.put(column, DataTypes.STRING());
+            paimonFieldTypes.put(caseSensitive ? column : 
column.toLowerCase(), DataTypes.STRING());
         }
         return extractRow(evaluate);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
index 9b83b6d7e..55a0bc003 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
@@ -29,9 +29,12 @@ public class RichCdcMultiplexRecordSchemaBuilder
         implements NewTableSchemaBuilder<RichCdcMultiplexRecord> {
 
     private final Map<String, String> tableConfig;
+    private final boolean caseSensitive;
 
-    public RichCdcMultiplexRecordSchemaBuilder(Map<String, String> 
tableConfig) {
+    public RichCdcMultiplexRecordSchemaBuilder(
+            Map<String, String> tableConfig, boolean caseSensitive) {
         this.tableConfig = tableConfig;
+        this.caseSensitive = caseSensitive;
     }
 
     @Override
@@ -40,7 +43,10 @@ public class RichCdcMultiplexRecordSchemaBuilder
         builder.options(tableConfig);
 
         for (Map.Entry<String, DataType> entry : 
record.fieldTypes().entrySet()) {
-            builder.column(entry.getKey(), entry.getValue(), null);
+            builder.column(
+                    caseSensitive ? entry.getKey() : 
entry.getKey().toLowerCase(),
+                    entry.getValue(),
+                    null);
         }
 
         Schema schema = builder.primaryKey(record.primaryKeys()).build();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index e1a2b9c9b..0d93200bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.flink.action.ActionBase;
+import 
org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -26,10 +30,12 @@ import org.apache.paimon.types.RowType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -178,4 +184,66 @@ public class MongoDBSyncDatabaseActionITCase extends 
MongoDBActionITCaseBase {
                         "+I[610000000000000000000103, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", 
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
 [...]
         waitForResult(expected2, table2, rowType2, primaryKeys2);
     }
+
+    @Test
+    @Timeout(60)
+    public void testDynamicTableCreationInMongoDB() throws Exception {
+        catalog =
+                new TestCaseInsensitiveCatalogFactory()
+                        .createCatalog(CatalogContext.create(new 
Path(warehouse)));
+        String dbName = database + UUID.randomUUID();
+        writeRecordsToMongoDB("test-data-5", dbName, "database");
+        Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+        mongodbConfig.put("database", dbName);
+        MongoDBSyncDatabaseAction action =
+                syncDatabaseActionBuilder(mongodbConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        Field catalogField = ActionBase.class.getDeclaredField("catalog");
+        catalogField.setAccessible(true);
+        Object newCatalog = catalog;
+        catalogField.set(action, newCatalog);
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t3");
+        FileStoreTable table1 = getFileStoreTable("t3");
+        writeRecordsToMongoDB("test-data-6", dbName, "database");
+        waitingTables("t4");
+        FileStoreTable table2 = getFileStoreTable("t4");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "country", "languages", 
"religions"});
+        List<String> primaryKeys1 = Collections.singletonList("_id");
+        List<String> expected1 =
+                Arrays.asList(
+                        "+I[610000000000000000000101, Switzerland, Italian, 
{\"f\":\"v\",\"n\":null}]",
+                        "+I[610000000000000000000102, Switzerland, Italian, ]",
+                        "+I[610000000000000000000103, Switzerland, 
[\"Italian\"], ]");
+        waitForResult(expected1, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"_id", "kind", "etag", "pageinfo", 
"items"});
+        List<String> primaryKeys2 = Collections.singletonList("_id");
+        List<String> expected2 =
+                Arrays.asList(
+                        "+I[610000000000000000000101, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", 
{\"totalResults\":1,\"resultsPerPage\":1}, 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\
 [...]
+                        "+I[610000000000000000000102, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", page, 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\
 [...]
+                        "+I[610000000000000000000103, 
youtube#videoListResponse, 
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", 
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
 
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
 [...]
+        waitForResult(expected2, table2, rowType2, primaryKeys2);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
index 82caabd7b..d7935e3a3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
@@ -29,8 +29,11 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataField;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
 
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory to create a mock case-insensitive catalog for test. */
@@ -41,6 +44,29 @@ public class TestCaseInsensitiveCatalogFactory implements 
CatalogFactory {
         return "test-case-insensitive";
     }
 
+    public Catalog createCatalog(CatalogContext context) {
+        String warehouse = 
CatalogFactory.warehouse(context).toUri().toString();
+
+        Path warehousePath = new Path(warehouse);
+        FileIO fileIO;
+        try {
+            fileIO = FileIO.get(warehousePath, context);
+            if (fileIO.exists(warehousePath)) {
+                checkArgument(
+                        fileIO.isDir(warehousePath),
+                        "The %s path '%s' should be a directory.",
+                        WAREHOUSE.key(),
+                        warehouse);
+            } else {
+                fileIO.mkdirs(warehousePath);
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return create(fileIO, warehousePath, context);
+    }
+
     @Override
     public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
         return new FileSystemCatalog(fileIO, warehouse, 
context.options().toMap()) {

Reply via email to