This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 114c260db [flink] Fix exception related to field size after adding a
new table (#1919)
114c260db is described below
commit 114c260dbc60a5a1c8ce5832e280af7578bca72e
Author: monster <[email protected]>
AuthorDate: Wed Sep 6 15:50:37 2023 +0800
[flink] Fix exception related to field size after adding a new table (#1919)
---
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 2 +-
.../cdc/kafka/formats/canal/CanalRecordParser.java | 8 ++-
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 2 +-
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 8 ++-
.../cdc/RichCdcMultiplexRecordSchemaBuilder.java | 10 +++-
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 68 ++++++++++++++++++++++
.../mysql/TestCaseInsensitiveCatalogFactory.java | 26 +++++++++
7 files changed, 114 insertions(+), 10 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 9065137b1..f44ce1b01 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -159,7 +159,7 @@ public class KafkaSyncDatabaseAction extends ActionBase {
format.createParser(
caseSensitive, tableNameConverter, typeMapping,
Collections.emptyList());
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
- new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+ new RichCdcMultiplexRecordSchemaBuilder(tableConfig,
caseSensitive);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null :
Pattern.compile(excludingTables);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 2cc3dd43e..e7128f9e1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -91,7 +91,7 @@ public class CanalRecordParser extends RecordParser {
List<String> primaryKeys = extractPrimaryKeys();
LinkedHashMap<String, String> mySqlFieldTypes =
extractFieldTypesFromMySqlType();
LinkedHashMap<String, DataType> paimonFieldTypes =
- convertToPaimonFieldTypes(mySqlFieldTypes);
+ convertToPaimonFieldTypes(mySqlFieldTypes, caseSensitive);
String type = extractString(FIELD_TYPE);
ArrayNode data = JsonSerdeUtil.getNodeAs(root, FIELD_DATA,
ArrayNode.class);
@@ -316,11 +316,13 @@ public class CanalRecordParser extends RecordParser {
}
private LinkedHashMap<String, DataType> convertToPaimonFieldTypes(
- Map<String, String> mySqlFieldTypes) {
+ Map<String, String> mySqlFieldTypes, boolean caseSensitive) {
LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
mySqlFieldTypes.forEach(
(name, type) ->
- paimonFieldTypes.put(name,
MySqlTypeUtils.toDataType(type, typeMapping)));
+ paimonFieldTypes.put(
+ caseSensitive ? name : name.toLowerCase(),
+ MySqlTypeUtils.toDataType(type, typeMapping)));
return paimonFieldTypes;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 8ec535f19..cf35089ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -120,7 +120,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
- new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+ new RichCdcMultiplexRecordSchemaBuilder(tableConfig,
caseSensitive);
Pattern includingPattern = this.includingPattern;
Pattern excludingPattern = this.excludingPattern;
parserFactory =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 609e698c8..8582ec348 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -103,7 +103,7 @@ public interface MongoVersionStrategy {
paimonFieldTypes);
break;
case DYNAMIC:
- row = parseAndTypeJsonRow(document.toString(),
paimonFieldTypes);
+ row = parseAndTypeJsonRow(document.toString(),
paimonFieldTypes, caseSensitive);
break;
default:
throw new RuntimeException("Unsupported extraction mode: " +
mode);
@@ -119,10 +119,12 @@ public interface MongoVersionStrategy {
* @return A map containing the parsed key-value pairs from the JSON
string.
*/
default Map<String, String> parseAndTypeJsonRow(
- String evaluate, LinkedHashMap<String, DataType> paimonFieldTypes)
{
+ String evaluate,
+ LinkedHashMap<String, DataType> paimonFieldTypes,
+ boolean caseSensitive) {
Map<String, String> parsedMap = JsonSerdeUtil.parseJsonMap(evaluate,
String.class);
for (String column : parsedMap.keySet()) {
- paimonFieldTypes.put(column, DataTypes.STRING());
+ paimonFieldTypes.put(caseSensitive ? column :
column.toLowerCase(), DataTypes.STRING());
}
return extractRow(evaluate);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
index 9b83b6d7e..55a0bc003 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
@@ -29,9 +29,12 @@ public class RichCdcMultiplexRecordSchemaBuilder
implements NewTableSchemaBuilder<RichCdcMultiplexRecord> {
private final Map<String, String> tableConfig;
+ private final boolean caseSensitive;
- public RichCdcMultiplexRecordSchemaBuilder(Map<String, String>
tableConfig) {
+ public RichCdcMultiplexRecordSchemaBuilder(
+ Map<String, String> tableConfig, boolean caseSensitive) {
this.tableConfig = tableConfig;
+ this.caseSensitive = caseSensitive;
}
@Override
@@ -40,7 +43,10 @@ public class RichCdcMultiplexRecordSchemaBuilder
builder.options(tableConfig);
for (Map.Entry<String, DataType> entry :
record.fieldTypes().entrySet()) {
- builder.column(entry.getKey(), entry.getValue(), null);
+ builder.column(
+ caseSensitive ? entry.getKey() :
entry.getKey().toLowerCase(),
+ entry.getValue(),
+ null);
}
Schema schema = builder.primaryKey(record.primaryKeys()).build();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index e1a2b9c9b..0d93200bd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -18,6 +18,10 @@
package org.apache.paimon.flink.action.cdc.mongodb;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.flink.action.ActionBase;
+import
org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -26,10 +30,12 @@ import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -178,4 +184,66 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
"+I[610000000000000000000103,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\",
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
[...]
waitForResult(expected2, table2, rowType2, primaryKeys2);
}
+
+ @Test
+ @Timeout(60)
+ public void testDynamicTableCreationInMongoDB() throws Exception {
+ catalog =
+ new TestCaseInsensitiveCatalogFactory()
+ .createCatalog(CatalogContext.create(new
Path(warehouse)));
+ String dbName = database + UUID.randomUUID();
+ writeRecordsToMongoDB("test-data-5", dbName, "database");
+ Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+ mongodbConfig.put("database", dbName);
+ MongoDBSyncDatabaseAction action =
+ syncDatabaseActionBuilder(mongodbConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ Field catalogField = ActionBase.class.getDeclaredField("catalog");
+ catalogField.setAccessible(true);
+ Object newCatalog = catalog;
+ catalogField.set(action, newCatalog);
+ runActionWithDefaultEnv(action);
+
+ waitingTables("t3");
+ FileStoreTable table1 = getFileStoreTable("t3");
+ writeRecordsToMongoDB("test-data-6", dbName, "database");
+ waitingTables("t4");
+ FileStoreTable table2 = getFileStoreTable("t4");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"_id", "country", "languages",
"religions"});
+ List<String> primaryKeys1 = Collections.singletonList("_id");
+ List<String> expected1 =
+ Arrays.asList(
+ "+I[610000000000000000000101, Switzerland, Italian,
{\"f\":\"v\",\"n\":null}]",
+ "+I[610000000000000000000102, Switzerland, Italian, ]",
+ "+I[610000000000000000000103, Switzerland,
[\"Italian\"], ]");
+ waitForResult(expected1, table1, rowType1, primaryKeys1);
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"_id", "kind", "etag", "pageinfo",
"items"});
+ List<String> primaryKeys2 = Collections.singletonList("_id");
+ List<String> expected2 =
+ Arrays.asList(
+ "+I[610000000000000000000101,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\",
{\"totalResults\":1,\"resultsPerPage\":1},
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\
[...]
+ "+I[610000000000000000000102,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", page,
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\
[...]
+ "+I[610000000000000000000103,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\",
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
[...]
+ waitForResult(expected2, table2, rowType2, primaryKeys2);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
index 82caabd7b..d7935e3a3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
@@ -29,8 +29,11 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory to create a mock case-insensitive catalog for test. */
@@ -41,6 +44,29 @@ public class TestCaseInsensitiveCatalogFactory implements
CatalogFactory {
return "test-case-insensitive";
}
+ public Catalog createCatalog(CatalogContext context) {
+ String warehouse =
CatalogFactory.warehouse(context).toUri().toString();
+
+ Path warehousePath = new Path(warehouse);
+ FileIO fileIO;
+ try {
+ fileIO = FileIO.get(warehousePath, context);
+ if (fileIO.exists(warehousePath)) {
+ checkArgument(
+ fileIO.isDir(warehousePath),
+ "The %s path '%s' should be a directory.",
+ WAREHOUSE.key(),
+ warehouse);
+ } else {
+ fileIO.mkdirs(warehousePath);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return create(fileIO, warehousePath, context);
+ }
+
@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
return new FileSystemCatalog(fileIO, warehouse,
context.options().toMap()) {