This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 613a261c5 [cdc] Refactor Kafka and MongoDB cdc schema build (#2069)
613a261c5 is described below
commit 613a261c5fdc07accae0690e38dc4dfdfad58fe6
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 26 12:43:32 2023 +0800
[cdc] Refactor Kafka and MongoDB cdc schema build (#2069)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 75 +++----------
.../flink/action/cdc/ComputedColumnUtils.java | 6 +-
.../flink/action/cdc/kafka/KafkaActionUtils.java | 32 ------
.../{KafkaSchema.java => KafkaSchemaUtils.java} | 64 ++---------
.../action/cdc/kafka/KafkaSyncTableAction.java | 24 ++---
.../action/cdc/kafka/formats/RecordParser.java | 30 +++++-
.../action/cdc/mongodb/MongoDBActionUtils.java | 35 ------
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 23 ++--
...{MongodbSchema.java => MongodbSchemaUtils.java} | 119 ++++++---------------
.../action/cdc/mysql/schema/MySqlSchemaUtils.java | 31 +++---
.../flink/action/cdc/kafka/KafkaSchemaITCase.java | 28 ++---
.../action/cdc/mongodb/MongodbSchemaITCase.java | 39 ++++---
12 files changed, 153 insertions(+), 353 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index a73d98ba6..19e3283fc 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -29,12 +29,11 @@ import org.apache.paimon.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -133,6 +132,18 @@ public class CdcActionCommonUtils {
:
origin.stream().map(String::toLowerCase).collect(Collectors.toList());
}
+    /**
+     * Resolves the name under which a source column is registered in the Paimon schema.
+     *
+     * <p>When {@code caseSensitive} is true the column name is returned unchanged and {@code
+     * existedFields} is not touched. Otherwise the name is lower-cased and added to {@code
+     * existedFields}; if the lower-cased name was already present, the {@code checkArgument}
+     * precondition fails with the message produced by {@code columnDuplicateErrMsg.apply(column)}.
+     *
+     * @param column original column name reported by the source
+     * @param existedFields lower-cased names registered so far; mutated by this call only in
+     *     case-insensitive mode
+     * @param caseSensitive whether column names are kept exactly as the source reports them
+     * @param columnDuplicateErrMsg builds the error message for a duplicated column name
+     * @return the column name to use in the Paimon schema
+     */
+    public static String columnCaseConvertAndDuplicateCheck(
+            String column,
+            Set<String> existedFields,
+            boolean caseSensitive,
+            Function<String, String> columnDuplicateErrMsg) {
+        if (!caseSensitive) {
+            // Set.add returns false when the lower-cased name is already registered,
+            // i.e. two source columns differ only by letter case.
+            String lowerCased = column.toLowerCase();
+            checkArgument(existedFields.add(lowerCased), columnDuplicateErrMsg.apply(column));
+            return lowerCased;
+        }
+        return column;
+    }
+
public static Schema buildPaimonSchema(
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
@@ -194,66 +205,6 @@ public class CdcActionCommonUtils {
return builder.build();
}
- public static Schema buildPaimonSchema(
- List<String> specifiedPartitionKeys,
- List<String> specifiedPrimaryKeys,
- List<ComputedColumn> computedColumns,
- Map<String, String> tableConfig,
- LinkedHashMap<String, DataType> sourceColumns,
- @Nullable List<String> sourceColumnComments,
- List<String> sourcePrimaryKeys) {
- Schema.Builder builder = Schema.newBuilder();
-
- // options
- builder.options(tableConfig);
-
- // columns
- if (sourceColumnComments != null) {
- checkArgument(
- sourceColumns.size() == sourceColumnComments.size(),
- "Source table columns count and column comments count
should be equal.");
-
- int i = 0;
- for (Map.Entry<String, DataType> entry : sourceColumns.entrySet())
{
- builder.column(entry.getKey(), entry.getValue(),
sourceColumnComments.get(i++));
- }
- } else {
- sourceColumns.forEach(builder::column);
- }
-
- for (ComputedColumn computedColumn : computedColumns) {
- builder.column(computedColumn.columnName(),
computedColumn.columnType());
- }
-
- // primary keys
- if (!specifiedPrimaryKeys.isEmpty()) {
- for (String key : specifiedPrimaryKeys) {
- if (!sourceColumns.containsKey(key)
- && computedColumns.stream().noneMatch(c ->
c.columnName().equals(key))) {
- throw new IllegalArgumentException(
- "Specified primary key '"
- + key
- + "' does not exist in source tables or
computed columns.");
- }
- }
- builder.primaryKey(specifiedPrimaryKeys);
- } else if (!sourcePrimaryKeys.isEmpty()) {
- builder.primaryKey(sourcePrimaryKeys);
- } else {
- throw new IllegalArgumentException(
- "Primary keys are not specified. "
- + "Also, can't infer primary keys from source
table schemas because "
- + "source tables have no primary keys or have
different primary keys.");
- }
-
- // partition keys
- if (!specifiedPartitionKeys.isEmpty()) {
- builder.partitionKeys(specifiedPartitionKeys);
- }
-
- return builder.build();
- }
-
public static String tableList(
MultiTablesSinkMode mode,
String databasePattern,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 8a3e3a866..ccc0764a2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -36,15 +36,11 @@ public class ComputedColumnUtils {
public static List<ComputedColumn> buildComputedColumns(
List<String> computedColumnArgs, Schema schema) {
- Map<String, DataType> dataFields =
+ Map<String, DataType> typeMapping =
schema.fields().stream()
.collect(
Collectors.toMap(DataField::name,
DataField::type, (v1, v2) -> v2));
- return buildComputedColumns(computedColumnArgs, dataFields);
- }
- public static List<ComputedColumn> buildComputedColumns(
- List<String> computedColumnArgs, Map<String, DataType>
typeMapping) {
List<ComputedColumn> computedColumns = new ArrayList<>();
for (String columnArg : computedColumnArgs) {
String[] kv = columnArg.split("=");
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index c249db8ee..7a5f46bf7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -18,10 +18,6 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -41,7 +37,6 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -50,9 +45,6 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.utils.Preconditions.checkArgument;
class KafkaActionUtils {
@@ -62,30 +54,6 @@ class KafkaActionUtils {
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
- static Schema buildPaimonSchema(
- KafkaSchema kafkaSchema,
- List<String> specifiedPartitionKeys,
- List<String> specifiedPrimaryKeys,
- List<ComputedColumn> computedColumns,
- Map<String, String> tableConfig,
- boolean caseSensitive) {
- LinkedHashMap<String, DataType> sourceColumns =
- mapKeyCaseConvert(
- kafkaSchema.fields(),
- caseSensitive,
- columnDuplicateErrMsg(kafkaSchema.tableName()));
- List<String> primaryKeys = listCaseConvert(kafkaSchema.primaryKeys(),
caseSensitive);
-
- return CdcActionCommonUtils.buildPaimonSchema(
- specifiedPartitionKeys,
- specifiedPrimaryKeys,
- computedColumns,
- tableConfig,
- sourceColumns,
- null,
- primaryKeys);
- }
-
static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
validateKafkaConfig(kafkaConfig);
KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
similarity index 78%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
index a01ce3de6..330df8e79 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
@@ -36,7 +36,6 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -47,42 +46,11 @@ import java.util.stream.StreamSupport;
import static
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId;
import static
org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat;
-/** Utility class to load canal kafka schema. */
-public class KafkaSchema {
+/** Utility class to load kafka schema. */
+public class KafkaSchemaUtils {
private static final int MAX_RETRY = 5;
private static final int POLL_TIMEOUT_MILLIS = 1000;
- private final String databaseName;
- private final String tableName;
- private final LinkedHashMap<String, DataType> fields;
- private final List<String> primaryKeys;
-
- public KafkaSchema(
- String databaseName,
- String tableName,
- LinkedHashMap<String, DataType> fields,
- List<String> primaryKeys) {
- this.databaseName = databaseName;
- this.tableName = tableName;
- this.fields = fields;
- this.primaryKeys = primaryKeys;
- }
-
- public String tableName() {
- return tableName;
- }
-
- public String databaseName() {
- return databaseName;
- }
-
- public LinkedHashMap<String, DataType> fields() {
- return fields;
- }
-
- public List<String> primaryKeys() {
- return primaryKeys;
- }
private static KafkaConsumer<String, String> getKafkaEarliestConsumer(
Configuration kafkaConfig, String topic) {
@@ -123,8 +91,8 @@ public class KafkaSchema {
* @return The Kafka schema for the topic.
* @throws KafkaSchemaRetrievalException If unable to retrieve the schema
after max retries.
*/
- public static KafkaSchema getKafkaSchema(
- Configuration kafkaConfig, String topic, TypeMapping typeMapping)
+ public static Schema getKafkaSchema(
+ Configuration kafkaConfig, String topic, TypeMapping typeMapping,
boolean caseSensitive)
throws KafkaSchemaRetrievalException {
KafkaConsumer<String, String> consumer =
getKafkaEarliestConsumer(kafkaConfig, topic);
int retry = 0;
@@ -140,7 +108,7 @@ public class KafkaSchema {
Stream<ConsumerRecord<String, String>> recordStream =
StreamSupport.stream(records.spliterator(), false);
- Optional<KafkaSchema> kafkaSchema =
+ Optional<Schema> kafkaSchema =
recordStream
.map(record ->
recordParser.getKafkaSchema(record.value()))
.filter(Objects::nonNull)
@@ -175,24 +143,4 @@ public class KafkaSchema {
super(message);
}
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaSchema)) {
- return false;
- }
- KafkaSchema that = (KafkaSchema) o;
- return databaseName.equals(that.databaseName)
- && tableName.equals(that.tableName)
- && fields.equals(that.fields)
- && primaryKeys.equals(that.primaryKeys);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(databaseName, tableName, fields, primaryKeys);
- }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 2804b04e0..87d3f0ddb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
@@ -48,6 +47,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
import static
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
/**
@@ -144,28 +145,25 @@ public class KafkaSyncTableAction extends ActionBase {
public void build(StreamExecutionEnvironment env) throws Exception {
KafkaSource<String> source =
KafkaActionUtils.buildKafkaSource(kafkaConfig);
String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0);
- KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig,
topic, typeMapping);
catalog.createDatabase(database, true);
boolean caseSensitive = catalog.caseSensitive();
+ Schema kafkaSchema =
+ KafkaSchemaUtils.getKafkaSchema(kafkaConfig, topic,
typeMapping, caseSensitive);
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
List<ComputedColumn> computedColumns =
- buildComputedColumns(computedColumnArgs, kafkaSchema.fields());
- Schema fromCanal =
- KafkaActionUtils.buildPaimonSchema(
- kafkaSchema,
- partitionKeys,
- primaryKeys,
- computedColumns,
- tableConfig,
- caseSensitive);
+ buildComputedColumns(computedColumnArgs, kafkaSchema);
+ Schema fromKafka =
+ buildPaimonSchema(
+ partitionKeys, primaryKeys, computedColumns,
tableConfig, kafkaSchema);
+
try {
table = (FileStoreTable) catalog.getTable(identifier);
- CdcActionCommonUtils.assertSchemaCompatible(table.schema(),
fromCanal.fields());
+ assertSchemaCompatible(table.schema(), fromKafka.fields());
} catch (Catalog.TableNotExistException e) {
- catalog.createTable(identifier, fromCanal, false);
+ catalog.createTable(identifier, fromKafka, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 5e3d0f886..bf6be1b7d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.action.cdc.kafka.formats;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -39,14 +39,22 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.Obje
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
@@ -81,12 +89,13 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
this.computedColumns = computedColumns;
}
- public KafkaSchema getKafkaSchema(String record) {
+ @Nullable
+ public Schema getKafkaSchema(String record) {
this.parseRootJson(record);
if (this.isDDL()) {
return null;
}
- databaseName = extractStringFromRootJson(FIELD_DATABASE);
+
tableName = extractStringFromRootJson(FIELD_TABLE);
this.setPrimaryField();
this.setDataField();
@@ -94,7 +103,20 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
this.extractPrimaryKeys();
this.extractFieldTypesFromDatabaseSchema();
LinkedHashMap<String, DataType> paimonFieldTypes =
this.setPaimonFieldType();
- return new KafkaSchema(databaseName, tableName, paimonFieldTypes,
primaryKeys);
+
+ Schema.Builder builder = Schema.newBuilder();
+ Set<String> existedFields = new HashSet<>();
+ Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(tableName);
+ for (Map.Entry<String, DataType> entry : paimonFieldTypes.entrySet()) {
+ builder.column(
+ columnCaseConvertAndDuplicateCheck(
+ entry.getKey(), existedFields, caseSensitive,
columnDuplicateErrMsg),
+ entry.getValue());
+ }
+
+ builder.primaryKey(listCaseConvert(primaryKeys, caseSensitive));
+
+ return builder.build();
}
protected abstract List<RichCdcMultiplexRecord> extractRecords();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index fefc4a687..3c13a48d3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -18,13 +18,6 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
@@ -36,15 +29,10 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -70,7 +58,6 @@ public class MongoDBActionUtils {
private static final String INITIAL_MODE = "initial";
private static final String LATEST_OFFSET_MODE = "latest-offset";
private static final String TIMESTAMP_MODE = "timestamp";
- private static final String PRIMARY_KEY = "_id";
public static final ConfigOption<String> FIELD_NAME =
ConfigOptions.key("field.name")
@@ -158,26 +145,4 @@ public class MongoDBActionUtils {
"mongodb-conf [%s] must be specified.",
MongoDBSourceOptions.DATABASE.key()));
}
-
- static Schema buildPaimonSchema(
- MongodbSchema mongodbSchema,
- List<String> specifiedPartitionKeys,
- List<ComputedColumn> computedColumns,
- Map<String, String> tableConfig,
- boolean caseSensitive) {
- LinkedHashMap<String, DataType> sourceColumns =
- mapKeyCaseConvert(
- mongodbSchema.fields(),
- caseSensitive,
- columnDuplicateErrMsg(mongodbSchema.tableName()));
-
- return CdcActionCommonUtils.buildPaimonSchema(
- specifiedPartitionKeys,
- Lists.newArrayList(PRIMARY_KEY),
- computedColumns,
- tableConfig,
- sourceColumns,
- null,
- Collections.emptyList());
- }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 3b25c2b69..ddadcc305 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -38,10 +38,13 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
import static
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -125,25 +128,25 @@ public class MongoDBSyncTableAction extends ActionBase {
validateCaseInsensitive();
}
- MongodbSchema mongodbSchema =
MongodbSchema.getMongodbSchema(mongodbConfig);
+ Schema mongodbSchema =
MongodbSchemaUtils.getMongodbSchema(mongodbConfig, caseSensitive);
catalog.createDatabase(database, true);
List<ComputedColumn> computedColumns =
- buildComputedColumns(computedColumnArgs,
mongodbSchema.fields());
+ buildComputedColumns(computedColumnArgs, mongodbSchema);
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
-
+ Schema fromMongodb =
+ buildPaimonSchema(
+ partitionKeys,
+ Collections.emptyList(),
+ computedColumns,
+ tableConfig,
+ mongodbSchema);
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
table = (FileStoreTable) catalog.getTable(identifier);
+ assertSchemaCompatible(table.schema(), fromMongodb.fields());
} else {
- Schema fromMongodb =
- MongoDBActionUtils.buildPaimonSchema(
- mongodbSchema,
- partitionKeys,
- computedColumns,
- tableConfig,
- caseSensitive);
catalog.createTable(identifier, fromMongodb, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java
similarity index 65%
rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java
index fbf648eeb..f81bdfaf7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import com.mongodb.ConnectionString;
@@ -37,65 +37,32 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
import static
com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
/**
- * Represents the schema of a MongoDB collection.
+ * Build schema from a MongoDB collection.
*
- * <p>This class provides methods to retrieve and manage the schema details of
a MongoDB collection,
- * including the database name, table (collection) name, fields, and primary
keys. The schema can be
- * acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the
schema details are
- * provided explicitly, while in the DYNAMIC mode, the schema is inferred from
the first document in
- * the collection.
- *
- * <p>The class also provides utility methods to generate schema fields and
create a new MongoDB
- * schema instance.
+ * <p>The schema can be acquired in two modes: SPECIFIED and DYNAMIC. In the
SPECIFIED mode, the
+ * schema details are provided explicitly, while in the DYNAMIC mode, the
schema is inferred from
+ * the first document in the collection.
*/
-public class MongodbSchema {
+public class MongodbSchemaUtils {
private static final String ID_FIELD = "_id";
- private final String databaseName;
- private final String tableName;
- private final LinkedHashMap<String, DataType> fields;
- private final List<String> primaryKeys;
-
- public MongodbSchema(
- String databaseName,
- String tableName,
- LinkedHashMap<String, DataType> fields,
- List<String> primaryKeys) {
- this.databaseName = databaseName;
- this.tableName = tableName;
- this.fields = fields;
- this.primaryKeys = primaryKeys;
- }
-
- public String tableName() {
- return tableName;
- }
-
- public String databaseName() {
- return databaseName;
- }
-
- public LinkedHashMap<String, DataType> fields() {
- return fields;
- }
-
- public List<String> primaryKeys() {
- return primaryKeys;
- }
/**
- * Utility class for creating a MongoDB schema based on the provided
configuration. The schema
- * can be created in one of the two modes:
+ * The schema can be created in one of the two modes:
*
* <ul>
* <li><b>SPECIFIED</b>: In this mode, the schema is created based on
the explicit column
@@ -110,7 +77,7 @@ public class MongodbSchema {
* name, and optionally, the username and password for authentication. For
the SPECIFIED mode,
* the field names should also be specified in the configuration.
*/
- public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
+ public static Schema getMongodbSchema(Configuration mongodbConfig, boolean
caseSensitive) {
SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
String databaseName =
Objects.requireNonNull(
@@ -127,13 +94,8 @@ public class MongodbSchema {
Objects.requireNonNull(
mongodbConfig.get(FIELD_NAME), "Field
names cannot be null")
.split(",");
- LinkedHashMap<String, DataType> schemaFields =
- generateSchemaFields(Arrays.asList(columnNames));
- return new MongodbSchema(
- databaseName,
- collectionName,
- schemaFields,
- Collections.singletonList(ID_FIELD));
+
+ return createMongodbSchema(collectionName, columnNames,
caseSensitive);
case DYNAMIC:
String hosts =
Objects.requireNonNull(
@@ -165,7 +127,7 @@ public class MongodbSchema {
}
return createMongodbSchema(
- databaseName, collectionName,
getColumnNames(firstDocument));
+ collectionName, getColumnNames(firstDocument),
caseSensitive);
} catch (Exception e) {
throw new RuntimeException(
"Failed to create schema from MongoDB collection",
e);
@@ -175,7 +137,7 @@ public class MongodbSchema {
}
}
- public static String buildConnectionString(
+ private static String buildConnectionString(
@Nullable String username,
@Nullable String password,
String scheme,
@@ -204,40 +166,25 @@ public class MongodbSchema {
return document != null ? new ArrayList<>(document.keySet()) :
Collections.emptyList();
}
- private static LinkedHashMap<String, DataType>
generateSchemaFields(List<String> columnNames) {
- LinkedHashMap<String, DataType> schemaFields = new LinkedHashMap<>();
- for (String columnName : columnNames) {
- schemaFields.put(columnName, DataTypes.STRING());
- }
- return schemaFields;
- }
-
- private static MongodbSchema createMongodbSchema(
- String databaseName, String collectionName, List<String>
columnNames) {
- return new MongodbSchema(
- databaseName,
- collectionName,
- generateSchemaFields(columnNames),
- Collections.singletonList(ID_FIELD));
+ private static Schema createMongodbSchema(
+ String collectionName, String[] columnNames, boolean
caseSensitive) {
+ return createMongodbSchema(collectionName, Arrays.asList(columnNames),
caseSensitive);
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof MongodbSchema)) {
- return false;
+ private static Schema createMongodbSchema(
+ String collectionName, List<String> columnNames, boolean
caseSensitive) {
+ Schema.Builder builder = Schema.newBuilder();
+ Set<String> existedFields = new HashSet<>();
+ Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(collectionName);
+ for (String column : columnNames) {
+ builder.column(
+ columnCaseConvertAndDuplicateCheck(
+ column, existedFields, caseSensitive,
columnDuplicateErrMsg),
+ DataTypes.STRING());
}
- MongodbSchema that = (MongodbSchema) o;
- return databaseName.equals(that.databaseName)
- && tableName.equals(that.tableName)
- && fields.equals(that.fields)
- && primaryKeys.equals(that.primaryKeys);
- }
- @Override
- public int hashCode() {
- return Objects.hash(databaseName, tableName, fields, primaryKeys);
+ builder.primaryKey(ID_FIELD);
+
+ return builder.build();
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java
index 27a5388c0..45762d550 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java
@@ -32,15 +32,16 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
/** Utility class to load MySQL table schema with JDBC. */
@@ -56,9 +57,10 @@ public class MySqlSchemaUtils {
TypeMapping typeMapping,
boolean caseSensitive)
throws SQLException {
- Map<String, Integer> duplicateFields = new HashMap<>();
Schema.Builder builder = Schema.newBuilder();
try (ResultSet rs = metaData.getColumns(databaseName, null, tableName,
null)) {
+ Set<String> existedFields = new HashSet<>();
+ Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(tableName);
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
String fieldType = rs.getString("TYPE_NAME");
@@ -73,23 +75,18 @@ public class MySqlSchemaUtils {
if (rs.wasNull()) {
scale = null;
}
- DataType paimonType =
- MySqlTypeUtils.toDataType(fieldType, precision, scale,
typeMapping);
-
- if (!caseSensitive) {
- checkArgument(
-
!duplicateFields.containsKey(fieldName.toLowerCase()),
- columnDuplicateErrMsg(tableName).apply(fieldName));
- fieldName = fieldName.toLowerCase();
- }
-
boolean isNullable =
typeMapping.containsMode(TO_NULLABLE)
||
isNullableColumn(rs.getString("IS_NULLABLE"));
- DataType updateType = paimonType.copy(isNullable);
+ DataType paimonType =
+ MySqlTypeUtils.toDataType(fieldType, precision, scale,
typeMapping)
+ .copy(isNullable);
+
+ fieldName =
+ columnCaseConvertAndDuplicateCheck(
+ fieldName, existedFields, caseSensitive,
columnDuplicateErrMsg);
- builder.column(fieldName, updateType, fieldComment);
- duplicateFields.put(fieldName, 1);
+ builder.column(fieldName, paimonType, fieldComment);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index 6f389c220..58a8779ef 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -19,20 +19,21 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link KafkaSchema}. */
+/** Tests for {@link KafkaSchemaUtils}. */
public class KafkaSchemaITCase extends KafkaActionITCaseBase {
@Test
@Timeout(60)
@@ -50,17 +51,16 @@ public class KafkaSchemaITCase extends
KafkaActionITCaseBase {
kafkaConfig.put("value.format", "canal-json");
kafkaConfig.put("topic", topic);
- KafkaSchema kafkaSchema =
- KafkaSchema.getKafkaSchema(
- Configuration.fromMap(kafkaConfig), topic,
TypeMapping.defaultMapping());
- Map<String, DataType> fields = new LinkedHashMap<>();
- fields.put("pt", DataTypes.INT());
- fields.put("_id", DataTypes.INT());
- fields.put("v1", DataTypes.VARCHAR(10));
- String tableName = "schema_evolution_1";
- String databasesName = "paimon_sync_table";
+ Schema kafkaSchema =
+ KafkaSchemaUtils.getKafkaSchema(
+ Configuration.fromMap(kafkaConfig),
+ topic,
+ TypeMapping.defaultMapping(),
+ true);
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "pt", DataTypes.INT()));
+ fields.add(new DataField(1, "_id", DataTypes.INT().notNull()));
+ fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
assertThat(kafkaSchema.fields()).isEqualTo(fields);
- assertThat(kafkaSchema.tableName()).isEqualTo(tableName);
- assertThat(kafkaSchema.databaseName()).isEqualTo(databasesName);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
index fa08d2fc8..1881747d1 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -18,7 +18,8 @@
package org.apache.paimon.flink.action.cdc.mongodb;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import com.mongodb.MongoClientSettings;
@@ -34,14 +35,15 @@ import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-/** Tests for {@link MongodbSchema}. */
+/** Tests for {@link MongodbSchemaUtils}. */
public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
@BeforeAll
@@ -81,10 +83,8 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
- MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+ Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig,
true);
assertNotNull(schema);
- assertEquals("testDatabase", schema.databaseName());
- assertEquals("testCollection", schema.tableName());
}
@Test
@@ -98,7 +98,9 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
- assertThrows(RuntimeException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ assertThrows(
+ RuntimeException.class,
+ () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig,
true));
}
@Test
@@ -108,7 +110,8 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
// Expect an exception to be thrown due to missing necessary settings
assertThrows(
- NullPointerException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ NullPointerException.class,
+ () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig,
true));
}
@Test
@@ -124,17 +127,15 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
// Call the method and check the results
- MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+ Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig,
true);
// Verify the schema
assertNotNull(schema);
- assertEquals("testDatabase", schema.databaseName());
- assertEquals("testCollection", schema.tableName());
- LinkedHashMap<String, DataType> expectedFields = new LinkedHashMap<>();
- expectedFields.put("name", DataTypes.STRING());
- expectedFields.put("age", DataTypes.STRING());
- expectedFields.put("_id", DataTypes.STRING());
+ List<DataField> expectedFields = new ArrayList<>();
+ expectedFields.add(new DataField(0, "_id",
DataTypes.STRING().notNull()));
+ expectedFields.add(new DataField(1, "name", DataTypes.STRING()));
+ expectedFields.add(new DataField(2, "age", DataTypes.STRING()));
assertEquals(expectedFields, schema.fields());
}
@@ -150,7 +151,9 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
mongodbConfig.setString(MongoDBSourceOptions.DATABASE,
"invalidDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
- assertThrows(RuntimeException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ assertThrows(
+ RuntimeException.class,
+ () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig,
true));
}
@Test
@@ -164,6 +167,8 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"invalidCollection");
- assertThrows(RuntimeException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ assertThrows(
+ RuntimeException.class,
+ () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig,
true));
}
}