This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd5bb72f7 [core] Clean constants, caseSensitive, loader in Catalog (#4721)
cdd5bb72f7 is described below

commit cdd5bb72f706901f6978a71832e4ee1c78934e08
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 16 22:33:22 2024 +0800

    [core] Clean constants, caseSensitive, loader in Catalog (#4721)
---
 docs/content/maintenance/configurations.md         |  6 --
 docs/content/program-api/flink-api.md              |  2 +-
 .../generated/catalog_configuration.html           | 12 ++--
 .../shortcodes/generated/core_configuration.html   | 12 ++--
 .../file_system_catalog_configuration.html         | 36 -----------
 .../apache/paimon/arrow/ArrowBundleRecords.java    |  8 +--
 .../java/org/apache/paimon/arrow/ArrowUtils.java   | 10 +--
 .../paimon/arrow/reader/ArrowBatchReader.java      | 11 ++--
 .../paimon/arrow/vector/ArrowFormatCWriter.java    |  4 +-
 .../paimon/arrow/vector/ArrowFormatWriter.java     |  4 +-
 .../org/apache/paimon/options/CatalogOptions.java  |  9 ++-
 .../java/org/apache/paimon/utils/StringUtils.java  |  4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java | 15 ++---
 .../java/org/apache/paimon/catalog/Catalog.java    | 74 ++++++++--------------
 ...ystemCatalogOptions.java => CatalogLoader.java} | 18 ++----
 .../org/apache/paimon/catalog/CatalogUtils.java    | 34 ++++++++++
 .../org/apache/paimon/catalog/DelegateCatalog.java |  4 +-
 .../apache/paimon/catalog/FileSystemCatalog.java   |  6 +-
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   |  2 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 11 ++--
 .../paimon/catalog/FileSystemCatalogTest.java      |  4 +-
 .../flink/action/cdc/CdcActionCommonUtils.java     |  8 +--
 .../apache/paimon/flink/action/cdc/Expression.java |  2 +-
 .../cdc/MessageQueueSyncTableActionBase.java       |  2 +-
 .../flink/action/cdc/SyncDatabaseActionBase.java   | 11 ++--
 .../flink/action/cdc/SyncTableActionBase.java      | 11 ++--
 .../action/cdc/SynchronizationActionBase.java      |  4 +-
 .../flink/action/cdc/TableNameConverter.java       |  6 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  4 +-
 .../paimon/flink/sink/cdc/CaseSensitiveUtils.java  | 13 ++--
 .../cdc/CdcDynamicTableParsingProcessFunction.java |  5 +-
 .../cdc/CdcMultiplexRecordChannelComputer.java     |  5 +-
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |  9 +--
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |  6 +-
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  6 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  6 +-
 ...MultiTableUpdatedDataFieldsProcessFunction.java |  3 +-
 .../paimon/flink/sink/cdc/RichCdcSinkBuilder.java  |  8 +--
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  4 +-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  | 12 ++--
 .../flink/action/cdc/SchemaEvolutionTest.java      |  4 +-
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |  4 +-
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  4 +-
 .../cdc/kafka/KafkaSyncDatabaseActionITCase.java   |  4 +-
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   |  4 +-
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  |  4 +-
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   |  5 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  5 +-
 .../cdc/CdcMultiplexRecordChannelComputerTest.java |  3 +-
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  7 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   |  5 +-
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |  5 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  3 +-
 .../org/apache/paimon/flink/action/ActionBase.java |  3 +-
 .../flink/compact/MultiAwareBucketTableScan.java   |  4 +-
 .../paimon/flink/compact/MultiTableScanBase.java   |  3 +-
 .../flink/compact/MultiUnawareBucketTableScan.java |  4 +-
 ...pendOnlyMultiTableCompactionWorkerOperator.java |  9 +--
 .../flink/sink/CombinedTableCompactorSink.java     |  6 +-
 .../sink/MultiTablesStoreCompactOperator.java      |  9 +--
 .../paimon/flink/sink/StoreMultiCommitter.java     |  5 +-
 .../flink/sink/partition/PartitionListeners.java   |  5 +-
 ...orter.java => PartitionStatisticsReporter.java} | 17 +++--
 ...sListener.java => ReportPartStatsListener.java} | 22 +++----
 .../CombinedTableCompactorSourceBuilder.java       |  7 +-
 .../source/operator/CombinedAwareBatchSource.java  |  6 +-
 .../operator/CombinedAwareStreamingSource.java     |  6 +-
 .../source/operator/CombinedCompactorSource.java   |  9 +--
 .../operator/CombinedUnawareBatchSource.java       |  5 +-
 .../operator/CombinedUnawareStreamingSource.java   |  6 +-
 .../source/operator/MultiTablesReadOperator.java   |  7 +-
 .../operator/MultiUnawareTablesReadOperator.java   |  6 +-
 .../paimon/flink/sink/CompactorSinkITCase.java     |  3 +-
 .../paimon/flink/sink/StoreMultiCommitterTest.java |  5 +-
 ...t.java => PartitionStatisticsReporterTest.java} |  8 ++-
 .../MultiTablesCompactorSourceBuilderITCase.java   |  3 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  7 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |  6 +-
 .../apache/paimon/spark/SparkGenericCatalog.java   |  6 +-
 79 files changed, 320 insertions(+), 325 deletions(-)

diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md
index 3849d70a58..99f797e68f 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -38,12 +38,6 @@ Options for paimon catalog.
 
 {{< generated/catalog_configuration >}}
 
-### FilesystemCatalogOptions
-
-Options for Filesystem catalog.
-
-{{< generated/file_system_catalog_configuration >}}
-
 ### HiveCatalogOptions
 
 Options for Hive catalog.
