This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 33518bb93 [AMORO-3800] Unify the configuration name for AMS URI (#3801)
33518bb93 is described below

commit 33518bb933f1b9045b099dceb619c399267bbac6
Author: ZhouJinsong <[email protected]>
AuthorDate: Sun Sep 28 15:47:48 2025 +0800

    [AMORO-3800] Unify the configuration name for AMS URI (#3801)
    
    * Unify the configuration name for AMS URI
    
    * Rollback some changes
    
    * Fix a unit test error
    
    * Fix some issues in unit tests
    
    * Fix some checkstyle errors
---
 .../java/org/apache/amoro/client/AmsThriftUrl.java |  1 -
 .../amoro/properties/CatalogMetaProperties.java    |  1 -
 .../java/org/apache/amoro/mixed/CatalogLoader.java | 32 +++++-----
 .../org/apache/amoro/catalog/CatalogTestBase.java  |  6 +-
 .../apache/amoro/flink/InternalCatalogBuilder.java | 24 ++++----
 .../amoro/flink/catalog/FlinkUnifiedCatalog.java   |  9 ++-
 .../apache/amoro/flink/catalog/MixedCatalog.java   |  6 +-
 .../catalog/factories/CatalogFactoryOptions.java   |  6 +-
 .../factories/FlinkUnifiedCatalogFactory.java      | 10 +--
 .../factories/mixed/MixedCatalogFactory.java       | 71 ----------------------
 .../factories/mixed/MixedHiveCatalogFactory.java   |  2 +-
 .../mixed/MixedIcebergCatalogFactory.java          | 42 ++++++++++++-
 .../flink/table/MixedDynamicTableFactory.java      |  6 +-
 .../amoro/flink/table/MixedFormatTableLoader.java  |  8 +--
 .../org.apache.flink.table.factories.Factory       |  1 -
 .../org/apache/amoro/flink/FlinkTableTestBase.java |  5 +-
 .../java/org/apache/amoro/flink/FlinkTestBase.java | 12 ++--
 .../amoro/flink/catalog/FlinkCatalogContext.java   |  4 +-
 .../flink/catalog/FlinkUnifiedCatalogITCase.java   |  4 +-
 .../amoro/flink/catalog/TestMixedCatalog.java      | 64 +------------------
 .../hybrid/reader/MixedIncrementalLoaderTest.java  |  6 +-
 .../org/apache/amoro/flink/table/LookupITCase.java |  6 +-
 .../amoro/flink/table/TestLookupSecondary.java     |  8 +--
 .../amoro/flink/write/FlinkTaskWriterBaseTest.java |  4 +-
 .../org.apache.flink.table.factories.Factory       |  4 +-
 .../amoro/spark/SparkUnifiedCatalogProperties.java |  4 +-
 .../apache/amoro/spark/mixed/ArcticDataSource.java | 33 ----------
 .../amoro/spark/mixed/MixedSparkCatalogBase.java   | 11 ++--
 ...org.apache.spark.sql.sources.DataSourceRegister |  1 -
 .../amoro/spark/test/MixedTableTestBase.java       |  7 ---
 .../org/apache/amoro/spark/test/SparkTestBase.java |  6 +-
 .../apache/amoro/spark/test/SparkTestContext.java  |  4 +-
 .../test/unified/UnifiedCatalogTestSuites.java     |  2 +-
 .../analysis/QueryWithConstraintCheck.scala        | 27 ++++----
 .../suites/api/TestKeyedTableDataFrameAPI.java     |  2 +-
 .../suites/api/TestUnKeyedTableDataFrameAPI.java   |  8 +--
 .../test/suites/catalog/TestSessionCatalog.java    |  2 +-
 .../suites/sql/TestMixedFormatSessionCatalog.java  | 16 ++---
 .../analysis/QueryWithConstraintCheck.scala        | 25 ++++----
 .../suites/api/TestKeyedTableDataFrameAPI.java     |  2 +-
 .../suites/api/TestUnKeyedTableDataFrameAPI.java   |  8 +--
 .../test/suites/catalog/TestSessionCatalog.java    |  2 +-
 .../suites/sql/TestMixedFormatSessionCatalog.java  | 16 ++---
 docs/engines/flink/flink-cdc-ingestion.md          |  2 +-
 docs/engines/flink/flink-ddl.md                    |  8 +--
 docs/engines/flink/flink-dml.md                    |  2 +-
 docs/engines/spark/spark-conf.md                   | 10 +--
 docs/engines/spark/spark-get-started.md            |  4 +-
 48 files changed, 209 insertions(+), 335 deletions(-)

diff --git a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
index 07dafacea..233fb6e69 100644
--- a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
+++ b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
@@ -40,7 +40,6 @@ public class AmsThriftUrl {
   public static final String PARAM_SOCKET_TIMEOUT = "socketTimeout";
   public static final int DEFAULT_SOCKET_TIMEOUT = 5000;
   public static final String ZOOKEEPER_FLAG = "zookeeper";
-  public static final String THRIFT_FLAG = "thrift";
   public static final String THRIFT_URL_FORMAT = "thrift://%s:%d/%s%s";
   public static final int MAX_RETRIES = 3;
   private static final Logger logger = 
LoggerFactory.getLogger(AmsThriftUrl.class);
diff --git a/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java b/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
index c56fcc3f9..31fb1ffb3 100644
--- a/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
@@ -69,7 +69,6 @@ public class CatalogMetaProperties {
   public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT =
       TimeUnit.MINUTES.toMillis(5);
 
-  // only used for unified catalog
   public static final String AMS_URI = "ams.uri";
 
   // only used for engine properties
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
index b285fec01..1afc5e31f 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
@@ -38,7 +38,7 @@ import org.apache.iceberg.common.DynConstructors;
 import java.util.Map;
 import java.util.Set;
 
-/** Catalogs, create mixed-format catalog from metastore thrift url. */
+/** Catalogs, create mixed-format catalog from metastore thrift uri. */
 public class CatalogLoader {
 
   public static final String INTERNAL_CATALOG_IMPL = 
InternalMixedIcebergCatalog.class.getName();
@@ -48,27 +48,27 @@ public class CatalogLoader {
   /**
    * Entrypoint for loading Catalog.
    *
-   * @param catalogUrl mixed-format catalog url, 
thrift://ams-host:port/catalog_name
+   * @param catalogUri mixed-format catalog uri, 
thrift://ams-host:port/catalog_name
    * @param properties client side catalog configs
    * @return mixed-format catalog object
    */
-  public static MixedFormatCatalog load(String catalogUrl, Map<String, String> 
properties) {
-    AmsThriftUrl url = AmsThriftUrl.parse(catalogUrl, 
Constants.THRIFT_TABLE_SERVICE_NAME);
+  public static MixedFormatCatalog load(String catalogUri, Map<String, String> 
properties) {
+    AmsThriftUrl url = AmsThriftUrl.parse(catalogUri, 
Constants.THRIFT_TABLE_SERVICE_NAME);
     if (url.catalogName() == null || url.catalogName().contains("/")) {
       throw new IllegalArgumentException("invalid catalog name " + 
url.catalogName());
     }
 
-    return loadCatalog(catalogUrl, url.catalogName(), properties);
+    return loadCatalog(catalogUri, url.catalogName(), properties);
   }
 
   /**
    * Entrypoint for loading catalog.
    *
-   * @param catalogUrl mixed-format catalog url, 
thrift://ams-host:port/catalog_name
+   * @param catalogUri mixed-format catalog uri, 
thrift://ams-host:port/catalog_name
    * @return mixed-format catalog object
    */
-  public static MixedFormatCatalog load(String catalogUrl) {
-    return load(catalogUrl, Maps.newHashMap());
+  public static MixedFormatCatalog load(String catalogUri) {
+    return load(catalogUri, Maps.newHashMap());
   }
 
   /**
@@ -106,15 +106,15 @@ public class CatalogLoader {
   /**
    * Load catalog meta from metastore.
    *
-   * @param catalogUrl - catalog url
+   * @param catalogUri - catalog uri
    * @return catalog meta
    */
-  public static CatalogMeta loadMeta(String catalogUrl) {
-    AmsThriftUrl url = AmsThriftUrl.parse(catalogUrl, 
Constants.THRIFT_TABLE_SERVICE_NAME);
+  public static CatalogMeta loadMeta(String catalogUri) {
+    AmsThriftUrl url = AmsThriftUrl.parse(catalogUri, 
Constants.THRIFT_TABLE_SERVICE_NAME);
     if (url.catalogName() == null || url.catalogName().contains("/")) {
       throw new IllegalArgumentException("invalid catalog name " + 
url.catalogName());
     }
-    AmsClient client = new PooledAmsClient(catalogUrl);
+    AmsClient client = new PooledAmsClient(catalogUri);
     try {
       return client.getCatalog(url.catalogName());
     } catch (TException e) {
@@ -125,18 +125,18 @@ public class CatalogLoader {
   /**
    * Entrypoint for loading catalog
    *
-   * @param metaStoreUrl mixed-format metastore url
+   * @param metaStoreUri mixed-format metastore uri
    * @param catalogName mixed-format catalog name
    * @param properties client side catalog configs
    * @return mixed-format catalog object
    */
   private static MixedFormatCatalog loadCatalog(
-      String metaStoreUrl, String catalogName, Map<String, String> properties) 
{
-    AmsClient client = new PooledAmsClient(metaStoreUrl);
+      String metaStoreUri, String catalogName, Map<String, String> properties) 
{
+    AmsClient client = new PooledAmsClient(metaStoreUri);
     try {
       CatalogMeta catalogMeta = client.getCatalog(catalogName);
       String type = catalogMeta.getCatalogType();
-      catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, 
metaStoreUrl);
+      catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, 
metaStoreUri);
       MixedFormatCatalogUtil.mergeCatalogProperties(catalogMeta, properties);
       return createCatalog(
           catalogName,
diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
index e39af36cf..1ccbc3496 100644
--- a/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
+++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
@@ -74,16 +74,16 @@ public abstract class CatalogTestBase {
 
   protected MixedFormatCatalog getMixedFormatCatalog() {
     if (mixedFormatCatalog == null) {
-      mixedFormatCatalog = CatalogLoader.load(getCatalogUrl());
+      mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
     }
     return mixedFormatCatalog;
   }
 
   protected void refreshMixedFormatCatalog() {
-    this.mixedFormatCatalog = CatalogLoader.load(getCatalogUrl());
+    this.mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
   }
 
-  protected String getCatalogUrl() {
+  protected String getCatalogUri() {
     return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
   }
 
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
index 089255df5..e31bc6605 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
@@ -50,13 +50,13 @@ import java.util.Map;
 public class InternalCatalogBuilder implements Serializable {
   private static final Logger LOG = 
LoggerFactory.getLogger(InternalCatalogBuilder.class);
 
-  private String metastoreUrl;
+  private String amsUri;
   private Map<String, String> properties = new HashMap<>(0);
   private String catalogName;
 
   private MixedFormatCatalog createMixedFormatCatalog() {
-    if (metastoreUrl != null) {
-      return CatalogLoader.load(metastoreUrl, properties);
+    if (amsUri != null) {
+      return CatalogLoader.load(amsUri, properties);
     } else {
       Preconditions.checkArgument(catalogName != null, "Catalog name cannot be 
empty");
       String metastoreType = 
properties.get(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE);
@@ -101,23 +101,25 @@ public class InternalCatalogBuilder implements Serializable {
     }
 
     if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+      java.nio.file.Path hdfsSiteFile = Paths.get(hadoopConfDir, 
"hdfs-site.xml");
       Preconditions.checkState(
-          Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+          Files.exists(hdfsSiteFile),
           "Failed to load Hadoop configuration: missing %s",
-          Paths.get(hadoopConfDir, "hdfs-site.xml"));
+          hdfsSiteFile);
       newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+      java.nio.file.Path coreSiteFile = Paths.get(hadoopConfDir, 
"core-site.xml");
       Preconditions.checkState(
-          Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+          Files.exists(coreSiteFile),
           "Failed to load Hadoop configuration: missing %s",
-          Paths.get(hadoopConfDir, "core-site.xml"));
+          coreSiteFile);
       newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
     }
 
     return newConf;
   }
 
-  public String getMetastoreUrl() {
-    return metastoreUrl;
+  public String getAmsUri() {
+    return amsUri;
   }
 
   public Map<String, String> getProperties() {
@@ -134,8 +136,8 @@ public class InternalCatalogBuilder implements Serializable {
     return createMixedFormatCatalog();
   }
 
-  public InternalCatalogBuilder metastoreUrl(String metastoreUrl) {
-    this.metastoreUrl = metastoreUrl;
+  public InternalCatalogBuilder amsUri(String amsUri) {
+    this.amsUri = amsUri;
     return this;
   }
 
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
index 0b135d308..c56bb6c94 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
@@ -31,7 +31,8 @@ import org.apache.amoro.client.AmsThriftUrl;
 import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
 import org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory;
 import 
org.apache.amoro.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory;
-import org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory;
+import org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory;
+import 
org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory;
 import 
org.apache.amoro.flink.catalog.factories.paimon.PaimonFlinkCatalogFactory;
 import org.apache.amoro.flink.table.UnifiedDynamicTableFactory;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -493,8 +494,10 @@ public class FlinkUnifiedCatalog extends AbstractCatalog {
   private AbstractCatalog createOriginalCatalog(
       TableIdentifier tableIdentifier, TableFormat tableFormat) {
     CatalogFactory catalogFactory;
-    if (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
-      catalogFactory = new MixedCatalogFactory();
+    if (tableFormat.equals(TableFormat.MIXED_ICEBERG)) {
+      catalogFactory = new MixedIcebergCatalogFactory();
+    } else if (tableFormat.equals(TableFormat.MIXED_HIVE)) {
+      catalogFactory = new MixedHiveCatalogFactory();
     } else if (tableFormat.equals(TableFormat.ICEBERG)) {
       catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf);
     } else if (tableFormat.equals(TableFormat.PAIMON)) {
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
index 25f5d9586..3026f608b 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
@@ -102,8 +102,8 @@ public class MixedCatalog extends AbstractCatalog {
   public static final String DEFAULT_DB = "default";
 
   /**
-   * To distinguish 'CREATE TABLE LIKE' by checking stack {@link
-   * 
org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable}
+   * To distinguish 'CREATE TABLE LIKE' by checking stack
+   * 
org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable
    */
   public static final String SQL_LIKE_METHOD = "lookupLikeSourceTable";
 
@@ -242,7 +242,7 @@ public class MixedCatalog extends AbstractCatalog {
     properties.put(MixedFormatValidator.MIXED_FORMAT_CATALOG.key(), 
tableIdentifier.getCatalog());
     properties.put(MixedFormatValidator.MIXED_FORMAT_TABLE.key(), 
tableIdentifier.getTableName());
     properties.put(MixedFormatValidator.MIXED_FORMAT_DATABASE.key(), 
tableIdentifier.getDatabase());
-    properties.put(CatalogFactoryOptions.METASTORE_URL.key(), 
catalogBuilder.getMetastoreUrl());
+    properties.put(CatalogFactoryOptions.AMS_URI.key(), 
catalogBuilder.getAmsUri());
   }
 
   private static List<String> toPartitionKeys(PartitionSpec spec, Schema 
icebergSchema) {
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
index 464e32eb0..95e5888e7 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
@@ -22,6 +22,7 @@ import static org.apache.amoro.properties.CatalogMetaProperties.TABLE_FORMATS;
 
 import org.apache.amoro.flink.catalog.FlinkUnifiedCatalog;
 import org.apache.amoro.flink.catalog.MixedCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -31,11 +32,10 @@ import org.apache.flink.configuration.ConfigOptions;
 public class CatalogFactoryOptions {
   public static final String MIXED_ICEBERG_IDENTIFIER = "mixed_iceberg";
   public static final String MIXED_HIVE_IDENTIFIER = "mixed_hive";
-  @Deprecated public static final String LEGACY_MIXED_IDENTIFIER = "arctic";
   public static final String UNIFIED_IDENTIFIER = "unified";
 
-  public static final ConfigOption<String> METASTORE_URL =
-      ConfigOptions.key("metastore.url").stringType().noDefaultValue();
+  public static final ConfigOption<String> AMS_URI =
+      
ConfigOptions.key(CatalogMetaProperties.AMS_URI).stringType().noDefaultValue();
 
   public static final ConfigOption<String> FLINK_TABLE_FORMATS =
       ConfigOptions.key(TABLE_FORMATS)
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
index 5b1cfcf86..063666b17 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
@@ -77,16 +77,16 @@ public class FlinkUnifiedCatalogFactory implements 
CatalogFactory {
         context
             .getOptions()
             .getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY, 
MixedCatalog.DEFAULT_DB);
-    final String metastoreUrl = 
context.getOptions().get(CatalogFactoryOptions.METASTORE_URL.key());
+    final String metastoreUri = 
context.getOptions().get(CatalogFactoryOptions.AMS_URI.key());
     final Map<String, String> catalogProperties = 
getCatalogProperties(context.getOptions());
 
     UnifiedCatalog unifiedCatalog;
-    if (metastoreUrl != null) {
+    if (metastoreUri != null) {
       String amoroCatalogName =
-          AmsThriftUrl.parse(metastoreUrl, 
THRIFT_TABLE_SERVICE_NAME).catalogName();
+          AmsThriftUrl.parse(metastoreUri, 
THRIFT_TABLE_SERVICE_NAME).catalogName();
       unifiedCatalog =
           UnifiedCatalogLoader.loadUnifiedCatalog(
-              metastoreUrl, amoroCatalogName, catalogProperties);
+              metastoreUri, amoroCatalogName, catalogProperties);
     } else {
       String metastoreType = 
catalogProperties.get(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE);
       Preconditions.checkArgument(metastoreType != null, "Catalog type cannot 
be empty");
@@ -105,7 +105,7 @@ public class FlinkUnifiedCatalogFactory implements 
CatalogFactory {
     validate(tableFormats);
 
     return new FlinkUnifiedCatalog(
-        metastoreUrl, defaultDatabase, unifiedCatalog, context, hadoopConf);
+        metastoreUri, defaultDatabase, unifiedCatalog, context, hadoopConf);
   }
 
   private void validate(Set<TableFormat> expectedFormats) {
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java
deleted file mode 100644
index 2298c859b..000000000
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.flink.catalog.factories.mixed;
-
-import static org.apache.amoro.flink.table.OptionsUtil.getCatalogProperties;
-
-import org.apache.amoro.flink.InternalCatalogBuilder;
-import org.apache.amoro.flink.catalog.MixedCatalog;
-import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
-import org.apache.flink.table.factories.CatalogFactory;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-/** Factory for {@link MixedCatalog} */
-public class MixedCatalogFactory implements CatalogFactory {
-
-  @Override
-  public String factoryIdentifier() {
-    return CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER;
-  }
-
-  @Override
-  public Catalog createCatalog(Context context) {
-
-    final String defaultDatabase =
-        context
-            .getOptions()
-            .getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY, 
MixedCatalog.DEFAULT_DB);
-    final String metastoreUrl = 
context.getOptions().get(CatalogFactoryOptions.METASTORE_URL.key());
-    final Map<String, String> catalogProperties = 
getCatalogProperties(context.getOptions());
-
-    final InternalCatalogBuilder catalogBuilder =
-        InternalCatalogBuilder.builder()
-            .metastoreUrl(metastoreUrl)
-            .catalogName(context.getName())
-            .properties(catalogProperties);
-
-    return new MixedCatalog(context.getName(), defaultDatabase, 
catalogBuilder);
-  }
-
-  @Override
-  public Set<ConfigOption<?>> requiredOptions() {
-    return Collections.emptySet();
-  }
-
-  @Override
-  public Set<ConfigOption<?>> optionalOptions() {
-    return Collections.emptySet();
-  }
-}
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
index 21e9664d8..d55eff1a2 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
@@ -25,7 +25,7 @@ import 
org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
  * The factory to create {@link MixedCatalog} with {@link
  * CatalogFactoryOptions#MIXED_HIVE_IDENTIFIER} identifier.
  */
-public class MixedHiveCatalogFactory extends MixedCatalogFactory {
+public class MixedHiveCatalogFactory extends MixedIcebergCatalogFactory {
 
   @Override
   public String factoryIdentifier() {
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
index 1d74a2208..b394e1eaa 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
@@ -18,17 +18,57 @@
 
 package org.apache.amoro.flink.catalog.factories.mixed;
 
+import static org.apache.amoro.flink.table.OptionsUtil.getCatalogProperties;
+
+import org.apache.amoro.flink.InternalCatalogBuilder;
 import org.apache.amoro.flink.catalog.MixedCatalog;
 import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.CatalogFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * The factory to create {@link MixedCatalog} with {@link
  * CatalogFactoryOptions#MIXED_ICEBERG_IDENTIFIER} identifier.
  */
-public class MixedIcebergCatalogFactory extends MixedCatalogFactory {
+public class MixedIcebergCatalogFactory implements CatalogFactory {
 
   @Override
   public String factoryIdentifier() {
     return CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER;
   }
+
+  @Override
+  public Catalog createCatalog(Context context) {
+
+    final String defaultDatabase =
+        context
+            .getOptions()
+            .getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY, 
MixedCatalog.DEFAULT_DB);
+    final String amsUri = 
context.getOptions().get(CatalogFactoryOptions.AMS_URI.key());
+    final Map<String, String> catalogProperties = 
getCatalogProperties(context.getOptions());
+
+    final InternalCatalogBuilder catalogBuilder =
+        InternalCatalogBuilder.builder()
+            .amsUri(amsUri)
+            .catalogName(context.getName())
+            .properties(catalogProperties);
+
+    return new MixedCatalog(context.getName(), defaultDatabase, 
catalogBuilder);
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
index 60f8f05a6..d9ca8ea5b 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
@@ -101,10 +101,10 @@ public class MixedDynamicTableFactory
       Preconditions.checkNotNull(
           actualCatalogName,
           String.format("%s should be set", 
MixedFormatValidator.MIXED_FORMAT_CATALOG.key()));
-      String metastoreUrl = options.get(CatalogFactoryOptions.METASTORE_URL);
+      String amsUri = options.get(CatalogFactoryOptions.AMS_URI);
       actualBuilder =
           InternalCatalogBuilder.builder()
-              .metastoreUrl(metastoreUrl)
+              .amsUri(amsUri)
               .catalogName(actualCatalogName)
               .properties(options.toMap());
     }
@@ -230,7 +230,7 @@ public class MixedDynamicTableFactory
     options.add(MixedFormatValidator.MIXED_FORMAT_TABLE);
     options.add(MixedFormatValidator.MIXED_FORMAT_DATABASE);
     options.add(MixedFormatValidator.DIM_TABLE_ENABLE);
-    options.add(CatalogFactoryOptions.METASTORE_URL);
+    options.add(CatalogFactoryOptions.AMS_URI);
 
     // lookup
     options.add(MixedFormatValidator.LOOKUP_CACHE_MAX_ROWS);
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
index 831efa5c6..d7282739f 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
@@ -62,20 +62,20 @@ public class MixedFormatTableLoader implements TableLoader {
 
   public static MixedFormatTableLoader of(
       TableIdentifier tableIdentifier, Map<String, String> 
flinkTableProperties) {
-    String metastoreUrl = 
flinkTableProperties.get(CatalogFactoryOptions.METASTORE_URL.key());
+    String metastoreUri = 
flinkTableProperties.get(CatalogFactoryOptions.AMS_URI.key());
     return new MixedFormatTableLoader(
         tableIdentifier,
-        InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl),
+        InternalCatalogBuilder.builder().amsUri(metastoreUri),
         flinkTableProperties);
   }
 
   public static MixedFormatTableLoader of(
       TableIdentifier tableIdentifier,
-      String metastoreUrl,
+      String metastoreUri,
       Map<String, String> flinkTableProperties) {
     return new MixedFormatTableLoader(
         tableIdentifier,
-        InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl),
+        InternalCatalogBuilder.builder().amsUri(metastoreUri),
         flinkTableProperties);
   }
 
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 3125d8dfa..ff59f0f44 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -18,6 +18,5 @@
 
 org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory
 org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory
-org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory
 org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory
 org.apache.amoro.flink.table.MixedDynamicTableFactory
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
index c9e1a60f4..de8d1be1d 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
@@ -98,12 +98,11 @@ public interface FlinkTableTestBase {
   }
 
   default MixedFormatTableLoader getTableLoader(
-      String catalogName, String metastoreUrl, MixedTable mixedTable) {
+      String catalogName, String amsUri, MixedTable mixedTable) {
     TableIdentifier identifier =
         TableIdentifier.of(
             catalogName, mixedTable.id().getDatabase(), 
mixedTable.id().getTableName());
-    InternalCatalogBuilder internalCatalogBuilder =
-        InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl);
+    InternalCatalogBuilder internalCatalogBuilder = 
InternalCatalogBuilder.builder().amsUri(amsUri);
     return MixedFormatTableLoader.of(identifier, internalCatalogBuilder, 
mixedTable.properties());
   }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
index 9a5504bdf..cd3501a07 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
@@ -18,7 +18,7 @@
 
 package org.apache.amoro.flink;
 
-import static 
org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER;
+import static 
org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER;
 import static 
org.apache.amoro.flink.kafka.testutils.KafkaContainerTest.KAFKA_CONTAINER;
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;
 
@@ -94,7 +94,7 @@ public class FlinkTestBase extends TableTestBase {
 
   @Rule public TestName name = new TestName();
 
-  public static String metastoreUrl;
+  public static String metastoreUri;
 
   protected static final int KAFKA_PARTITION_NUMS = 1;
 
@@ -120,14 +120,14 @@ public class FlinkTestBase extends TableTestBase {
 
   @Before
   public void before() throws Exception {
-    metastoreUrl = getCatalogUrl();
-    catalogBuilder = 
InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl);
+    metastoreUri = getCatalogUri();
+    catalogBuilder = InternalCatalogBuilder.builder().amsUri(metastoreUri);
   }
 
   public void config() {
     props = Maps.newHashMap();
-    props.put("type", LEGACY_MIXED_IDENTIFIER);
-    props.put(CatalogFactoryOptions.METASTORE_URL.key(), metastoreUrl);
+    props.put("type", MIXED_ICEBERG_IDENTIFIER);
+    props.put(CatalogFactoryOptions.AMS_URI.key(), metastoreUri);
   }
 
   public static void prepare() throws Exception {
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
index 9e369266d..86d134322 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
@@ -18,7 +18,7 @@
 
 package org.apache.amoro.flink.catalog;
 
-import static 
org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.METASTORE_URL;
+import static 
org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.AMS_URI;
 import static 
org.apache.amoro.flink.table.descriptors.MixedFormatValidator.TABLE_FORMAT;
 
 import org.apache.amoro.TableFormat;
@@ -114,7 +114,7 @@ public class FlinkCatalogContext {
     TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName());
     TEST_AMS.getAmsHandler().createCatalog(meta);
 
-    factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + 
meta.getCatalogName());
+    factoryOptions.put(AMS_URI.key(), TEST_AMS.getServerUrl() + "/" + 
meta.getCatalogName());
     final FactoryUtil.DefaultCatalogContext context =
         new FactoryUtil.DefaultCatalogContext(
             "FLINK_" + tableFormat,
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
index 879e312df..6e9a654bf 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
@@ -73,9 +73,7 @@ public class FlinkUnifiedCatalogITCase extends 
CatalogITCaseBase {
   @Before
   public void setup() throws Exception {
     String catalog = "unified_catalog";
-    exec(
-        "CREATE CATALOG %s WITH ('type'='unified', 'metastore.url'='%s')",
-        catalog, getCatalogUrl());
+    exec("CREATE CATALOG %s WITH ('type'='unified', 'ams.uri'='%s')", catalog, 
getCatalogUri());
     exec("USE CATALOG %s", catalog);
     exec("USE %s", tableTestHelper().id().getDatabase());
     Optional<Catalog> catalogOptional = getTableEnv().getCatalog(catalog);
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
index b9195a2e5..5e4bb582b 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
@@ -34,7 +34,6 @@ import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestBase;
 import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
-import org.apache.amoro.flink.table.MixedDynamicTableFactory;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.MixedTable;
@@ -92,9 +91,7 @@ public class TestMixedCatalog extends CatalogTestBase {
   @Parameterized.Parameters(name = "catalogFactoryType = {0}")
   public static Object[] parameters() {
     return new Object[] {
-      CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
-      CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER,
-      CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER
+      CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER, 
CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
     };
   }
 
@@ -110,7 +107,7 @@ public class TestMixedCatalog extends CatalogTestBase {
   public void before() throws Exception {
     props = Maps.newHashMap();
     props.put("type", catalogFactoryType);
-    props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl());
+    props.put(CatalogFactoryOptions.AMS_URI.key(), getCatalogUri());
     sql("CREATE CATALOG " + catalogName + " WITH %s", toWithClause(props));
     sql("USE CATALOG " + catalogName);
     sql("CREATE DATABASE " + catalogName + "." + DB);
@@ -380,63 +377,6 @@ public class TestMixedCatalog extends CatalogTestBase {
     sql("DROP TABLE default_catalog.default_database." + TABLE);
   }
 
-  @Test
-  public void testDefaultCatalogDDLWithVirtualColumn() {
-    // this test only for LEGACY_MIXED_IDENTIFIER
-    if 
(catalogFactoryType.equals(CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER)) {
-      // create mixed-format table with only physical columns
-      sql(
-          "CREATE TABLE "
-              + catalogName
-              + "."
-              + DB
-              + "."
-              + TABLE
-              + " ("
-              + " id INT,"
-              + " t TIMESTAMP(6),"
-              + " PRIMARY KEY (id) NOT ENFORCED "
-              + ") PARTITIONED BY(t) "
-              + " WITH ("
-              + " 'connector' = 'mixed-format'"
-              + ")");
-
-      // insert values into mixed-format table
-      insertValue();
-
-      // create Table with compute columns under default catalog
-      props = Maps.newHashMap();
-      props.put("connector", MixedDynamicTableFactory.IDENTIFIER);
-      props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl());
-      props.put(MixedDynamicTableFactory.IDENTIFIER + ".catalog", catalogName);
-      props.put(MixedDynamicTableFactory.IDENTIFIER + ".database", DB);
-      props.put(MixedDynamicTableFactory.IDENTIFIER + ".table", TABLE);
-
-      sql(
-          "CREATE TABLE default_catalog.default_database."
-              + TABLE
-              + " ("
-              + " id INT,"
-              + " t TIMESTAMP(6),"
-              + " compute_id as id+5 ,"
-              + " proc as PROCTIME(), "
-              + " PRIMARY KEY (id) NOT ENFORCED "
-              + ") PARTITIONED BY(t) "
-              + "WITH %s",
-          toWithClause(props));
-
-      // select from mixed-format table with compute columns under default 
catalog
-      List<Row> rows =
-          sql(
-              "SELECT * FROM default_catalog.default_database."
-                  + TABLE
-                  + " /*+ OPTIONS("
-                  + "'streaming'='false'"
-                  + ") */");
-      checkRows(rows);
-    }
-  }
-
   private void checkRows(List<Row> rows) {
     Assert.assertEquals(1, rows.size());
     int id = (int) rows.get(0).getField("id");
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
index b9d13e4cd..61da6d3e7 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
@@ -116,7 +116,7 @@ public class MixedIncrementalLoaderTest extends 
TableTestBase implements FlinkTa
         Lists.newArrayList(Expressions.greaterThan("op_time", 
"2022-06-20T10:10:11.0"));
     ContinuousSplitPlanner morPlanner =
         new MergeOnReadIncrementalPlanner(
-            getTableLoader(getCatalogName(), getMetastoreUrl(), keyedTable));
+            getTableLoader(getCatalogName(), getMetastoreUri(), keyedTable));
 
     FlinkKeyedMORDataReader flinkKeyedMORDataReader =
         new FlinkKeyedMORDataReader(
@@ -161,8 +161,8 @@ public class MixedIncrementalLoaderTest extends 
TableTestBase implements FlinkTa
   }
 
   @Override
-  public String getMetastoreUrl() {
-    return getCatalogUrl();
+  public String getMetastoreUri() {
+    return getCatalogUri();
   }
 
   @Override
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
index bff63d3ae..9a1a21e09 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
@@ -65,7 +65,7 @@ public class LookupITCase extends CatalogITCaseBase 
implements FlinkTaskWriterBa
     }
     exec(
-        "create catalog mixed_catalog with ('type'='arctic', 
'metastore.url'='%s')",
-        getCatalogUrl());
+        "create catalog mixed_catalog with ('type'='mixed_iceberg', 
'ams.uri'='%s')",
+        getCatalogUri());
     exec(
         "create table mixed_catalog.%s.L (id int) "
             + "with ('scan.startup.mode'='earliest', 'monitor-interval'='1 
s','streaming'='true')",
@@ -141,8 +141,8 @@ public class LookupITCase extends CatalogITCaseBase 
implements FlinkTaskWriterBa
   }
 
   @Override
-  public String getMetastoreUrl() {
-    return getCatalogUrl();
+  public String getMetastoreUri() {
+    return getCatalogUri();
   }
 
   @Override
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
index 2795ca911..659f4e955 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
@@ -67,8 +67,8 @@ public class TestLookupSecondary extends CatalogITCaseBase 
implements FlinkTaskW
       db = dbs.get(0);
     }
     exec(
-        "create catalog mixed_catalog with ('type'='arctic', 
'metastore.url'='%s')",
-        getCatalogUrl());
+        "create catalog mixed_catalog with ('type'='mixed_iceberg', 
'ams.uri'='%s')",
+        getCatalogUri());
     exec(
         "create table mixed_catalog.%s.L (id int) "
             + "with ('scan.startup.mode'='earliest', 'monitor-interval'='1 
s')",
@@ -143,8 +143,8 @@ public class TestLookupSecondary extends CatalogITCaseBase 
implements FlinkTaskW
   }
 
   @Override
-  public String getMetastoreUrl() {
-    return getCatalogUrl();
+  public String getMetastoreUri() {
+    return getCatalogUri();
   }
 
   @Override
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
index 93bc98269..610a7854b 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
@@ -92,7 +92,7 @@ public interface FlinkTaskWriterBaseTest extends 
FlinkTableTestBase {
     } else {
       records =
           recordsOfUnkeyedTable(
-              getTableLoader(getCatalogName(), getMetastoreUrl(), mixedTable),
+              getTableLoader(getCatalogName(), getMetastoreUri(), mixedTable),
               selectedSchema,
               flinkTableSchema);
     }
@@ -101,7 +101,7 @@ public interface FlinkTaskWriterBaseTest extends 
FlinkTableTestBase {
   }
 
   /** For asserting unkeyed table records. */
-  String getMetastoreUrl();
+  String getMetastoreUri();
 
   /** For asserting unkeyed table records. */
   String getCatalogName();
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 2a8b595d8..8105e2025 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,5 +16,7 @@
 # limitations under the License.
 #
 
-org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory
+org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory
+org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory
+org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory
 org.apache.amoro.flink.table.MixedDynamicTableFactory
\ No newline at end of file
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
index c4f90e921..4cd337ec6 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
@@ -18,9 +18,11 @@
 
 package org.apache.amoro.spark;
 
+import org.apache.amoro.properties.CatalogMetaProperties;
+
 /** Unified catalog properties. */
 public class SparkUnifiedCatalogProperties {
 
   /** AMS URI, to load unified catalog information. */
-  public static final String URI = "uri";
+  public static final String URI = CatalogMetaProperties.AMS_URI;
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/ArcticDataSource.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/ArcticDataSource.java
deleted file mode 100644
index a54ec70ba..000000000
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/ArcticDataSource.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.spark.mixed;
-
-/**
- * Mixed-format DataSource register
- *
- * @deprecated using {@link MixedHiveDataSource} or {@link 
MixedIcebergDataSource} instead.
- */
-@Deprecated
-public class ArcticDataSource extends MixedSourceBase {
-
-  @Override
-  public String shortName() {
-    return "arctic";
-  }
-}
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
index b0d9f1cdf..5620b2f5f 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
@@ -25,6 +25,7 @@ import static 
org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
 
 import org.apache.amoro.mixed.CatalogLoader;
 import org.apache.amoro.mixed.MixedFormatCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
 import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
@@ -71,12 +72,12 @@ public abstract class MixedSparkCatalogBase
     this.catalogName = name;
     Map<String, String> properties = Maps.newHashMap(options);
     if (tableMetaStore == null) {
-      String catalogUrl = options.get("url");
-      if (StringUtils.isBlank(catalogUrl)) {
-        catalogUrl = options.get("uri");
+      String catalogUri = options.get(CatalogMetaProperties.AMS_URI);
+      if (StringUtils.isBlank(catalogUri)) {
+        catalogUri = options.get("uri");
       }
-      if (catalogUrl != null) {
-        catalog = CatalogLoader.load(catalogUrl, properties);
+      if (catalogUri != null) {
+        catalog = CatalogLoader.load(catalogUri, properties);
       } else {
         Configuration localConfiguration =
             SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name);
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 47f0d565e..9410c0dc0 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -16,6 +16,5 @@
 # limitations under the License.
 #
 
-org.apache.amoro.spark.mixed.ArcticDataSource
 org.apache.amoro.spark.mixed.MixedHiveDataSource
 org.apache.amoro.spark.mixed.MixedIcebergDataSource
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
index 33b89a5e2..b0ea31bae 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
@@ -21,7 +21,6 @@ package org.apache.amoro.spark.test;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.mixed.CatalogLoader;
 import org.apache.amoro.mixed.MixedFormatCatalog;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.table.PrimaryKeySpec;
@@ -67,12 +66,6 @@ public class MixedTableTestBase extends SparkTestBase {
     return catalog().loadTable(target().toAmoroIdentifier());
   }
 
-  public String provider(TableFormat format) {
-    Preconditions.checkArgument(
-        format == TableFormat.MIXED_HIVE || format == 
TableFormat.MIXED_ICEBERG);
-    return "arctic";
-  }
-
   public MixedTable createMixedFormatSource(Schema schema, 
Consumer<TableBuilder> consumer) {
     TestIdentifier identifier =
         TestIdentifier.ofDataLake(currentCatalog, catalog().name(), 
database(), sourceTable, true);
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
index 03478b197..d2a86bf39 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
@@ -82,7 +82,7 @@ public class SparkTestBase {
     return ImmutableMap.of(
         "spark.sql.catalog.spark_catalog",
         SparkTestContext.SESSION_CATALOG_IMPL,
-        "spark.sql.catalog.spark_catalog.uri",
+        "spark.sql.catalog.spark_catalog.ams.uri",
         CONTEXT.amsCatalogUrl(TableFormat.MIXED_HIVE));
   }
 
@@ -129,9 +129,9 @@ public class SparkTestBase {
   protected String sparkCatalogToAMSCatalog(String sparkCatalog) {
     String uri = null;
     try {
-      uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".uri");
+      uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + 
".ams.uri");
     } catch (NoSuchElementException e) {
-      uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".url");
+      uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".uri");
     }
     AmsThriftUrl catalogUri = AmsThriftUrl.parse(uri, 
Constants.THRIFT_TABLE_SERVICE_NAME);
     return catalogUri.catalogName();
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 20bdfa004..0edfd3ec0 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -219,13 +219,13 @@ public class SparkTestContext {
   private void addMixedSparkCatalog(
       Map<String, String> configs, String catalogName, TableFormat format) {
     configs.put("spark.sql.catalog." + catalogName, MIXED_CATALOG_IMPL);
-    configs.put("spark.sql.catalog." + catalogName + ".uri", 
amsCatalogUrl(format));
+    configs.put("spark.sql.catalog." + catalogName + ".ams.uri", 
amsCatalogUrl(format));
   }
 
   private void addUnifiedSparkCatalog(
       Map<String, String> configs, String catalogName, TableFormat format) {
     configs.put("spark.sql.catalog." + catalogName, UNIFIED_CATALOG_IMP);
-    configs.put("spark.sql.catalog." + catalogName + ".uri", 
amsCatalogUrl(format));
+    configs.put("spark.sql.catalog." + catalogName + ".ams.uri", 
amsCatalogUrl(format));
   }
 
   private boolean isSameSparkConf(Map<String, String> sparkConf) {
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
index d87fcf890..1fe08e132 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
@@ -45,7 +45,7 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
     return ImmutableMap.of(
         "spark.sql.catalog.spark_catalog",
         "org.apache.amoro.spark.SparkUnifiedSessionCatalog",
-        "spark.sql.catalog.spark_catalog.uri",
+        "spark.sql.catalog.spark_catalog.ams.uri",
         CONTEXT.amsCatalogUrl(null));
   }
 
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
index c5990c743..0a3a1feaa 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
@@ -69,26 +69,27 @@ case class QueryWithConstraintCheck(spark: SparkSession) 
extends Rule[LogicalPla
       c.copy(query = checkDataQuery)
   }
 
-  def checkDuplicatesEnabled(): Boolean = {
+  private def checkDuplicatesEnabled(): Boolean = {
     java.lang.Boolean.valueOf(spark.sessionState.conf.getConfString(
       SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE,
       SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE_DEFAULT))
   }
 
-  def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: TableSpec): 
Boolean = {
+  private def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: 
TableSpec): Boolean = {
     catalog match {
-      case _: MixedFormatSparkCatalog =>
-        tableSpec.provider.isDefined && 
tableSpec.provider.get.equalsIgnoreCase(
-          "arctic") && tableSpec.properties.contains("primary.keys")
-      case _: MixedFormatSparkSessionCatalog[_] =>
-        tableSpec.provider.isDefined && 
tableSpec.provider.get.equalsIgnoreCase(
-          "arctic") && tableSpec.properties.contains("primary.keys")
-      case _ =>
-        false
+      case _: MixedFormatSparkCatalog | _: MixedFormatSparkSessionCatalog[_]
+          if tableSpec.provider.exists(isMixedFormatProvider) &&
+            tableSpec.properties.contains("primary.keys") => true
+      case _ => false
     }
   }
 
-  def buildValidatePrimaryKeyDuplication(
+  private def isMixedFormatProvider(provider: String): Boolean = {
+    // Locale-independent match: toLowerCase depends on the JVM default locale (e.g. Turkish).
+    provider.equalsIgnoreCase("mixed_iceberg") || provider.equalsIgnoreCase("mixed_hive")
+  }
+
+  private def buildValidatePrimaryKeyDuplication(
       r: DataSourceV2Relation,
       query: LogicalPlan): LogicalPlan = {
     r.table match {
@@ -109,7 +110,7 @@ case class QueryWithConstraintCheck(spark: SparkSession) 
extends Rule[LogicalPla
     }
   }
 
-  def buildValidatePrimaryKeyDuplicationByPrimaries(
+  private def buildValidatePrimaryKeyDuplicationByPrimaries(
       primaries: Array[String],
       query: LogicalPlan): LogicalPlan = {
     val attributes = query.output.filter(p => primaries.contains(p.name))
@@ -122,7 +123,7 @@ case class QueryWithConstraintCheck(spark: SparkSession) 
extends Rule[LogicalPla
     Filter(havingExpr, aggPlan)
   }
 
-  protected def findOutputAttr(attrs: Seq[Attribute], attrName: String): 
Attribute = {
+  private def findOutputAttr(attrs: Seq[Attribute], attrName: String): 
Attribute = {
     attrs.find(attr => resolver(attr.name, attrName)).getOrElse {
       throw new UnsupportedOperationException(s"Cannot find $attrName in 
$attrs")
     }
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
index a63c161f2..827cf93dd 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
@@ -141,7 +141,7 @@ public class TestKeyedTableDataFrameAPI extends 
MixedTableTestBase {
         .option("overwrite-mode", "dynamic")
         .mode(SaveMode.Overwrite)
         .save(tablePath);
-    df = spark().read().format("arctic").load(tablePath);
+    df = spark().read().format(provider(format)).load(tablePath);
     Assert.assertEquals(3, df.count());
 
     df =
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
index 508de67a6..77ec05a1d 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
@@ -122,8 +122,8 @@ public class TestUnKeyedTableDataFrameAPI extends 
MixedTableTestBase {
                     RowFactory.create(2, "bbb", "bbb"),
                     RowFactory.create(3, "ccc", "ccc")),
                 structType);
-    df.write().format("arctic").partitionBy("day").save(tablePath);
-    df = spark().read().format("arctic").load(tablePath);
+    df.write().format(provider(format)).partitionBy("day").save(tablePath);
+    df = spark().read().format(format.name()).load(tablePath);
     Assertions.assertEquals(3, df.count());
 
     // test overwrite dynamic
@@ -136,12 +136,12 @@ public class TestUnKeyedTableDataFrameAPI extends 
MixedTableTestBase {
                     RowFactory.create(6, "ccc", "ccc")),
                 structType);
     df.write()
-        .format("arctic")
+        .format(provider(format))
         .partitionBy("day")
         .option("overwrite-mode", "dynamic")
         .mode(SaveMode.Overwrite)
         .save(tablePath);
-    df = spark().read().format("arctic").load(tablePath);
+    df = spark().read().format(provider(format)).load(tablePath);
     Assertions.assertEquals(5, df.count());
   }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
index 5785abec3..ae2a64eaf 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
@@ -52,7 +52,7 @@ public class TestSessionCatalog extends MixedTableTestBase {
     return ImmutableMap.of(
         "spark.sql.catalog.spark_catalog",
         SparkTestContext.SESSION_CATALOG_IMPL,
-        "spark.sql.catalog.spark_catalog.url",
+        "spark.sql.catalog.spark_catalog.ams.uri",
         CONTEXT.amsCatalogUrl(TableFormat.MIXED_ICEBERG));
   }
 
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
index f4945d5b6..85a309f21 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
@@ -58,9 +58,9 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
 
   public static Stream<Arguments> testCreateTable() {
     return Stream.of(
-        Arguments.arguments("arctic", true, "", "hive comment"),
-        Arguments.arguments("arctic", false, "pt", "hive comment"),
-        Arguments.arguments("arctic", true, "pt", "hive comment"),
+        Arguments.arguments("mixed_iceberg", true, "", "hive comment"),
+        Arguments.arguments("mixed_iceberg", false, "pt", "hive comment"),
+        Arguments.arguments("mixed_iceberg", true, "pt", "hive comment"),
         Arguments.arguments("parquet", false, "pt", null),
         Arguments.arguments("parquet", false, "dt string", null));
   }
@@ -85,7 +85,7 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
 
     sql(sqlText);
 
-    if ("arctic".equalsIgnoreCase(provider)) {
+    if ("mixed_iceberg".equalsIgnoreCase(provider)) {
       Assertions.assertTrue(tableExists());
     }
 
@@ -113,9 +113,9 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
 
   public static Stream<Arguments> testCreateTableAsSelect() {
     return Stream.of(
-        Arguments.arguments("arctic", true, "", true, "hive comment"),
-        Arguments.arguments("arctic", false, "pt", true, "hive comment"),
-        Arguments.arguments("arctic", true, "pt", false, "hive comment"),
+        Arguments.arguments("mixed_iceberg", true, "", true, "hive comment"),
+        Arguments.arguments("mixed_iceberg", false, "pt", true, "hive comment"),
+        Arguments.arguments("mixed_iceberg", true, "pt", false, "hive comment"),
         Arguments.arguments("parquet", false, "pt", false, "hive comment"),
         Arguments.arguments("parquet", false, "", false, "hive comment"));
   }
@@ -142,7 +142,7 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
     sqlText += " AS SELECT * FROM " + source();
 
     sql(sqlText);
-    if ("arctic".equalsIgnoreCase(provider)) {
+    if ("mixed_iceberg".equalsIgnoreCase(provider)) {
       Assertions.assertTrue(tableExists());
     }
 
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
index b5389e445..9cb38c73a 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
@@ -71,26 +71,27 @@ case class QueryWithConstraintCheck(spark: SparkSession) 
extends Rule[LogicalPla
       c.copy(query = checkDataQuery)
   }
 
-  def checkDuplicatesEnabled(): Boolean = {
+  private def checkDuplicatesEnabled(): Boolean = {
     java.lang.Boolean.valueOf(spark.sessionState.conf.getConfString(
       SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE,
       SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE_DEFAULT))
   }
 
-  def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: TableSpec): Boolean = {
+  private def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: TableSpec): Boolean = {
     catalog match {
-      case _: MixedFormatSparkCatalog =>
-        tableSpec.provider.isDefined && tableSpec.provider.get.equalsIgnoreCase(
-          "arctic") && tableSpec.properties.contains("primary.keys")
-      case _: MixedFormatSparkSessionCatalog[_] =>
-        tableSpec.provider.isDefined && tableSpec.provider.get.equalsIgnoreCase(
-          "arctic") && tableSpec.properties.contains("primary.keys")
-      case _ =>
-        false
+      case _: MixedFormatSparkCatalog | _: MixedFormatSparkSessionCatalog[_]
+          if tableSpec.provider.exists(isMixedFormatProvider) &&
+            tableSpec.properties.contains("primary.keys") => true
+      case _ => false
     }
   }
 
-  def buildValidatePrimaryKeyDuplication(
+  private def isMixedFormatProvider(provider: String): Boolean = {
+    val providerLower = provider.toLowerCase
+    providerLower == "mixed_iceberg" || providerLower == "mixed_hive"
+  }
+
+  private def buildValidatePrimaryKeyDuplication(
       r: DataSourceV2Relation,
       query: LogicalPlan): LogicalPlan = {
     r.table match {
@@ -111,7 +112,7 @@ case class QueryWithConstraintCheck(spark: SparkSession) 
extends Rule[LogicalPla
     }
   }
 
-  def buildValidatePrimaryKeyDuplicationByPrimaries(
+  private def buildValidatePrimaryKeyDuplicationByPrimaries(
       primaries: Array[String],
       query: LogicalPlan): LogicalPlan = {
     val attributes = query.output.filter(p => primaries.contains(p.name))
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
index a63c161f2..827cf93dd 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
@@ -141,7 +141,7 @@ public class TestKeyedTableDataFrameAPI extends 
MixedTableTestBase {
         .option("overwrite-mode", "dynamic")
         .mode(SaveMode.Overwrite)
         .save(tablePath);
-    df = spark().read().format("arctic").load(tablePath);
+    df = spark().read().format(provider(format)).load(tablePath);
     Assert.assertEquals(3, df.count());
 
     df =
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
index 508de67a6..5b2dc805b 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
@@ -122,8 +122,8 @@ public class TestUnKeyedTableDataFrameAPI extends 
MixedTableTestBase {
                     RowFactory.create(2, "bbb", "bbb"),
                     RowFactory.create(3, "ccc", "ccc")),
                 structType);
-    df.write().format("arctic").partitionBy("day").save(tablePath);
-    df = spark().read().format("arctic").load(tablePath);
+    df.write().format(provider(format)).partitionBy("day").save(tablePath);
+    df = spark().read().format(provider(format)).load(tablePath);
     Assertions.assertEquals(3, df.count());
 
     // test overwrite dynamic
@@ -136,12 +136,12 @@ public class TestUnKeyedTableDataFrameAPI extends 
MixedTableTestBase {
                     RowFactory.create(6, "ccc", "ccc")),
                 structType);
     df.write()
-        .format("arctic")
+        .format(provider(format))
         .partitionBy("day")
         .option("overwrite-mode", "dynamic")
         .mode(SaveMode.Overwrite)
         .save(tablePath);
-    df = spark().read().format("arctic").load(tablePath);
+    df = spark().read().format(provider(format)).load(tablePath);
     Assertions.assertEquals(5, df.count());
   }
 }
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
index 5785abec3..ae2a64eaf 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
@@ -52,7 +52,7 @@ public class TestSessionCatalog extends MixedTableTestBase {
     return ImmutableMap.of(
         "spark.sql.catalog.spark_catalog",
         SparkTestContext.SESSION_CATALOG_IMPL,
-        "spark.sql.catalog.spark_catalog.url",
+        "spark.sql.catalog.spark_catalog.ams.uri",
         CONTEXT.amsCatalogUrl(TableFormat.MIXED_ICEBERG));
   }
 
diff --git 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
index b12e78c1e..352168549 100644
--- 
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
@@ -57,9 +57,9 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
 
   public static Stream<Arguments> testCreateTable() {
     return Stream.of(
-        Arguments.arguments("arctic", true, ""),
-        Arguments.arguments("arctic", false, "pt"),
-        Arguments.arguments("arctic", true, "pt"),
+        Arguments.arguments("mixed_iceberg", true, ""),
+        Arguments.arguments("mixed_iceberg", false, "pt"),
+        Arguments.arguments("mixed_iceberg", true, "pt"),
         Arguments.arguments("parquet", false, "pt"),
         Arguments.arguments("parquet", false, "dt string"));
   }
@@ -80,7 +80,7 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
 
     sql(sqlText);
 
-    if ("arctic".equalsIgnoreCase(provider)) {
+    if ("mixed_iceberg".equalsIgnoreCase(provider)) {
       Assertions.assertTrue(tableExists());
     }
 
@@ -105,9 +105,9 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
 
   public static Stream<Arguments> testCreateTableAsSelect() {
     return Stream.of(
-        Arguments.arguments("arctic", true, "", true),
-        Arguments.arguments("arctic", false, "pt", true),
-        Arguments.arguments("arctic", true, "pt", false),
+        Arguments.arguments("mixed_iceberg", true, "", true),
+        Arguments.arguments("mixed_iceberg", false, "pt", true),
+        Arguments.arguments("mixed_iceberg", true, "pt", false),
         Arguments.arguments("parquet", false, "pt", false),
         Arguments.arguments("parquet", false, "", false));
   }
@@ -129,7 +129,7 @@ public class TestMixedFormatSessionCatalog extends 
MixedTableTestBase {
     sqlText += " AS SELECT * FROM " + source();
 
     sql(sqlText);
-    if ("arctic".equalsIgnoreCase(provider)) {
+    if ("mixed_iceberg".equalsIgnoreCase(provider)) {
       Assertions.assertTrue(tableExists());
     }
 
diff --git a/docs/engines/flink/flink-cdc-ingestion.md 
b/docs/engines/flink/flink-cdc-ingestion.md
index c79e311c8..bf1df9b11 100644
--- a/docs/engines/flink/flink-cdc-ingestion.md
+++ b/docs/engines/flink/flink-cdc-ingestion.md
@@ -94,7 +94,7 @@ CREATE TABLE products (
 
 CREATE CATALOG amoro_catalog WITH (
     'type'='amoro',
-    'metastore.url'='thrift://<ip>:<port>/<catalog_name_in_metastore>'
+    'ams.uri'='thrift://<ip>:<port>/<catalog_name_in_metastore>'
 ); 
 
 CREATE TABLE IF NOT EXISTS `amoro_catalog`.`db`.`test_tb`(
diff --git a/docs/engines/flink/flink-ddl.md b/docs/engines/flink/flink-ddl.md
index 3978e417a..220a68f3f 100644
--- a/docs/engines/flink/flink-ddl.md
+++ b/docs/engines/flink/flink-ddl.md
@@ -43,7 +43,7 @@ Where `<catalog_name>` is the user-defined name of the Flink 
catalog, and `<conf
 | Key              | Default Value | Type    | Required | Description          
                                                                                
                                                                                
                                                    |
 
|------------------|---------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | type             | N/A           | String  | Yes      | Catalog type, 
validate values are mixed_iceberg and mixed_hive                                
                                                                                
                                                           |
-| metastore.url    | (none)        | String  | No       | The URL for Amoro 
Metastore is thrift://`<ip>`:`<port>`/`<catalog_name_in_metastore>`.<br>If high 
availability is enabled for AMS, it can also be specified in the form of 
zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}. |
+| ams.uri          | (none)        | String  | No       | The URI for Amoro 
Metastore is thrift://`<ip>`:`<port>`/`<catalog_name_in_metastore>`.<br>If high 
availability is enabled for AMS, it can also be specified in the form of 
zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}. |
 | default-database | default       | String  | No       | The default database 
to use                                                                          
                                                                                
                                                    |
 | property-version | 1             | Integer | No       | Catalog properties 
version, this option is for future backward compatibility                       
                                                                                
                                                      |
 | catalog-type     | N/A           | String  | No       | Metastore type of 
the catalog, validate values are hadoop, hive, rest, custom                     
                                                                                
                                                       |
@@ -68,7 +68,7 @@ Modify the `conf/sql-client-defaults.yaml` file in the Flink 
directory.
 catalogs:
 - name: <catalog_name>
   type: mixed_iceberg
-  metastore.url: ...
+  ams.uri: ...
   ...
 ```
 
@@ -130,13 +130,13 @@ CREATE TABLE `test_table` (
     PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
     'connector' = 'mixed-format',
-    'metastore.url' = '',
+    'ams.uri' = '',
     'mixed_format.catalog' = '',
     'mixed_format.database' = '',
     'mixed_format.table' = ''
 );
 ```
-where `<metastore.url>` is the URL of the Amoro Metastore, and `mixed_format.catalog`, `mixed_format.database` and `mixed_format.table` are the catalog name, database name and table name of this table under the AMS, respectively.
+where `<ams.uri>` is the URI of the Amoro Metastore, and `mixed_format.catalog`, `mixed_format.database` and `mixed_format.table` are the catalog name, database name and table name of this table under the AMS, respectively.
 
 ### CREATE TABLE LIKE
 Create a table with the same table structure, partitions, and table properties 
as an existing table. This can be achieved using CREATE TABLE LIKE.
diff --git a/docs/engines/flink/flink-dml.md b/docs/engines/flink/flink-dml.md
index bdc9f7f21..e84425971 100644
--- a/docs/engines/flink/flink-dml.md
+++ b/docs/engines/flink/flink-dml.md
@@ -222,7 +222,7 @@ CREATE TEMPORARY TABLE Customers (
     zip STRING
 ) WITH (
     'connector' = 'mixed-format',
-    'metastore.url' = '',
+    'ams.uri' = '',
     'mixed-format.catalog' = '',
     'mixed-format.database' = '',
     'mixed-format.table' = '',
diff --git a/docs/engines/spark/spark-conf.md b/docs/engines/spark/spark-conf.md
index 6afdb65bc..3e99dcff1 100644
--- a/docs/engines/spark/spark-conf.md
+++ b/docs/engines/spark/spark-conf.md
@@ -36,7 +36,7 @@ metadata from AMS with following properties:
 
 ```properties
 spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog
-spark.sql.catalog.mixed_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
+spark.sql.catalog.mixed_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
 ```
 
 Or create a mixed_catalog with local configurations with following properties:
@@ -62,7 +62,7 @@ In this way, you don't need to use the `use {catalog}` 
command to switch the def
 ```properties
 spark.sql.defaultCatalog=mixed_catalog
 spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog
-spark.sql.catalog.mixed_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
+spark.sql.catalog.mixed_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
 ```
 
 In a standalone AmoroSparkCatalog scenario, only Mixed-Format tables can be 
created and accessed in the corresponding
@@ -76,7 +76,7 @@ The configuration method is as follows.
 
 ```properties
 
spark.sql.catalog.spark_catalog=org.apache.amoro.spark.MixedFormatSparkSessionCatalog
-spark.sql.catalog.spark_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
+spark.sql.catalog.spark_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
 ```
 
 When using the `MixedFormatSparkSessionCatalog` as the implementation of the 
`spark_catalog`, it behaves as follows
@@ -97,12 +97,12 @@ When using the `MixedFormatSparkSessionCatalog`, there are 
several points to kee
 
 ## The high availability configuration
 
-If AMS is configured with high availability, you can configure the `spark.sql.catalog.{catalog_name}.url` property in
+If AMS is configured with high availability, you can configure the `spark.sql.catalog.{catalog_name}.ams.uri` property in
 the following way to achieve higher availability.
 
 ```properties
 spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog
-spark.sql.catalog.mixed_catalog.url=zookeeper://{zookeeper-endpoint-list}/{cluster-name}/{catalog-name}
+spark.sql.catalog.mixed_catalog.ams.uri=zookeeper://{zookeeper-endpoint-list}/{cluster-name}/{catalog-name}
 ```
 
 Among above:
diff --git a/docs/engines/spark/spark-get-started.md 
b/docs/engines/spark/spark-get-started.md
index 7bc7dd3f4..6caa611f0 100644
--- a/docs/engines/spark/spark-get-started.md
+++ b/docs/engines/spark/spark-get-started.md
@@ -55,10 +55,10 @@ spark-shell --packages 
org.apache.amoro:amoro-mixed-spark-3.3-runtime:0.7.0
 ${SPARK_HOME}/bin/spark-sql \
     --conf spark.sql.extensions=org.apache.amoro.spark.MixedFormatSparkExtensions \
     --conf spark.sql.catalog.local_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog \
-    --conf spark.sql.catalog.local_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}
+    --conf spark.sql.catalog.local_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}
 ```
 
-> Amoro manages the Catalog through AMS, and Spark catalog needs to be mapped to Amoro Catalog via URL,
+> Amoro manages the Catalog through AMS, and Spark catalog needs to be mapped to Amoro Catalog via URI,
 > in the following format:
 > `thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}`,
 > The mixed-format-spark-connector will automatically download the Hadoop site 
 > configuration file through

Reply via email to