This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 613a261c5 [cdc] Refactor Kafka and MongoDB cdc schema build (#2069)
613a261c5 is described below

commit 613a261c5fdc07accae0690e38dc4dfdfad58fe6
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 26 12:43:32 2023 +0800

    [cdc] Refactor Kafka and MongoDB cdc schema build (#2069)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     |  75 +++----------
 .../flink/action/cdc/ComputedColumnUtils.java      |   6 +-
 .../flink/action/cdc/kafka/KafkaActionUtils.java   |  32 ------
 .../{KafkaSchema.java => KafkaSchemaUtils.java}    |  64 ++---------
 .../action/cdc/kafka/KafkaSyncTableAction.java     |  24 ++---
 .../action/cdc/kafka/formats/RecordParser.java     |  30 +++++-
 .../action/cdc/mongodb/MongoDBActionUtils.java     |  35 ------
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |  23 ++--
 ...{MongodbSchema.java => MongodbSchemaUtils.java} | 119 ++++++---------------
 .../action/cdc/mysql/schema/MySqlSchemaUtils.java  |  31 +++---
 .../flink/action/cdc/kafka/KafkaSchemaITCase.java  |  28 ++---
 .../action/cdc/mongodb/MongodbSchemaITCase.java    |  39 ++++---
 12 files changed, 153 insertions(+), 353 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index a73d98ba6..19e3283fc 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -29,12 +29,11 @@ import org.apache.paimon.types.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -133,6 +132,18 @@ public class CdcActionCommonUtils {
                 : origin.stream().map(String::toLowerCase).collect(Collectors.toList());
     }
 
+    public static String columnCaseConvertAndDuplicateCheck(
+            String column,
+            Set<String> existedFields,
+            boolean caseSensitive,
+            Function<String, String> columnDuplicateErrMsg) {
+        if (caseSensitive) {
+            return column;
+        }
+        checkArgument(existedFields.add(column.toLowerCase()), columnDuplicateErrMsg.apply(column));
+        return column.toLowerCase();
+    }
+
     public static Schema buildPaimonSchema(
             List<String> specifiedPartitionKeys,
             List<String> specifiedPrimaryKeys,
@@ -194,66 +205,6 @@ public class CdcActionCommonUtils {
         return builder.build();
     }
 
-    public static Schema buildPaimonSchema(
-            List<String> specifiedPartitionKeys,
-            List<String> specifiedPrimaryKeys,
-            List<ComputedColumn> computedColumns,
-            Map<String, String> tableConfig,
-            LinkedHashMap<String, DataType> sourceColumns,
-            @Nullable List<String> sourceColumnComments,
-            List<String> sourcePrimaryKeys) {
-        Schema.Builder builder = Schema.newBuilder();
-
-        // options
-        builder.options(tableConfig);
-
-        // columns
-        if (sourceColumnComments != null) {
-            checkArgument(
-                    sourceColumns.size() == sourceColumnComments.size(),
-                    "Source table columns count and column comments count 
should be equal.");
-
-            int i = 0;
-            for (Map.Entry<String, DataType> entry : sourceColumns.entrySet()) 
{
-                builder.column(entry.getKey(), entry.getValue(), 
sourceColumnComments.get(i++));
-            }
-        } else {
-            sourceColumns.forEach(builder::column);
-        }
-
-        for (ComputedColumn computedColumn : computedColumns) {
-            builder.column(computedColumn.columnName(), 
computedColumn.columnType());
-        }
-
-        // primary keys
-        if (!specifiedPrimaryKeys.isEmpty()) {
-            for (String key : specifiedPrimaryKeys) {
-                if (!sourceColumns.containsKey(key)
-                        && computedColumns.stream().noneMatch(c -> 
c.columnName().equals(key))) {
-                    throw new IllegalArgumentException(
-                            "Specified primary key '"
-                                    + key
-                                    + "' does not exist in source tables or 
computed columns.");
-                }
-            }
-            builder.primaryKey(specifiedPrimaryKeys);
-        } else if (!sourcePrimaryKeys.isEmpty()) {
-            builder.primaryKey(sourcePrimaryKeys);
-        } else {
-            throw new IllegalArgumentException(
-                    "Primary keys are not specified. "
-                            + "Also, can't infer primary keys from source 
table schemas because "
-                            + "source tables have no primary keys or have 
different primary keys.");
-        }
-
-        // partition keys
-        if (!specifiedPartitionKeys.isEmpty()) {
-            builder.partitionKeys(specifiedPartitionKeys);
-        }
-
-        return builder.build();
-    }
-
     public static String tableList(
             MultiTablesSinkMode mode,
             String databasePattern,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 8a3e3a866..ccc0764a2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -36,15 +36,11 @@ public class ComputedColumnUtils {
 
     public static List<ComputedColumn> buildComputedColumns(
             List<String> computedColumnArgs, Schema schema) {
-        Map<String, DataType> dataFields =
+        Map<String, DataType> typeMapping =
                 schema.fields().stream()
                         .collect(
                                 Collectors.toMap(DataField::name, 
DataField::type, (v1, v2) -> v2));
-        return buildComputedColumns(computedColumnArgs, dataFields);
-    }
 
-    public static List<ComputedColumn> buildComputedColumns(
-            List<String> computedColumnArgs, Map<String, DataType> 
typeMapping) {
         List<ComputedColumn> computedColumns = new ArrayList<>();
         for (String columnArg : computedColumnArgs) {
             String[] kv = columnArg.split("=");
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index c249db8ee..7a5f46bf7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -18,10 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -41,7 +37,6 @@ import org.apache.kafka.common.TopicPartition;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -50,9 +45,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 class KafkaActionUtils {
@@ -62,30 +54,6 @@ class KafkaActionUtils {
     private static final String PARTITION = "partition";
     private static final String OFFSET = "offset";
 
-    static Schema buildPaimonSchema(
-            KafkaSchema kafkaSchema,
-            List<String> specifiedPartitionKeys,
-            List<String> specifiedPrimaryKeys,
-            List<ComputedColumn> computedColumns,
-            Map<String, String> tableConfig,
-            boolean caseSensitive) {
-        LinkedHashMap<String, DataType> sourceColumns =
-                mapKeyCaseConvert(
-                        kafkaSchema.fields(),
-                        caseSensitive,
-                        columnDuplicateErrMsg(kafkaSchema.tableName()));
-        List<String> primaryKeys = listCaseConvert(kafkaSchema.primaryKeys(), 
caseSensitive);
-
-        return CdcActionCommonUtils.buildPaimonSchema(
-                specifiedPartitionKeys,
-                specifiedPrimaryKeys,
-                computedColumns,
-                tableConfig,
-                sourceColumns,
-                null,
-                primaryKeys);
-    }
-
     static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
         validateKafkaConfig(kafkaConfig);
         KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
similarity index 78%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
index a01ce3de6..330df8e79 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.action.cdc.kafka;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
 import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
@@ -36,7 +36,6 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -47,42 +46,11 @@ import java.util.stream.StreamSupport;
 import static 
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId;
 import static 
org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat;
 
-/** Utility class to load canal kafka schema. */
-public class KafkaSchema {
+/** Utility class to load kafka schema. */
+public class KafkaSchemaUtils {
 
     private static final int MAX_RETRY = 5;
     private static final int POLL_TIMEOUT_MILLIS = 1000;
-    private final String databaseName;
-    private final String tableName;
-    private final LinkedHashMap<String, DataType> fields;
-    private final List<String> primaryKeys;
-
-    public KafkaSchema(
-            String databaseName,
-            String tableName,
-            LinkedHashMap<String, DataType> fields,
-            List<String> primaryKeys) {
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-        this.fields = fields;
-        this.primaryKeys = primaryKeys;
-    }
-
-    public String tableName() {
-        return tableName;
-    }
-
-    public String databaseName() {
-        return databaseName;
-    }
-
-    public LinkedHashMap<String, DataType> fields() {
-        return fields;
-    }
-
-    public List<String> primaryKeys() {
-        return primaryKeys;
-    }
 
     private static KafkaConsumer<String, String> getKafkaEarliestConsumer(
             Configuration kafkaConfig, String topic) {
@@ -123,8 +91,8 @@ public class KafkaSchema {
      * @return The Kafka schema for the topic.
      * @throws KafkaSchemaRetrievalException If unable to retrieve the schema 
after max retries.
      */
-    public static KafkaSchema getKafkaSchema(
-            Configuration kafkaConfig, String topic, TypeMapping typeMapping)
+    public static Schema getKafkaSchema(
+            Configuration kafkaConfig, String topic, TypeMapping typeMapping, boolean caseSensitive)
             throws KafkaSchemaRetrievalException {
         KafkaConsumer<String, String> consumer = 
getKafkaEarliestConsumer(kafkaConfig, topic);
         int retry = 0;
@@ -140,7 +108,7 @@ public class KafkaSchema {
             Stream<ConsumerRecord<String, String>> recordStream =
                     StreamSupport.stream(records.spliterator(), false);
 
-            Optional<KafkaSchema> kafkaSchema =
+            Optional<Schema> kafkaSchema =
                     recordStream
                             .map(record -> 
recordParser.getKafkaSchema(record.value()))
                             .filter(Objects::nonNull)
@@ -175,24 +143,4 @@ public class KafkaSchema {
             super(message);
         }
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof KafkaSchema)) {
-            return false;
-        }
-        KafkaSchema that = (KafkaSchema) o;
-        return databaseName.equals(that.databaseName)
-                && tableName.equals(that.tableName)
-                && fields.equals(that.fields)
-                && primaryKeys.equals(that.primaryKeys);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(databaseName, tableName, fields, primaryKeys);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 2804b04e0..87d3f0ddb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
@@ -48,6 +47,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
 import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
 
 /**
@@ -144,28 +145,25 @@ public class KafkaSyncTableAction extends ActionBase {
     public void build(StreamExecutionEnvironment env) throws Exception {
         KafkaSource<String> source = 
KafkaActionUtils.buildKafkaSource(kafkaConfig);
         String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0);
-        KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig, 
topic, typeMapping);
 
         catalog.createDatabase(database, true);
         boolean caseSensitive = catalog.caseSensitive();
+        Schema kafkaSchema =
+                KafkaSchemaUtils.getKafkaSchema(kafkaConfig, topic, typeMapping, caseSensitive);
 
         Identifier identifier = new Identifier(database, table);
         FileStoreTable table;
         List<ComputedColumn> computedColumns =
-                buildComputedColumns(computedColumnArgs, kafkaSchema.fields());
-        Schema fromCanal =
-                KafkaActionUtils.buildPaimonSchema(
-                        kafkaSchema,
-                        partitionKeys,
-                        primaryKeys,
-                        computedColumns,
-                        tableConfig,
-                        caseSensitive);
+                buildComputedColumns(computedColumnArgs, kafkaSchema);
+        Schema fromKafka =
+                buildPaimonSchema(
+                        partitionKeys, primaryKeys, computedColumns, tableConfig, kafkaSchema);
+
         try {
             table = (FileStoreTable) catalog.getTable(identifier);
-            CdcActionCommonUtils.assertSchemaCompatible(table.schema(), 
fromCanal.fields());
+            assertSchemaCompatible(table.schema(), fromKafka.fields());
         } catch (Catalog.TableNotExistException e) {
-            catalog.createTable(identifier, fromCanal, false);
+            catalog.createTable(identifier, fromKafka, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
         DataFormat format = DataFormat.getDataFormat(kafkaConfig);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 5e3d0f886..bf6be1b7d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.action.cdc.kafka.formats;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -39,14 +39,22 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.Obje
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 
@@ -81,12 +89,13 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
         this.computedColumns = computedColumns;
     }
 
-    public KafkaSchema getKafkaSchema(String record) {
+    @Nullable
+    public Schema getKafkaSchema(String record) {
         this.parseRootJson(record);
         if (this.isDDL()) {
             return null;
         }
-        databaseName = extractStringFromRootJson(FIELD_DATABASE);
+
         tableName = extractStringFromRootJson(FIELD_TABLE);
         this.setPrimaryField();
         this.setDataField();
@@ -94,7 +103,20 @@ public abstract class RecordParser implements 
FlatMapFunction<String, RichCdcMul
         this.extractPrimaryKeys();
         this.extractFieldTypesFromDatabaseSchema();
         LinkedHashMap<String, DataType> paimonFieldTypes = 
this.setPaimonFieldType();
-        return new KafkaSchema(databaseName, tableName, paimonFieldTypes, 
primaryKeys);
+
+        Schema.Builder builder = Schema.newBuilder();
+        Set<String> existedFields = new HashSet<>();
+        Function<String, String> columnDuplicateErrMsg = columnDuplicateErrMsg(tableName);
+        for (Map.Entry<String, DataType> entry : paimonFieldTypes.entrySet()) {
+            builder.column(
+                    columnCaseConvertAndDuplicateCheck(
+                            entry.getKey(), existedFields, caseSensitive, columnDuplicateErrMsg),
+                    entry.getValue());
+        }
+
+        builder.primaryKey(listCaseConvert(primaryKeys, caseSensitive));
+
+        return builder.build();
     }
 
     protected abstract List<RichCdcMultiplexRecord> extractRecords();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index fefc4a687..3c13a48d3 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -18,13 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
 import com.ververica.cdc.connectors.base.options.SourceOptions;
 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
@@ -36,15 +29,10 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -70,7 +58,6 @@ public class MongoDBActionUtils {
     private static final String INITIAL_MODE = "initial";
     private static final String LATEST_OFFSET_MODE = "latest-offset";
     private static final String TIMESTAMP_MODE = "timestamp";
-    private static final String PRIMARY_KEY = "_id";
 
     public static final ConfigOption<String> FIELD_NAME =
             ConfigOptions.key("field.name")
@@ -158,26 +145,4 @@ public class MongoDBActionUtils {
                         "mongodb-conf [%s] must be specified.",
                         MongoDBSourceOptions.DATABASE.key()));
     }
-
-    static Schema buildPaimonSchema(
-            MongodbSchema mongodbSchema,
-            List<String> specifiedPartitionKeys,
-            List<ComputedColumn> computedColumns,
-            Map<String, String> tableConfig,
-            boolean caseSensitive) {
-        LinkedHashMap<String, DataType> sourceColumns =
-                mapKeyCaseConvert(
-                        mongodbSchema.fields(),
-                        caseSensitive,
-                        columnDuplicateErrMsg(mongodbSchema.tableName()));
-
-        return CdcActionCommonUtils.buildPaimonSchema(
-                specifiedPartitionKeys,
-                Lists.newArrayList(PRIMARY_KEY),
-                computedColumns,
-                tableConfig,
-                sourceColumns,
-                null,
-                Collections.emptyList());
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 3b25c2b69..ddadcc305 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -38,10 +38,13 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
 import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -125,25 +128,25 @@ public class MongoDBSyncTableAction extends ActionBase {
             validateCaseInsensitive();
         }
 
-        MongodbSchema mongodbSchema = 
MongodbSchema.getMongodbSchema(mongodbConfig);
+        Schema mongodbSchema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig, caseSensitive);
         catalog.createDatabase(database, true);
         List<ComputedColumn> computedColumns =
-                buildComputedColumns(computedColumnArgs, 
mongodbSchema.fields());
+                buildComputedColumns(computedColumnArgs, mongodbSchema);
 
         Identifier identifier = new Identifier(database, table);
         FileStoreTable table;
-
+        Schema fromMongodb =
+                buildPaimonSchema(
+                        partitionKeys,
+                        Collections.emptyList(),
+                        computedColumns,
+                        tableConfig,
+                        mongodbSchema);
         // Check if table exists before trying to get or create it
         if (catalog.tableExists(identifier)) {
             table = (FileStoreTable) catalog.getTable(identifier);
+            assertSchemaCompatible(table.schema(), fromMongodb.fields());
         } else {
-            Schema fromMongodb =
-                    MongoDBActionUtils.buildPaimonSchema(
-                            mongodbSchema,
-                            partitionKeys,
-                            computedColumns,
-                            tableConfig,
-                            caseSensitive);
             catalog.createTable(identifier, fromMongodb, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java
similarity index 65%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java
index fbf648eeb..f81bdfaf7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataTypes;
 
 import com.mongodb.ConnectionString;
@@ -37,65 +37,32 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
 
 import static 
com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
 import static 
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
 
 /**
- * Represents the schema of a MongoDB collection.
+ * Build schema from a MongoDB collection.
  *
- * <p>This class provides methods to retrieve and manage the schema details of 
a MongoDB collection,
- * including the database name, table (collection) name, fields, and primary 
keys. The schema can be
- * acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the 
schema details are
- * provided explicitly, while in the DYNAMIC mode, the schema is inferred from 
the first document in
- * the collection.
- *
- * <p>The class also provides utility methods to generate schema fields and 
create a new MongoDB
- * schema instance.
+ * <p>The schema can be acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the
+ * schema details are provided explicitly, while in the DYNAMIC mode, the schema is inferred from
+ * the first document in the collection.
  */
-public class MongodbSchema {
+public class MongodbSchemaUtils {
 
     private static final String ID_FIELD = "_id";
-    private final String databaseName;
-    private final String tableName;
-    private final LinkedHashMap<String, DataType> fields;
-    private final List<String> primaryKeys;
-
-    public MongodbSchema(
-            String databaseName,
-            String tableName,
-            LinkedHashMap<String, DataType> fields,
-            List<String> primaryKeys) {
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-        this.fields = fields;
-        this.primaryKeys = primaryKeys;
-    }
-
-    public String tableName() {
-        return tableName;
-    }
-
-    public String databaseName() {
-        return databaseName;
-    }
-
-    public LinkedHashMap<String, DataType> fields() {
-        return fields;
-    }
-
-    public List<String> primaryKeys() {
-        return primaryKeys;
-    }
 
     /**
-     * Utility class for creating a MongoDB schema based on the provided 
configuration. The schema
-     * can be created in one of the two modes:
+     * The schema can be created in one of the two modes:
      *
      * <ul>
      *   <li><b>SPECIFIED</b>: In this mode, the schema is created based on 
the explicit column
@@ -110,7 +77,7 @@ public class MongodbSchema {
      * name, and optionally, the username and password for authentication. For the SPECIFIED mode,
      * the field names should also be specified in the configuration.
      */
-    public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
+    public static Schema getMongodbSchema(Configuration mongodbConfig, boolean 
caseSensitive) {
         SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
         String databaseName =
                 Objects.requireNonNull(
@@ -127,13 +94,8 @@ public class MongodbSchema {
                         Objects.requireNonNull(
                                         mongodbConfig.get(FIELD_NAME), "Field 
names cannot be null")
                                 .split(",");
-                LinkedHashMap<String, DataType> schemaFields =
-                        generateSchemaFields(Arrays.asList(columnNames));
-                return new MongodbSchema(
-                        databaseName,
-                        collectionName,
-                        schemaFields,
-                        Collections.singletonList(ID_FIELD));
+
+                return createMongodbSchema(collectionName, columnNames, 
caseSensitive);
             case DYNAMIC:
                 String hosts =
                         Objects.requireNonNull(
@@ -165,7 +127,7 @@ public class MongodbSchema {
                     }
 
                     return createMongodbSchema(
-                            databaseName, collectionName, 
getColumnNames(firstDocument));
+                            collectionName, getColumnNames(firstDocument), 
caseSensitive);
                 } catch (Exception e) {
                     throw new RuntimeException(
                             "Failed to create schema from MongoDB collection", 
e);
@@ -175,7 +137,7 @@ public class MongodbSchema {
         }
     }
 
-    public static String buildConnectionString(
+    private static String buildConnectionString(
             @Nullable String username,
             @Nullable String password,
             String scheme,
@@ -204,40 +166,25 @@ public class MongodbSchema {
         return document != null ? new ArrayList<>(document.keySet()) : 
Collections.emptyList();
     }
 
-    private static LinkedHashMap<String, DataType> 
generateSchemaFields(List<String> columnNames) {
-        LinkedHashMap<String, DataType> schemaFields = new LinkedHashMap<>();
-        for (String columnName : columnNames) {
-            schemaFields.put(columnName, DataTypes.STRING());
-        }
-        return schemaFields;
-    }
-
-    private static MongodbSchema createMongodbSchema(
-            String databaseName, String collectionName, List<String> 
columnNames) {
-        return new MongodbSchema(
-                databaseName,
-                collectionName,
-                generateSchemaFields(columnNames),
-                Collections.singletonList(ID_FIELD));
+    private static Schema createMongodbSchema(
+            String collectionName, String[] columnNames, boolean 
caseSensitive) {
+        return createMongodbSchema(collectionName, Arrays.asList(columnNames), 
caseSensitive);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof MongodbSchema)) {
-            return false;
+    private static Schema createMongodbSchema(
+            String collectionName, List<String> columnNames, boolean 
caseSensitive) {
+        Schema.Builder builder = Schema.newBuilder();
+        Set<String> existedFields = new HashSet<>();
+        Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(collectionName);
+        for (String column : columnNames) {
+            builder.column(
+                    columnCaseConvertAndDuplicateCheck(
+                            column, existedFields, caseSensitive, 
columnDuplicateErrMsg),
+                    DataTypes.STRING());
         }
-        MongodbSchema that = (MongodbSchema) o;
-        return databaseName.equals(that.databaseName)
-                && tableName.equals(that.tableName)
-                && fields.equals(that.fields)
-                && primaryKeys.equals(that.primaryKeys);
-    }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(databaseName, tableName, fields, primaryKeys);
+        builder.primaryKey(ID_FIELD);
+
+        return builder.build();
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java
index 27a5388c0..45762d550 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java
@@ -32,15 +32,16 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
 
 /** Utility class to load MySQL table schema with JDBC. */
@@ -56,9 +57,10 @@ public class MySqlSchemaUtils {
             TypeMapping typeMapping,
             boolean caseSensitive)
             throws SQLException {
-        Map<String, Integer> duplicateFields = new HashMap<>();
         Schema.Builder builder = Schema.newBuilder();
         try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, 
null)) {
+            Set<String> existedFields = new HashSet<>();
+            Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(tableName);
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
                 String fieldType = rs.getString("TYPE_NAME");
@@ -73,23 +75,18 @@ public class MySqlSchemaUtils {
                 if (rs.wasNull()) {
                     scale = null;
                 }
-                DataType paimonType =
-                        MySqlTypeUtils.toDataType(fieldType, precision, scale, 
typeMapping);
-
-                if (!caseSensitive) {
-                    checkArgument(
-                            
!duplicateFields.containsKey(fieldName.toLowerCase()),
-                            columnDuplicateErrMsg(tableName).apply(fieldName));
-                    fieldName = fieldName.toLowerCase();
-                }
-
                 boolean isNullable =
                         typeMapping.containsMode(TO_NULLABLE)
                                 || 
isNullableColumn(rs.getString("IS_NULLABLE"));
-                DataType updateType = paimonType.copy(isNullable);
+                DataType paimonType =
+                        MySqlTypeUtils.toDataType(fieldType, precision, scale, 
typeMapping)
+                                .copy(isNullable);
+
+                fieldName =
+                        columnCaseConvertAndDuplicateCheck(
+                                fieldName, existedFields, caseSensitive, 
columnDuplicateErrMsg);
 
-                builder.column(fieldName, updateType, fieldComment);
-                duplicateFields.put(fieldName, 1);
+                builder.column(fieldName, paimonType, fieldComment);
             }
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index 6f389c220..58a8779ef 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -19,20 +19,21 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link KafkaSchema}. */
+/** Tests for {@link KafkaSchemaUtils}. */
 public class KafkaSchemaITCase extends KafkaActionITCaseBase {
     @Test
     @Timeout(60)
@@ -50,17 +51,16 @@ public class KafkaSchemaITCase extends 
KafkaActionITCaseBase {
         kafkaConfig.put("value.format", "canal-json");
         kafkaConfig.put("topic", topic);
 
-        KafkaSchema kafkaSchema =
-                KafkaSchema.getKafkaSchema(
-                        Configuration.fromMap(kafkaConfig), topic, 
TypeMapping.defaultMapping());
-        Map<String, DataType> fields = new LinkedHashMap<>();
-        fields.put("pt", DataTypes.INT());
-        fields.put("_id", DataTypes.INT());
-        fields.put("v1", DataTypes.VARCHAR(10));
-        String tableName = "schema_evolution_1";
-        String databasesName = "paimon_sync_table";
+        Schema kafkaSchema =
+                KafkaSchemaUtils.getKafkaSchema(
+                        Configuration.fromMap(kafkaConfig),
+                        topic,
+                        TypeMapping.defaultMapping(),
+                        true);
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "pt", DataTypes.INT()));
+        fields.add(new DataField(1, "_id", DataTypes.INT().notNull()));
+        fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
         assertThat(kafkaSchema.fields()).isEqualTo(fields);
-        assertThat(kafkaSchema.tableName()).isEqualTo(tableName);
-        assertThat(kafkaSchema.databaseName()).isEqualTo(databasesName);
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
index fa08d2fc8..1881747d1 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -18,7 +18,8 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 
 import com.mongodb.MongoClientSettings;
@@ -34,14 +35,15 @@ import org.bson.Document;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-/** Tests for {@link MongodbSchema}. */
+/** Tests for {@link MongodbSchemaUtils}. */
 public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
 
     @BeforeAll
@@ -81,10 +83,8 @@ public class MongodbSchemaITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, 
"authSource=admin");
         mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
         mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, 
"testCollection");
-        MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+        Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig, 
true);
         assertNotNull(schema);
-        assertEquals("testDatabase", schema.databaseName());
-        assertEquals("testCollection", schema.tableName());
     }
 
     @Test
@@ -98,7 +98,9 @@ public class MongodbSchemaITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
         mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, 
"testCollection");
 
-        assertThrows(RuntimeException.class, () -> 
MongodbSchema.getMongodbSchema(mongodbConfig));
+        assertThrows(
+                RuntimeException.class,
+                () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, 
true));
     }
 
     @Test
@@ -108,7 +110,8 @@ public class MongodbSchemaITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.setString(MongoDBSourceOptions.HOSTS, 
MONGODB_CONTAINER.getHostAndPort());
         // Expect an exception to be thrown due to missing necessary settings
         assertThrows(
-                NullPointerException.class, () -> 
MongodbSchema.getMongodbSchema(mongodbConfig));
+                NullPointerException.class,
+                () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, 
true));
     }
 
     @Test
@@ -124,17 +127,15 @@ public class MongodbSchemaITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, 
"testCollection");
 
         // Call the method and check the results
-        MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+        Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig, 
true);
 
         // Verify the schema
         assertNotNull(schema);
-        assertEquals("testDatabase", schema.databaseName());
-        assertEquals("testCollection", schema.tableName());
 
-        LinkedHashMap<String, DataType> expectedFields = new LinkedHashMap<>();
-        expectedFields.put("name", DataTypes.STRING());
-        expectedFields.put("age", DataTypes.STRING());
-        expectedFields.put("_id", DataTypes.STRING());
+        List<DataField> expectedFields = new ArrayList<>();
+        expectedFields.add(new DataField(0, "_id", 
DataTypes.STRING().notNull()));
+        expectedFields.add(new DataField(1, "name", DataTypes.STRING()));
+        expectedFields.add(new DataField(2, "age", DataTypes.STRING()));
 
         assertEquals(expectedFields, schema.fields());
     }
@@ -150,7 +151,9 @@ public class MongodbSchemaITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.setString(MongoDBSourceOptions.DATABASE, 
"invalidDatabase");
         mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, 
"testCollection");
 
-        assertThrows(RuntimeException.class, () -> 
MongodbSchema.getMongodbSchema(mongodbConfig));
+        assertThrows(
+                RuntimeException.class,
+                () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, 
true));
     }
 
     @Test
@@ -164,6 +167,8 @@ public class MongodbSchemaITCase extends 
MongoDBActionITCaseBase {
         mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
         mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, 
"invalidCollection");
 
-        assertThrows(RuntimeException.class, () -> 
MongodbSchema.getMongodbSchema(mongodbConfig));
+        assertThrows(
+                RuntimeException.class,
+                () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, 
true));
     }
 }


Reply via email to