diff --git a/docs/content/program-api/flink-api.md 
b/docs/content/program-api/flink-api.md
index 6ecac3909c..3451b40d58 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -221,7 +221,7 @@ public class WriteCdcToTable {
         Identifier identifier = Identifier.create("my_db", "T");
         Options catalogOptions = new Options();
         catalogOptions.set("warehouse", "/path/to/warehouse");
-        Catalog.Loader catalogLoader = 
+        CatalogLoader catalogLoader = 
                 () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         Table table = catalogLoader.load().getTable(identifier);
 
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 63f7adda1e..6355c95586 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,12 +26,6 @@ under the License.
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>allow-upper-case</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Boolean</td>
-            <td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
-        </tr>
         <tr>
             <td><h5>cache-enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
@@ -74,6 +68,12 @@ under the License.
             <td>Integer</td>
             <td>Controls the max number for snapshots per table in the catalog 
are cached.</td>
         </tr>
+        <tr>
+            <td><h5>case-sensitive</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Boolean</td>
+            <td>Indicates whether this catalog is case-sensitive.</td>
+        </tr>
         <tr>
             <td><h5>client-pool-size</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 15b1aac935..1133de289f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,6 +230,12 @@ under the License.
             <td>String</td>
             <td>Specify the file name prefix of data files.</td>
         </tr>
+        <tr>
+            <td><h5>data-file.thin-mode</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable data file thin mode to avoid duplicate columns 
storage.</td>
+        </tr>
         <tr>
             <td><h5>delete-file.thread-num</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -864,12 +870,6 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
             <td>Integer</td>
             <td>Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.</td>
         </tr>
-        <tr>
-            <td><h5>data-file.thin-mode</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Enable data file thin mode to avoid duplicate columns 
storage.</td>
-        </tr>
         <tr>
             <td><h5>streaming-read-mode</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html 
b/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
deleted file mode 100644
index c416ed6da5..0000000000
--- a/docs/layouts/shortcodes/generated/file_system_catalog_configuration.html
+++ /dev/null
@@ -1,36 +0,0 @@
-{{/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/}}
-<table class="configuration table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 10%">Type</th>
-            <th class="text-left" style="width: 55%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>case-sensitive</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Is case sensitive. If case insensitive, you need to set this 
option to false, and the table name and fields be converted to lowercase.</td>
-        </tr>
-    </tbody>
-</table>
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
index 9627bbd85d..25f6603ec2 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowBundleRecords.java
@@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {
 
     private final VectorSchemaRoot vectorSchemaRoot;
     private final RowType rowType;
-    private final boolean allowUpperCase;
+    private final boolean caseSensitive;
 
     public ArrowBundleRecords(
-            VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean 
allowUpperCase) {
+            VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean 
caseSensitive) {
         this.vectorSchemaRoot = vectorSchemaRoot;
         this.rowType = rowType;
-        this.allowUpperCase = allowUpperCase;
+        this.caseSensitive = caseSensitive;
     }
 
     public VectorSchemaRoot getVectorSchemaRoot() {
@@ -52,7 +52,7 @@ public class ArrowBundleRecords implements BundleRecords {
 
     @Override
     public Iterator<InternalRow> iterator() {
-        ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, 
allowUpperCase);
+        ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, 
caseSensitive);
         return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
     }
 }
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index b3925a0a76..0f6a98b7a2 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -55,6 +55,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
 /** Utilities for creating Arrow objects. */
 public class ArrowUtils {
 
@@ -66,13 +68,13 @@ public class ArrowUtils {
     }
 
     public static VectorSchemaRoot createVectorSchemaRoot(
-            RowType rowType, BufferAllocator allocator, boolean 
allowUpperCase) {
+            RowType rowType, BufferAllocator allocator, boolean caseSensitive) 
{
         List<Field> fields =
                 rowType.getFields().stream()
                         .map(
                                 f ->
                                         toArrowField(
-                                                allowUpperCase ? f.name() : 
f.name().toLowerCase(),
+                                                toLowerCaseIfNeed(f.name(), 
caseSensitive),
                                                 f.id(),
                                                 f.type(),
                                                 0))
@@ -81,9 +83,9 @@ public class ArrowUtils {
     }
 
     public static FieldVector createVector(
-            DataField dataField, BufferAllocator allocator, boolean 
allowUpperCase) {
+            DataField dataField, BufferAllocator allocator, boolean 
caseSensitive) {
         return toArrowField(
-                        allowUpperCase ? dataField.name() : 
dataField.name().toLowerCase(),
+                        toLowerCaseIfNeed(dataField.name(), caseSensitive),
                         dataField.id(),
                         dataField.type(),
                         0)
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
index 9d20062b43..b626758ded 100644
--- 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
+++ 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/reader/ArrowBatchReader.java
@@ -34,6 +34,8 @@ import org.apache.arrow.vector.types.pojo.Schema;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
 /** Reader from a {@link VectorSchemaRoot} to paimon rows. */
 public class ArrowBatchReader {
 
@@ -41,9 +43,9 @@ public class ArrowBatchReader {
     private final VectorizedColumnBatch batch;
     private final Arrow2PaimonVectorConverter[] convertors;
     private final RowType projectedRowType;
-    private final boolean allowUpperCase;
+    private final boolean caseSensitive;
 
-    public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
+    public ArrowBatchReader(RowType rowType, boolean caseSensitive) {
         this.internalRowSerializer = new InternalRowSerializer(rowType);
         ColumnVector[] columnVectors = new 
ColumnVector[rowType.getFieldCount()];
         this.convertors = new 
Arrow2PaimonVectorConverter[rowType.getFieldCount()];
@@ -53,7 +55,7 @@ public class ArrowBatchReader {
         for (int i = 0; i < columnVectors.length; i++) {
             this.convertors[i] = 
Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
         }
-        this.allowUpperCase = allowUpperCase;
+        this.caseSensitive = caseSensitive;
     }
 
     public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
@@ -63,8 +65,7 @@ public class ArrowBatchReader {
         for (int i = 0; i < dataFields.size(); ++i) {
             try {
                 String fieldName = dataFields.get(i).name();
-                Field field =
-                        arrowSchema.findField(allowUpperCase ? fieldName : 
fieldName.toLowerCase());
+                Field field = 
arrowSchema.findField(toLowerCaseIfNeed(fieldName, caseSensitive));
                 int idx = arrowSchema.getFields().indexOf(field);
                 mapping[i] = idx;
             } catch (IllegalArgumentException e) {
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 10afcaf691..442457813a 100644
--- 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -37,8 +37,8 @@ public class ArrowFormatCWriter implements AutoCloseable {
     private final ArrowSchema schema;
     private final ArrowFormatWriter realWriter;
 
-    public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean 
allowUpperCase) {
-        this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, 
allowUpperCase);
+    public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
+        this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, 
caseSensitive);
         RootAllocator allocator = realWriter.getAllocator();
         array = ArrowArray.allocateNew(allocator);
         schema = ArrowSchema.allocateNew(allocator);
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index acdb5d0dcb..9f55792197 100644
--- 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -43,10 +43,10 @@ public class ArrowFormatWriter implements AutoCloseable {
     private final RootAllocator allocator;
     private int rowId;
 
-    public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean 
allowUpperCase) {
+    public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
         allocator = new RootAllocator();
 
-        vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator, allowUpperCase);
+        vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator, caseSensitive);
 
         fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index bb8cfae682..b22274e011 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -128,13 +128,12 @@ public class CatalogOptions {
                     .withDescription(
                             "Controls the max number for snapshots per table 
in the catalog are cached.");
 
-    public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
-            ConfigOptions.key("allow-upper-case")
+    public static final ConfigOption<Boolean> CASE_SENSITIVE =
+            ConfigOptions.key("case-sensitive")
                     .booleanType()
                     .noDefaultValue()
-                    .withDescription(
-                            "Indicates whether this catalog allow upper case, "
-                                    + "its default value depends on the 
implementation of the specific catalog.");
+                    .withFallbackKeys("allow-upper-case")
+                    .withDescription("Indicates whether this catalog is 
case-sensitive.");
 
     public static final ConfigOption<Boolean> SYNC_ALL_PROPERTIES =
             ConfigOptions.key("sync-all-properties")
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index e184624c0b..c4e07e0a69 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -542,8 +542,8 @@ public class StringUtils {
         return "`" + str + "`";
     }
 
-    public static String caseSensitiveConversion(String str, boolean 
allowUpperCase) {
-        return allowUpperCase ? str : str.toLowerCase();
+    public static String toLowerCaseIfNeed(String str, boolean caseSensitive) {
+        return caseSensitive ? str : str.toLowerCase();
     }
 
     public static boolean isNumeric(final CharSequence cs) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a1b41e3b8a..db69092955 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -60,7 +60,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -82,7 +81,7 @@ public abstract class AbstractCatalog implements Catalog {
 
     protected AbstractCatalog(FileIO fileIO, Options options) {
         this.fileIO = fileIO;
-        this.tableDefaultOptions = 
Catalog.tableDefaultOptions(options.toMap());
+        this.tableDefaultOptions = 
CatalogUtils.tableDefaultOptions(options.toMap());
         this.catalogOptions = options;
     }
 
@@ -123,11 +122,6 @@ public abstract class AbstractCatalog implements Catalog {
         return 
catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
     }
 
-    @Override
-    public boolean allowUpperCase() {
-        return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
-    }
-
     protected boolean allowCustomTablePath() {
         return false;
     }
@@ -559,8 +553,9 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     protected void validateIdentifierNameCaseInsensitive(Identifier 
identifier) {
-        Catalog.validateCaseInsensitive(allowUpperCase(), "Database", 
identifier.getDatabaseName());
-        Catalog.validateCaseInsensitive(allowUpperCase(), "Table", 
identifier.getObjectName());
+        CatalogUtils.validateCaseInsensitive(
+                caseSensitive(), "Database", identifier.getDatabaseName());
+        CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table", 
identifier.getObjectName());
     }
 
     private void 
validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
@@ -578,7 +573,7 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
-        Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
+        CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field", 
fieldNames);
     }
 
     private void validateAutoCreateClose(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c3808caa13..7b1fe0ea07 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,15 +26,9 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.view.View;
 
-import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-
-import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * This interface is responsible for reading and writing metadata such as 
database/table from a
@@ -46,30 +40,38 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 @Public
 public interface Catalog extends AutoCloseable {
 
-    String DEFAULT_DATABASE = "default";
-
+    // constants for system table and database
     String SYSTEM_TABLE_SPLITTER = "$";
     String SYSTEM_DATABASE_NAME = "sys";
     String SYSTEM_BRANCH_PREFIX = "branch_";
-    String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
-    String DB_SUFFIX = ".db";
 
+    // constants for table and database
     String COMMENT_PROP = "comment";
     String OWNER_PROP = "owner";
+
+    // constants for database
+    String DEFAULT_DATABASE = "default";
+    String DB_SUFFIX = ".db";
     String DB_LOCATION_PROP = "location";
+
+    // constants for table
+    String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
     String NUM_ROWS_PROP = "numRows";
     String NUM_FILES_PROP = "numFiles";
     String TOTAL_SIZE_PROP = "totalSize";
     String LAST_UPDATE_TIME_PROP = "lastUpdateTime";
-    String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
 
-    /** Warehouse root path containing all database directories in this 
catalog. */
+    /** Warehouse root path for creating new databases. */
     String warehouse();
 
-    /** Catalog options. */
+    /** {@link FileIO} of this catalog. It can access {@link #warehouse()} 
path. */
+    FileIO fileIO();
+
+    /** Catalog options for re-creating this catalog. */
     Map<String, String> options();
 
-    FileIO fileIO();
+    /** Return a boolean that indicates whether this catalog is 
case-sensitive. */
+    boolean caseSensitive();
 
     /**
      * Get the names of all databases in this catalog.
@@ -325,44 +327,30 @@ public interface Catalog extends AutoCloseable {
         throw new UnsupportedOperationException();
     }
 
-    /** Return a boolean that indicates whether this catalog allow upper case. 
*/
-    boolean allowUpperCase();
-
+    /**
+     * Repair the entire Catalog, repair the metadata in the metastore 
consistent with the metadata
+     * in the filesystem, register missing tables in the metastore.
+     */
     default void repairCatalog() {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Repair the entire database, repair the metadata in the metastore 
consistent with the metadata
+     * in the filesystem, register missing tables in the metastore.
+     */
     default void repairDatabase(String databaseName) {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Repair the table, repair the metadata in the metastore consistent with 
the metadata in the
+     * filesystem.
+     */
     default void repairTable(Identifier identifier) throws 
TableNotExistException {
         throw new UnsupportedOperationException();
     }
 
-    static Map<String, String> tableDefaultOptions(Map<String, String> 
options) {
-        return convertToPropertiesPrefixKey(options, 
TABLE_DEFAULT_OPTION_PREFIX);
-    }
-
-    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
-    static void validateCaseInsensitive(boolean caseSensitive, String type, 
String... names) {
-        validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
-    }
-
-    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
-    static void validateCaseInsensitive(boolean caseSensitive, String type, 
List<String> names) {
-        if (caseSensitive) {
-            return;
-        }
-        List<String> illegalNames =
-                names.stream().filter(f -> 
!f.equals(f.toLowerCase())).collect(Collectors.toList());
-        checkArgument(
-                illegalNames.isEmpty(),
-                String.format(
-                        "%s name %s cannot contain upper case in the catalog.",
-                        type, illegalNames));
-    }
-
     /** Exception for trying to drop on a database that is not empty. */
     class DatabaseNotEmptyException extends Exception {
         private static final String MSG = "Database %s is not empty.";
@@ -599,10 +587,4 @@ public interface Catalog extends AutoCloseable {
             return identifier;
         }
     }
-
-    /** Loader of {@link Catalog}. */
-    @FunctionalInterface
-    interface Loader extends Serializable {
-        Catalog load();
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
 b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
similarity index 55%
rename from 
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
rename to paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
index e656742b42..c8de08139c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java
@@ -18,19 +18,11 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+import java.io.Serializable;
 
-/** Options for filesystem catalog. */
-public final class FileSystemCatalogOptions {
+/** Loader for creating a {@link Catalog}. */
+@FunctionalInterface
+public interface CatalogLoader extends Serializable {
 
-    public static final ConfigOption<Boolean> CASE_SENSITIVE =
-            ConfigOptions.key("case-sensitive")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withFallbackKeys("allow-upper-case")
-                    .withDescription(
-                            "Is case sensitive. If case insensitive, you need 
to set this option to false, and the table name and fields be converted to 
lowercase.");
-
-    private FileSystemCatalogOptions() {}
+    Catalog load();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 39f81833a9..bae23c6276 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -21,6 +21,15 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
+import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Utils for {@link Catalog}. */
 public class CatalogUtils {
 
@@ -51,4 +60,29 @@ public class CatalogUtils {
     public static String table(String path) {
         return SchemaManager.identifierFromPath(path, false).getObjectName();
     }
+
+    public static Map<String, String> tableDefaultOptions(Map<String, String> 
options) {
+        return convertToPropertiesPrefixKey(options, 
TABLE_DEFAULT_OPTION_PREFIX);
+    }
+
+    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
+    public static void validateCaseInsensitive(
+            boolean caseSensitive, String type, String... names) {
+        validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
+    }
+
+    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
+    public static void validateCaseInsensitive(
+            boolean caseSensitive, String type, List<String> names) {
+        if (caseSensitive) {
+            return;
+        }
+        List<String> illegalNames =
+                names.stream().filter(f -> 
!f.equals(f.toLowerCase())).collect(Collectors.toList());
+        checkArgument(
+                illegalNames.isEmpty(),
+                String.format(
+                        "%s name %s cannot contain upper case in the catalog.",
+                        type, illegalNames));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 2298626b0e..93e8ce2581 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -42,8 +42,8 @@ public class DelegateCatalog implements Catalog {
     }
 
     @Override
-    public boolean allowUpperCase() {
-        return wrapped.allowUpperCase();
+    public boolean caseSensitive() {
+        return wrapped.caseSensitive();
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 9264a54647..279ddb26ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -34,7 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
-import static 
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 
 /** A catalog implementation for {@link FileIO}. */
 public class FileSystemCatalog extends AbstractCatalog {
@@ -158,7 +158,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public boolean allowUpperCase() {
-        return catalogOptions.get(CASE_SENSITIVE);
+    public boolean caseSensitive() {
+        return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 778bc591fe..551b2d8fc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -320,7 +320,7 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    public boolean allowUpperCase() {
+    public boolean caseSensitive() {
         return false;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 86b87e25e8..c30e1109e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -52,6 +52,7 @@ import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
 /** A catalog implementation for REST. */
@@ -61,7 +62,7 @@ public class RESTCatalog implements Catalog {
 
     private final RESTClient client;
     private final ResourcePaths resourcePaths;
-    private final Map<String, String> options;
+    private final Options options;
     private final Map<String, String> baseHeader;
     private final AuthSession catalogAuth;
 
@@ -99,7 +100,7 @@ public class RESTCatalog implements Catalog {
         }
         Map<String, String> initHeaders =
                 RESTUtil.merge(configHeaders(options.toMap()), 
this.catalogAuth.getHeaders());
-        this.options = fetchOptionsFromServer(initHeaders, options.toMap());
+        this.options = new Options(fetchOptionsFromServer(initHeaders, 
options.toMap()));
         this.resourcePaths =
                 ResourcePaths.forCatalogProperties(
                         this.options.get(RESTCatalogInternalOptions.PREFIX));
@@ -112,7 +113,7 @@ public class RESTCatalog implements Catalog {
 
     @Override
     public Map<String, String> options() {
-        return this.options;
+        return this.options.toMap();
     }
 
     @Override
@@ -223,8 +224,8 @@ public class RESTCatalog implements Catalog {
     }
 
     @Override
-    public boolean allowUpperCase() {
-        return false;
+    public boolean caseSensitive() {
+        return options.getOptional(CASE_SENSITIVE).orElse(true);
     }
 
     @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
index 303a9d8733..65ea6721c2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -36,12 +36,12 @@ public class FileSystemCatalogTest extends CatalogTestBase {
     public void setUp() throws Exception {
         super.setUp();
         Options catalogOptions = new Options();
-        catalogOptions.set(CatalogOptions.ALLOW_UPPER_CASE, false);
+        catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false);
         catalog = new FileSystemCatalog(fileIO, new Path(warehouse), 
catalogOptions);
     }
 
     @Test
-    public void testCreateTableAllowUpperCase() throws Exception {
+    public void testCreateTableCaseSensitive() throws Exception {
         catalog.createDatabase("test_db", false);
         Identifier identifier = Identifier.create("test_db", "new_table");
         Schema schema =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index c8af6f91c4..6482a625f4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -42,7 +42,7 @@ import static 
org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
-import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
 
 /** Common utils for CDC Action. */
 public class CdcActionCommonUtils {
@@ -129,21 +129,21 @@ public class CdcActionCommonUtils {
         List<String> allFieldNames = new ArrayList<>();
 
         for (DataField field : sourceSchema.fields()) {
-            String fieldName = caseSensitiveConversion(field.name(), 
caseSensitive);
+            String fieldName = toLowerCaseIfNeed(field.name(), caseSensitive);
             allFieldNames.add(fieldName);
             builder.column(fieldName, field.type(), field.description());
         }
 
         for (ComputedColumn computedColumn : computedColumns) {
             String computedColumnName =
-                    caseSensitiveConversion(computedColumn.columnName(), 
caseSensitive);
+                    toLowerCaseIfNeed(computedColumn.columnName(), 
caseSensitive);
             allFieldNames.add(computedColumnName);
             builder.column(computedColumnName, computedColumn.columnType());
         }
 
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
             String metadataColumnName =
-                    caseSensitiveConversion(metadataConverter.columnName(), 
caseSensitive);
+                    toLowerCaseIfNeed(metadataConverter.columnName(), 
caseSensitive);
             allFieldNames.add(metadataColumnName);
             builder.column(metadataColumnName, metadataConverter.dataType());
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 2e0a131929..3290ec1829 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -189,7 +189,7 @@ public interface Expression extends Serializable {
             String[] literals =
                     
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
             String referencedFieldCheckForm =
-                    StringUtils.caseSensitiveConversion(referencedField, 
caseSensitive);
+                    StringUtils.toLowerCaseIfNeed(referencedField, 
caseSensitive);
 
             DataType fieldType =
                     checkNotNull(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 9c629b5a51..3af0957ce6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -86,7 +86,7 @@ public abstract class MessageQueueSyncTableActionBase extends 
SyncTableActionBas
                 tableConfig,
                 retrievedSchema,
                 metadataConverters,
-                allowUpperCase,
+                caseSensitive,
                 true,
                 false);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4fb1339c51..56334c1e7b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -155,9 +156,9 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
 
     @Override
     protected void validateCaseSensitivity() {
-        Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
-        Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", 
tablePrefix);
-        Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", 
tableSuffix);
+        CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", 
database);
+        CatalogUtils.validateCaseInsensitive(caseSensitive, "Table prefix", 
tablePrefix);
+        CatalogUtils.validateCaseInsensitive(caseSensitive, "Table suffix", 
tableSuffix);
     }
 
     @Override
@@ -179,7 +180,7 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         NewTableSchemaBuilder schemaBuilder =
                 new NewTableSchemaBuilder(
                         tableConfig,
-                        allowUpperCase,
+                        caseSensitive,
                         partitionKeys,
                         primaryKeys,
                         requirePrimaryKeys(),
@@ -190,7 +191,7 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
         TableNameConverter tableNameConverter =
                 new TableNameConverter(
-                        allowUpperCase,
+                        caseSensitive,
                         mergeShards,
                         dbPrefix,
                         dbSuffix,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 87efeb2a19..6fcdbd44bc 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
@@ -107,15 +108,15 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
                 tableConfig,
                 retrievedSchema,
                 metadataConverters,
-                allowUpperCase,
+                caseSensitive,
                 true,
                 true);
     }
 
     @Override
     protected void validateCaseSensitivity() {
-        Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
-        Catalog.validateCaseInsensitive(allowUpperCase, "Table", table);
+        CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", 
database);
+        CatalogUtils.validateCaseInsensitive(caseSensitive, "Table", table);
     }
 
     @Override
@@ -142,7 +143,7 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
                         buildComputedColumns(
                                 computedColumnArgs,
                                 fileStoreTable.schema().fields(),
-                                allowUpperCase);
+                                caseSensitive);
                 // check partition keys and primary keys in case that user 
specified them
                 checkConstraints();
             }
@@ -162,7 +163,7 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
 
     @Override
     protected EventParser.Factory<RichCdcMultiplexRecord> 
buildEventParserFactory() {
-        boolean caseSensitive = this.allowUpperCase;
+        boolean caseSensitive = this.caseSensitive;
         return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index a7c7703474..d755b200a9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -64,7 +64,7 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
     protected final String database;
     protected final Configuration cdcSourceConfig;
     protected final SyncJobHandler syncJobHandler;
-    protected final boolean allowUpperCase;
+    protected final boolean caseSensitive;
 
     protected Map<String, String> tableConfig = new HashMap<>();
     protected TypeMapping typeMapping = TypeMapping.defaultMapping();
@@ -80,7 +80,7 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
         this.database = database;
         this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
         this.syncJobHandler = syncJobHandler;
-        this.allowUpperCase = catalog.allowUpperCase();
+        this.caseSensitive = catalog.caseSensitive();
 
         this.syncJobHandler.registerJdbcDriver();
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 15fc3507ce..7dd63ed227 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -24,6 +24,8 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
 /** Used to convert a MySQL source table name to corresponding Paimon table 
name. */
 public class TableNameConverter implements Serializable {
 
@@ -78,7 +80,7 @@ public class TableNameConverter implements Serializable {
         // top priority: table mapping
         if (tableMapping.containsKey(originTblName.toLowerCase())) {
             String mappedName = tableMapping.get(originTblName.toLowerCase());
-            return caseSensitive ? mappedName : mappedName.toLowerCase();
+            return toLowerCaseIfNeed(mappedName, caseSensitive);
         }
 
         String tblPrefix = prefix;
@@ -93,7 +95,7 @@ public class TableNameConverter implements Serializable {
         }
 
         // third priority: normal prefix and suffix
-        String tableName = caseSensitive ? originTblName : 
originTblName.toLowerCase();
+        String tableName = toLowerCaseIfNeed(originTblName, caseSensitive);
         return tblPrefix + tableName + tblSuffix;
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index ce2e9124a6..0f452e2834 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -139,7 +139,7 @@ public class MySqlSyncDatabaseAction extends 
SyncDatabaseActionBase {
 
         TableNameConverter tableNameConverter =
                 new TableNameConverter(
-                        allowUpperCase, mergeShards, tablePrefix, tableSuffix, 
tableMapping);
+                        caseSensitive, mergeShards, tablePrefix, tableSuffix, 
tableMapping);
         for (JdbcTableInfo tableInfo : jdbcTableInfos) {
             Identifier identifier =
                     Identifier.create(
@@ -155,7 +155,7 @@ public class MySqlSyncDatabaseAction extends 
SyncDatabaseActionBase {
                             tableConfig,
                             tableInfo.schema(),
                             metadataConverters,
-                            allowUpperCase,
+                            caseSensitive,
                             false,
                             true);
             try {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
index 4892aee030..e80692ed22 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -28,8 +29,8 @@ import org.apache.flink.util.Collector;
 public class CaseSensitiveUtils {
 
     public static DataStream<CdcRecord> cdcRecordConvert(
-            Catalog.Loader catalogLoader, DataStream<CdcRecord> input) {
-        if (allowUpperCase(catalogLoader)) {
+            CatalogLoader catalogLoader, DataStream<CdcRecord> input) {
+        if (caseSensitive(catalogLoader)) {
             return input;
         }
 
@@ -46,8 +47,8 @@ public class CaseSensitiveUtils {
     }
 
     public static DataStream<CdcMultiplexRecord> cdcMultiplexRecordConvert(
-            Catalog.Loader catalogLoader, DataStream<CdcMultiplexRecord> 
input) {
-        if (allowUpperCase(catalogLoader)) {
+            CatalogLoader catalogLoader, DataStream<CdcMultiplexRecord> input) 
{
+        if (caseSensitive(catalogLoader)) {
             return input;
         }
 
@@ -65,9 +66,9 @@ public class CaseSensitiveUtils {
                 .name("Case-insensitive Convert");
     }
 
-    private static boolean allowUpperCase(Catalog.Loader catalogLoader) {
+    private static boolean caseSensitive(CatalogLoader catalogLoader) {
         try (Catalog catalog = catalogLoader.load()) {
-            return catalog.allowUpperCase();
+            return catalog.caseSensitive();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 886e33e204..4efcf1207e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.types.DataField;
 
@@ -62,13 +63,13 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
 
     private final EventParser.Factory<T> parserFactory;
     private final String database;
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
 
     private transient EventParser<T> parser;
     private transient Catalog catalog;
 
     public CdcDynamicTableParsingProcessFunction(
-            String database, Catalog.Loader catalogLoader, 
EventParser.Factory<T> parserFactory) {
+            String database, CatalogLoader catalogLoader, 
EventParser.Factory<T> parserFactory) {
         // for now, only support single database
         this.database = database;
         this.catalogLoader = catalogLoader;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index fdad3a921d..2858b2d4eb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +39,13 @@ public class CdcMultiplexRecordChannelComputer implements 
ChannelComputer<CdcMul
             LoggerFactory.getLogger(CdcMultiplexRecordChannelComputer.class);
 
     private static final long serialVersionUID = 1L;
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
 
     private transient int numChannels;
 
     private Map<Identifier, CdcRecordChannelComputer> channelComputers;
 
-    public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
+    public CdcMultiplexRecordChannelComputer(CatalogLoader catalogLoader) {
         this.catalogLoader = catalogLoader;
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 5db111a300..9387a82938 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -67,7 +68,7 @@ public class CdcRecordStoreMultiWriteOperator
 
     private final StoreSinkWrite.WithWriteBufferProvider 
storeSinkWriteProvider;
     private final String initialCommitUser;
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
 
     private MemoryPoolFactory memoryPoolFactory;
     private Catalog catalog;
@@ -79,7 +80,7 @@ public class CdcRecordStoreMultiWriteOperator
 
     private CdcRecordStoreMultiWriteOperator(
             StreamOperatorParameters<MultiTableCommittable> parameters,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
             String initialCommitUser,
             Options options) {
@@ -264,10 +265,10 @@ public class CdcRecordStoreMultiWriteOperator
             extends PrepareCommitOperator.Factory<CdcMultiplexRecord, 
MultiTableCommittable> {
         private final StoreSinkWrite.WithWriteBufferProvider 
storeSinkWriteProvider;
         private final String initialCommitUser;
-        private final Catalog.Loader catalogLoader;
+        private final CatalogLoader catalogLoader;
 
         public Factory(
-                Catalog.Loader catalogLoader,
+                CatalogLoader catalogLoader,
                 StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
                 String initialCommitUser,
                 Options options) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 28b68fedc3..5c27db6ddf 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
@@ -48,7 +48,7 @@ public class CdcSinkBuilder<T> {
     private EventParser.Factory<T> parserFactory = null;
     private Table table = null;
     private Identifier identifier = null;
-    private Catalog.Loader catalogLoader = null;
+    private CatalogLoader catalogLoader = null;
 
     @Nullable private Integer parallelism;
 
@@ -77,7 +77,7 @@ public class CdcSinkBuilder<T> {
         return this;
     }
 
-    public CdcSinkBuilder<T> withCatalogLoader(Catalog.Loader catalogLoader) {
+    public CdcSinkBuilder<T> withCatalogLoader(CatalogLoader catalogLoader) {
         this.catalogLoader = catalogLoader;
         return this;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 1688d4deb0..4cd9235cb5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.Committer;
 import org.apache.paimon.flink.sink.CommitterOperatorFactory;
@@ -60,13 +60,13 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
     private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global 
Committer";
 
     private final boolean isOverwrite = false;
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
     private final double commitCpuCores;
     @Nullable private final MemorySize commitHeapMemory;
     private final String commitUser;
 
     public FlinkCdcMultiTableSink(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             double commitCpuCores,
             @Nullable MemorySize commitHeapMemory,
             String commitUser) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index a9ad66847b..bd18c7e7ad 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -72,7 +72,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     //     it will check newly added tables and create the corresponding
     //     Paimon tables. 2) in multiplex sink where it is used to
     //     initialize different writers to multiple tables.
-    private Catalog.Loader catalogLoader;
+    private CatalogLoader catalogLoader;
     // database to sync, currently only support single database
     private String database;
     private MultiTablesSinkMode mode;
@@ -111,7 +111,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         return this;
     }
 
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader 
catalogLoader) {
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(CatalogLoader 
catalogLoader) {
         this.catalogLoader = catalogLoader;
         return this;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 0ad412e47d..dd612a52c2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -51,7 +52,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
 
     private final Map<Identifier, SchemaManager> schemaManagers = new 
HashMap<>();
 
-    public MultiTableUpdatedDataFieldsProcessFunction(Catalog.Loader 
catalogLoader) {
+    public MultiTableUpdatedDataFieldsProcessFunction(CatalogLoader 
catalogLoader) {
         super(catalogLoader);
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 610856d3af..43f63854bb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
 
@@ -39,7 +39,7 @@ public class RichCdcSinkBuilder {
     private DataStream<RichCdcRecord> input = null;
     private Table table = null;
     private Identifier identifier = null;
-    private Catalog.Loader catalogLoader = null;
+    private CatalogLoader catalogLoader = null;
 
     @Nullable private Integer parallelism;
 
@@ -62,7 +62,7 @@ public class RichCdcSinkBuilder {
         return this;
     }
 
-    public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
+    public RichCdcSinkBuilder catalogLoader(CatalogLoader catalogLoader) {
         this.catalogLoader = catalogLoader;
         return this;
     }
@@ -114,7 +114,7 @@ public class RichCdcSinkBuilder {
 
     /** @deprecated Use {@link #catalogLoader}. */
     @Deprecated
-    public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+    public RichCdcSinkBuilder withCatalogLoader(CatalogLoader catalogLoader) {
         this.catalogLoader = catalogLoader;
         return this;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 64f00d96b0..504f631058 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -53,7 +53,7 @@ public class UpdatedDataFieldsProcessFunction
     private Set<FieldIdentifier> latestFields;
 
     public UpdatedDataFieldsProcessFunction(
-            SchemaManager schemaManager, Identifier identifier, Catalog.Loader 
catalogLoader) {
+            SchemaManager schemaManager, Identifier identifier, CatalogLoader 
catalogLoader) {
         super(catalogLoader);
         this.schemaManager = schemaManager;
         this.identifier = identifier;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 4f02b784c2..d50df23742 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -48,9 +49,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, 
O> extends Process
     private static final Logger LOG =
             
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
 
-    protected final Catalog.Loader catalogLoader;
+    protected final CatalogLoader catalogLoader;
     protected Catalog catalog;
-    private boolean allowUpperCase;
+    private boolean caseSensitive;
 
     private static final List<DataTypeRoot> STRING_TYPES =
             Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -70,7 +71,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, 
O> extends Process
     private static final List<DataTypeRoot> TIMESTAMP_TYPES =
             Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
 
-    protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader 
catalogLoader) {
+    protected UpdatedDataFieldsProcessFunctionBase(CatalogLoader 
catalogLoader) {
         this.catalogLoader = catalogLoader;
     }
 
@@ -86,7 +87,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, 
O> extends Process
      */
     public void open(Configuration parameters) {
         this.catalog = catalogLoader.load();
-        this.allowUpperCase = this.catalog.allowUpperCase();
+        this.caseSensitive = this.catalog.caseSensitive();
     }
 
     protected void applySchemaChange(
@@ -215,8 +216,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
 
         List<SchemaChange> result = new ArrayList<>();
         for (DataField newField : updatedDataFields) {
-            String newFieldName =
-                    StringUtils.caseSensitiveConversion(newField.name(), 
allowUpperCase);
+            String newFieldName = 
StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
             if (oldFields.containsKey(newFieldName)) {
                 DataField oldField = oldFields.get(newFieldName);
                 // we compare by ignoring nullable, because partition keys and 
primary keys might be
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
index 9ba1837686..46c8e98fb6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
@@ -202,7 +202,7 @@ public class SchemaEvolutionTest extends TableTestBase {
         DataStream<List<DataField>> upDataFieldStream = 
env.fromCollection(prepareData());
         Options options = new Options();
         options.set("warehouse", tempPath.toString());
-        final Catalog.Loader catalogLoader = () -> 
FlinkCatalogFactory.createPaimonCatalog(options);
+        final CatalogLoader catalogLoader = () -> 
FlinkCatalogFactory.createPaimonCatalog(options);
         Identifier identifier = Identifier.create(database, tableName);
         DataStream<Void> schemaChangeProcessFunction =
                 upDataFieldStream
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 0e1f4e72ea..6e37c589ac 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -535,7 +535,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTableConfig(getBasicTableConfig())
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 8a4dc2f303..ed1885f5d7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -1121,7 +1121,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
                         .withTableConfig(getBasicTableConfig())
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .withComputedColumnArgs("_YEAR=year(_DATE)")
                         .build();
         runActionWithDefaultEnv(action);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
index de189bc205..606c46e90e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -475,7 +475,7 @@ public class KafkaSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTableConfig(getBasicTableConfig())
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index ae0b0b412a..92c2a7243a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -196,7 +196,7 @@ public class MongoDBSyncDatabaseActionITCase extends 
MongoDBActionITCaseBase {
                         .withTableConfig(getBasicTableConfig())
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .build();
         runActionWithDefaultEnv(action);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index b4f31f2d6d..2d8489dc23 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -382,7 +382,7 @@ public class MongoDBSyncTableActionITCase extends 
MongoDBActionITCaseBase {
                         .withTableConfig(getBasicTableConfig())
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .withComputedColumnArgs("_YEAR=year(_DATE)")
                         .build();
         runActionWithDefaultEnv(action);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index b48b898d66..10ee548125 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.options.CatalogOptions;
@@ -475,7 +474,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                 syncDatabaseActionBuilder(mySqlConfig)
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .withTableConfig(getBasicTableConfig())
                         .build();
         runActionWithDefaultEnv(action);
@@ -496,7 +495,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                 syncDatabaseActionBuilder(mySqlConfig)
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .withMode(COMBINED.configString())
                         .withTableConfig(getBasicTableConfig())
                         .build();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index febbe4e1de..749d87eb06 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalogOptions;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -1327,7 +1326,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                 syncTableActionBuilder(mySqlConfig)
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         
.withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)")
                         .build();
         runActionWithDefaultEnv(action);
@@ -1363,7 +1362,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                 syncTableActionBuilder(mySqlConfig)
                         .withCatalogConfig(
                                 Collections.singletonMap(
-                                        
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                                        CatalogOptions.CASE_SENSITIVE.key(), 
"false"))
                         .withPrimaryKeys("ID1", "PART")
                         .withPartitionKeys("PART")
                         .build();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 867cbdbae0..43b7d2ba63 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.CatalogOptions;
@@ -54,7 +55,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class CdcMultiplexRecordChannelComputerTest {
 
     @TempDir java.nio.file.Path tempDir;
-    private Catalog.Loader catalogLoader;
+    private CatalogLoader catalogLoader;
     private Path warehouse;
     private String databaseName;
     private Identifier tableWithPartition;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 9f35b25026..4436aa392d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
@@ -82,7 +83,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
     private Identifier firstTable;
     private Catalog catalog;
     private Identifier secondTable;
-    private Catalog.Loader catalogLoader;
+    private CatalogLoader catalogLoader;
     private Schema firstTableSchema;
 
     @BeforeEach
@@ -340,7 +341,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         harness.close();
     }
 
-    private Catalog.Loader createCatalogLoader() {
+    private CatalogLoader createCatalogLoader() {
         Options catalogOptions = createCatalogOptions(warehouse);
         return () -> 
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
     }
@@ -688,7 +689,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
     }
 
     private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, 
MultiTableCommittable>
-            createTestHarness(Catalog.Loader catalogLoader) throws Exception {
+            createTestHarness(CatalogLoader catalogLoader) throws Exception {
         CdcRecordStoreMultiWriteOperator.Factory operatorFactory =
                 new CdcRecordStoreMultiWriteOperator.Factory(
                         catalogLoader,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 28b137a93e..35286e3a88 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkCatalogFactory;
@@ -162,8 +162,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
 
         Options catalogOptions = new Options();
         catalogOptions.set("warehouse", tempDir.toString());
-        Catalog.Loader catalogLoader =
-                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        CatalogLoader catalogLoader = () -> 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
 
         new FlinkCdcSyncDatabaseSinkBuilder<TestCdcEvent>()
                 .withInput(source)
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 8b19391f3e..9fccaac992 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
@@ -159,8 +159,7 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
 
         Options catalogOptions = new Options();
         catalogOptions.set("warehouse", tempDir.toString());
-        Catalog.Loader catalogLoader =
-                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        CatalogLoader catalogLoader = () -> 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
 
         new CdcSinkBuilder<TestCdcEvent>()
                 .withInput(source)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index dd95c48af8..3407735b4b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
@@ -519,7 +520,7 @@ public class FlinkCatalog extends AbstractCatalog {
             // Although catalog.createTable will copy the default options, but 
we need this info
             // here before create table, such as 
table-default.kafka.bootstrap.servers defined in
             // catalog options. Temporarily, we copy the default options here.
-            
Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
+            
CatalogUtils.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
             options.put(REGISTER_TIMEOUT.key(), 
logStoreAutoRegisterTimeout.toString());
             registerLogSystem(catalog, identifier, options, classLoader);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 30e32d62ef..4490023e7b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.LogicalTypeConversion;
@@ -99,7 +100,7 @@ public abstract class ActionBase implements Action {
         env.execute(name);
     }
 
-    protected Catalog.Loader catalogLoader() {
+    protected CatalogLoader catalogLoader() {
         // to make the action workflow serializable
         Options catalogOptions = this.catalogOptions;
         return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
index 88730132ef..e2fd5a9d31 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.compact;
 
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -47,7 +47,7 @@ public class MultiAwareBucketTableScan extends 
MultiTableScanBase<Tuple2<Split,
     protected transient Map<Identifier, StreamTableScan> scansMap;
 
     public MultiAwareBucketTableScan(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
index f5940740b6..805e8da0a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.compact;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -59,7 +60,7 @@ public abstract class MultiTableScanBase<T> implements 
AutoCloseable {
     protected boolean isStreaming;
 
     public MultiTableScanBase(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
index da86b93af5..2ad2642b62 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.compact;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
 import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -41,7 +41,7 @@ public class MultiUnawareBucketTableScan
     protected transient Map<Identifier, 
UnawareAppendTableCompactionCoordinator> tablesMap;
 
     public MultiUnawareBucketTableScan(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 83d51f302e..07ec7d165e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
 import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.compact.UnawareBucketCompactor;
 import org.apache.paimon.options.Options;
@@ -56,7 +57,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
             
LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class);
 
     private final String commitUser;
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
 
     // support multi table compaction
     private transient Map<Identifier, UnawareBucketCompactor> 
compactorContainer;
@@ -67,7 +68,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
 
     private AppendOnlyMultiTableCompactionWorkerOperator(
             StreamOperatorParameters<MultiTableCommittable> parameters,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             String commitUser,
             Options options) {
         super(parameters, options);
@@ -188,9 +189,9 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
                     MultiTableUnawareAppendCompactionTask, 
MultiTableCommittable> {
 
         private final String commitUser;
-        private final Catalog.Loader catalogLoader;
+        private final CatalogLoader catalogLoader;
 
-        public Factory(Catalog.Loader catalogLoader, String commitUser, Options options) {
+        public Factory(CatalogLoader catalogLoader, String commitUser, Options options) {
             super(options);
             this.commitUser = commitUser;
             this.catalogLoader = catalogLoader;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 25f76ce976..53f1bf165d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.Options;
 
@@ -55,14 +55,14 @@ public class CombinedTableCompactorSink implements 
Serializable {
     private static final String WRITER_NAME = "Writer";
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
     private final boolean ignorePreviousFiles;
     private final boolean fullCompaction;
 
     private final Options options;
 
     public CombinedTableCompactorSink(
-            Catalog.Loader catalogLoader, Options options, boolean fullCompaction) {
+            CatalogLoader catalogLoader, Options options, boolean fullCompaction) {
         this.catalogLoader = catalogLoader;
         this.ignorePreviousFiles = false;
         this.fullCompaction = fullCompaction;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 58f6a38340..02a7e6c1b3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
@@ -72,7 +73,7 @@ public class MultiTablesStoreCompactOperator
     private transient StoreSinkWriteState state;
     private transient DataFileMetaSerializer dataFileMetaSerializer;
 
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
 
     protected Catalog catalog;
     protected Map<Identifier, FileStoreTable> tables;
@@ -81,7 +82,7 @@ public class MultiTablesStoreCompactOperator
 
     private MultiTablesStoreCompactOperator(
             StreamOperatorParameters<MultiTableCommittable> parameters,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             String initialCommitUser,
             CheckpointConfig checkpointConfig,
             boolean isStreaming,
@@ -324,7 +325,7 @@ public class MultiTablesStoreCompactOperator
     /** {@link StreamOperatorFactory} of {@link 
MultiTablesStoreCompactOperator}. */
     public static class Factory
             extends PrepareCommitOperator.Factory<RowData, 
MultiTableCommittable> {
-        private final Catalog.Loader catalogLoader;
+        private final CatalogLoader catalogLoader;
         private final CheckpointConfig checkpointConfig;
         private final boolean isStreaming;
         private final boolean ignorePreviousFiles;
@@ -332,7 +333,7 @@ public class MultiTablesStoreCompactOperator
         private final String initialCommitUser;
 
         public Factory(
-                Catalog.Loader catalogLoader,
+                CatalogLoader catalogLoader,
                 String initialCommitUser,
                 CheckpointConfig checkpointConfig,
                 boolean isStreaming,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 537a98f97f..01acddb9ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -56,12 +57,12 @@ public class StoreMultiCommitter
     private final boolean ignoreEmptyCommit;
     private final Map<String, String> dynamicOptions;
 
-    public StoreMultiCommitter(Catalog.Loader catalogLoader, Context context) {
+    public StoreMultiCommitter(CatalogLoader catalogLoader, Context context) {
         this(catalogLoader, context, false, Collections.emptyMap());
     }
 
     public StoreMultiCommitter(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Context context,
             boolean ignoreEmptyCommit,
             Map<String, String> dynamicOptions) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index dbdf776014..d190b9ccf3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -58,8 +58,11 @@ public class PartitionListeners implements Closeable {
             throws Exception {
         List<PartitionListener> listeners = new ArrayList<>();
 
-        ReportHmsListener.create(context.isRestored(), context.stateStore(), table)
+        // partition statistics reporter
+        ReportPartStatsListener.create(context.isRestored(), context.stateStore(), table)
                 .ifPresent(listeners::add);
+
+        // partition mark done
         PartitionMarkDone.create(
                         context.streamingCheckpointEnabled(),
                         context.isRestored(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
similarity index 85%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index 853dc52c20..b75889d567 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/HmsReporter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -40,22 +40,24 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.catalog.Catalog.HIVE_LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
 import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static 
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
 /** Action to report the table statistic from the latest snapshot to HMS. */
-public class HmsReporter implements Closeable {
+public class PartitionStatisticsReporter implements Closeable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HmsReporter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionStatisticsReporter.class);
+
+    private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
 
     private final MetastoreClient metastoreClient;
     private final SnapshotReader snapshotReader;
     private final SnapshotManager snapshotManager;
 
-    public HmsReporter(FileStoreTable table, MetastoreClient client) {
+    public PartitionStatisticsReporter(FileStoreTable table, MetastoreClient client) {
         this.metastoreClient =
                 Preconditions.checkNotNull(client, "the metastore client 
factory is null");
         this.snapshotReader = table.newSnapshotReader();
@@ -90,7 +92,12 @@ public class HmsReporter implements Closeable {
             statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
             statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
             statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
-            statistic.put(HIVE_LAST_UPDATE_TIME_PROP, String.valueOf(modifyTime / 1000));
+
+            String modifyTimeSeconds = String.valueOf(modifyTime / 1000);
+            statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+            // just for being compatible with hive metastore
+            statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
 
             LOG.info("alter partition {} with statistic {}.", partitionSpec, 
statistic);
             metastoreClient.alterPartition(partitionSpec, statistic, 
modifyTime, true);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
similarity index 91%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index 842dd012e8..ca51c3df5b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportHmsListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -49,7 +49,7 @@ import java.util.Set;
  * This listener will collect data from the newly touched partition and then 
decide when to trigger
  * a report based on the partition's idle time.
  */
-public class ReportHmsListener implements PartitionListener {
+public class ReportPartStatsListener implements PartitionListener {
 
     @SuppressWarnings("unchecked")
     private static final ListStateDescriptor<Map<String, Long>> 
PENDING_REPORT_STATE_DESC =
@@ -58,20 +58,20 @@ public class ReportHmsListener implements PartitionListener 
{
                     new MapSerializer<>(StringSerializer.INSTANCE, 
LongSerializer.INSTANCE));
 
     private final InternalRowPartitionComputer partitionComputer;
-    private final HmsReporter hmsReporter;
+    private final PartitionStatisticsReporter partitionStatisticsReporter;
     private final ListState<Map<String, Long>> pendingPartitionsState;
     private final Map<String, Long> pendingPartitions;
     private final long idleTime;
 
-    private ReportHmsListener(
+    private ReportPartStatsListener(
             InternalRowPartitionComputer partitionComputer,
-            HmsReporter hmsReporter,
+            PartitionStatisticsReporter partitionStatisticsReporter,
             OperatorStateStore store,
             boolean isRestored,
             long idleTime)
             throws Exception {
         this.partitionComputer = partitionComputer;
-        this.hmsReporter = hmsReporter;
+        this.partitionStatisticsReporter = partitionStatisticsReporter;
         this.pendingPartitionsState = 
store.getListState(PENDING_REPORT_STATE_DESC);
         this.pendingPartitions = new HashMap<>();
         if (isRestored) {
@@ -108,7 +108,7 @@ public class ReportHmsListener implements PartitionListener 
{
         try {
             Map<String, Long> partitions = reportPartition(endInput);
             for (Map.Entry<String, Long> entry : partitions.entrySet()) {
-                hmsReporter.report(entry.getKey(), entry.getValue());
+                partitionStatisticsReporter.report(entry.getKey(), entry.getValue());
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -138,7 +138,7 @@ public class ReportHmsListener implements PartitionListener 
{
         
pendingPartitionsState.update(Collections.singletonList(pendingPartitions));
     }
 
-    public static Optional<ReportHmsListener> create(
+    public static Optional<ReportPartStatsListener> create(
             boolean isRestored, OperatorStateStore stateStore, FileStoreTable 
table)
             throws Exception {
 
@@ -169,9 +169,9 @@ public class ReportHmsListener implements PartitionListener 
{
                         coreOptions.legacyPartitionName());
 
         return Optional.of(
-                new ReportHmsListener(
+                new ReportPartStatsListener(
                         partitionComputer,
-                        new HmsReporter(
+                        new PartitionStatisticsReporter(
                                 table,
                                 
table.catalogEnvironment().metastoreClientFactory().create()),
                         stateStore,
@@ -182,8 +182,8 @@ public class ReportHmsListener implements PartitionListener 
{
 
     @Override
     public void close() throws IOException {
-        if (hmsReporter != null) {
-            hmsReporter.close();
+        if (partitionStatisticsReporter != null) {
+            partitionStatisticsReporter.close();
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
index 415eddb037..ac6af6a14f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.source.operator.CombinedAwareBatchSource;
 import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSource;
@@ -44,7 +44,8 @@ import java.util.regex.Pattern;
  * compactor jobs in combined mode.
  */
 public class CombinedTableCompactorSourceBuilder {
-    private final Catalog.Loader catalogLoader;
+
+    private final CatalogLoader catalogLoader;
     private final Pattern includingPattern;
     private final Pattern excludingPattern;
     private final Pattern databasePattern;
@@ -55,7 +56,7 @@ public class CombinedTableCompactorSourceBuilder {
     @Nullable private Duration partitionIdleTime = null;
 
     public CombinedTableCompactorSourceBuilder(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern databasePattern,
             Pattern includingPattern,
             Pattern excludingPattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
index c3a1258bb1..2f7a82c951 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.source.operator;
 
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
 import org.apache.paimon.flink.compact.MultiTableScanBase;
 import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
@@ -54,7 +54,7 @@ public class CombinedAwareBatchSource extends 
CombinedCompactorSource<Tuple2<Spl
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CombinedAwareBatchSource.class);
 
     public CombinedAwareBatchSource(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern) {
@@ -112,7 +112,7 @@ public class CombinedAwareBatchSource extends 
CombinedCompactorSource<Tuple2<Spl
             StreamExecutionEnvironment env,
             String name,
             TypeInformation<RowData> typeInfo,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
index 9bd4a84f57..a23a3b41a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.source.operator;
 
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
 import org.apache.paimon.flink.compact.MultiTableScanBase;
 import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
@@ -51,7 +51,7 @@ public class CombinedAwareStreamingSource extends 
CombinedCompactorSource<Tuple2
     private final long monitorInterval;
 
     public CombinedAwareStreamingSource(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
@@ -107,7 +107,7 @@ public class CombinedAwareStreamingSource extends 
CombinedCompactorSource<Tuple2
             StreamExecutionEnvironment env,
             String name,
             TypeInformation<RowData> typeInfo,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
index f58d86cdd6..e292d2441c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.append.UnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
 import org.apache.paimon.table.source.Split;
 
@@ -44,16 +44,17 @@ import java.util.regex.Pattern;
  * single (non-parallel) monitoring task, it is responsible for the new Paimon 
table.
  */
 public abstract class CombinedCompactorSource<T> extends 
AbstractNonCoordinatedSource<T> {
-    private static final long serialVersionUID = 2L;
 
-    protected final Catalog.Loader catalogLoader;
+    private static final long serialVersionUID = 3L;
+
+    protected final CatalogLoader catalogLoader;
     protected final Pattern includingPattern;
     protected final Pattern excludingPattern;
     protected final Pattern databasePattern;
     protected final boolean isStreaming;
 
     public CombinedCompactorSource(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
index 64f0c38f5a..5c0d9c42dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.compact.MultiTableScanBase;
@@ -63,7 +64,7 @@ public class CombinedUnawareBatchSource
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CombinedUnawareBatchSource.class);
 
     public CombinedUnawareBatchSource(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern) {
@@ -121,7 +122,7 @@ public class CombinedUnawareBatchSource
     public static DataStream<MultiTableUnawareAppendCompactionTask> 
buildSource(
             StreamExecutionEnvironment env,
             String name,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
index 6ea1ead4db..2e38d538a9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
-import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.flink.compact.MultiTableScanBase;
 import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
 import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
@@ -48,7 +48,7 @@ public class CombinedUnawareStreamingSource
     private final long monitorInterval;
 
     public CombinedUnawareStreamingSource(
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
@@ -104,7 +104,7 @@ public class CombinedUnawareStreamingSource
     public static DataStream<MultiTableUnawareAppendCompactionTask> 
buildSource(
             StreamExecutionEnvironment env,
             String name,
-            Catalog.Loader catalogLoader,
+            CatalogLoader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
             Pattern databasePattern,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index fbc8bb9d75..ae3099ec06 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
@@ -60,18 +61,18 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
 
     private static final long serialVersionUID = 1L;
 
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
     private final boolean isStreaming;
 
     private Duration partitionIdleTime = null;
 
-    public MultiTablesReadOperator(Catalog.Loader catalogLoader, boolean isStreaming) {
+    public MultiTablesReadOperator(CatalogLoader catalogLoader, boolean isStreaming) {
         this.catalogLoader = catalogLoader;
         this.isStreaming = isStreaming;
     }
 
     public MultiTablesReadOperator(
-            Catalog.Loader catalogLoader, boolean isStreaming, Duration partitionIdleTime) {
+            CatalogLoader catalogLoader, boolean isStreaming, Duration partitionIdleTime) {
         this.catalogLoader = catalogLoader;
         this.isStreaming = isStreaming;
         this.partitionIdleTime = partitionIdleTime;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
index 0864741a17..15fde93755 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.PartitionEntry;
@@ -54,12 +55,11 @@ public class MultiUnawareTablesReadOperator
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MultiUnawareTablesReadOperator.class);
 
-    private final Catalog.Loader catalogLoader;
+    private final CatalogLoader catalogLoader;
 
     private final Duration partitionIdleTime;
 
-    public MultiUnawareTablesReadOperator(
-            Catalog.Loader catalogLoader, Duration partitionIdleTime) {
+    public MultiUnawareTablesReadOperator(CatalogLoader catalogLoader, Duration partitionIdleTime) {
         this.catalogLoader = catalogLoader;
         this.partitionIdleTime = partitionIdleTime;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index d487d75925..0e85f559d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
@@ -273,7 +274,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
     }
 
     protected MultiTablesStoreCompactOperator.Factory createMultiTablesCompactOperator(
-            Catalog.Loader catalogLoader) throws Exception {
+            CatalogLoader catalogLoader) throws Exception {
         return new MultiTablesStoreCompactOperator.Factory(
                 catalogLoader,
                 commitUser,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 752679fb59..53e3a6dcb7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
@@ -78,7 +79,7 @@ class StoreMultiCommitterTest {
 
     private String initialCommitUser;
     private Path warehouse;
-    private Catalog.Loader catalogLoader;
+    private CatalogLoader catalogLoader;
     private Catalog catalog;
     private Identifier firstTable;
     private Identifier secondTable;
@@ -691,7 +692,7 @@ class StoreMultiCommitterTest {
         return harness;
     }
 
-    private Catalog.Loader createCatalogLoader() {
+    private CatalogLoader createCatalogLoader() {
         Options catalogOptions = createCatalogOptions(warehouse);
         return () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
similarity index 95%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
index f245940da5..142a0c32f7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HmsReporterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
@@ -49,8 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-/** Test for {@link HmsReporter}. */
-public class HmsReporterTest {
+/** Test for {@link PartitionStatisticsReporter}. */
+public class PartitionStatisticsReporterTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -131,7 +131,7 @@ public class HmsReporterTest {
                     }
                 };
 
-        HmsReporter action = new HmsReporter(table, client);
+        PartitionStatisticsReporter action = new PartitionStatisticsReporter(table, client);
         long time = 1729598544974L;
         action.report("c1=a/", time);
         Assertions.assertThat(partitionParams).containsKey("c1=a/");
@@ -144,6 +144,8 @@ public class HmsReporterTest {
                                 "591",
                                 "numRows",
                                 "1",
+                                "lastUpdateTime",
+                                String.valueOf(time / 1000),
                                 "transient_lastDdlTime",
                                 String.valueOf(time / 1000)));
         action.close();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index fba5f33807..3b41f39431 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
@@ -662,7 +663,7 @@ public class MultiTablesCompactorSourceBuilderITCase extends AbstractTestBase
         return b;
     }
 
-    private Catalog.Loader catalogLoader() {
+    private CatalogLoader catalogLoader() {
         // to make the action workflow serializable
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
         return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index c74ede9815..5744ac894d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -103,7 +103,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -872,8 +872,8 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public boolean allowUpperCase() {
-        return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
+    public boolean caseSensitive() {
+        return catalogOptions.getOptional(CASE_SENSITIVE).orElse(false);
     }
 
     @Override
@@ -931,7 +931,6 @@ public class HiveCatalog extends AbstractCatalog {
     public void repairTable(Identifier identifier) throws TableNotExistException {
         checkNotBranch(identifier, "repairTable");
         checkNotSystemTable(identifier, "repairTable");
-        validateIdentifierNameCaseInsensitive(identifier);
 
         Path location = getTableLocation(identifier);
         TableSchema tableSchema =
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index d6318c723f..de6e2414fc 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
@@ -96,9 +96,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
         CatalogContext catalogContext =
                 CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());
 
-        // if spark is case-insensitive, set allow upper case to catalog
+        // if spark is case-insensitive, set case-sensitive to catalog
         if (!sessionState.conf().caseSensitiveAnalysis()) {
-            newOptions.put(ALLOW_UPPER_CASE.key(), "true");
+            newOptions.put(CASE_SENSITIVE.key(), "true");
         }
         options = new CaseInsensitiveStringMap(newOptions);
 
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 6b7b17b1b1..b57228fa44 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -62,7 +62,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
-import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -331,9 +331,9 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
             options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
         }
 
-        // if spark is case-insensitive, set allow upper case to catalog
+        // if spark is case-insensitive, set case-sensitive to catalog
         if (!sqlConf.caseSensitiveAnalysis()) {
-            options.put(ALLOW_UPPER_CASE.key(), "true");
+            options.put(CASE_SENSITIVE.key(), "true");
         }
     }
 


Reply via email to