This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 33518bb93 [AMORO-3800] Unify the configuration name for AMS URI (#3801)
33518bb93 is described below
commit 33518bb933f1b9045b099dceb619c399267bbac6
Author: ZhouJinsong <[email protected]>
AuthorDate: Sun Sep 28 15:47:48 2025 +0800
[AMORO-3800] Unify the configuration name for AMS URI (#3801)
* Unify the configuration name for AMS URI
* Rollback some changes
* Fix a unit test error
* Fix some issues in unit tests
* Fix some checkstyle errors
---
.../java/org/apache/amoro/client/AmsThriftUrl.java | 1 -
.../amoro/properties/CatalogMetaProperties.java | 1 -
.../java/org/apache/amoro/mixed/CatalogLoader.java | 32 +++++-----
.../org/apache/amoro/catalog/CatalogTestBase.java | 6 +-
.../apache/amoro/flink/InternalCatalogBuilder.java | 24 ++++----
.../amoro/flink/catalog/FlinkUnifiedCatalog.java | 9 ++-
.../apache/amoro/flink/catalog/MixedCatalog.java | 6 +-
.../catalog/factories/CatalogFactoryOptions.java | 6 +-
.../factories/FlinkUnifiedCatalogFactory.java | 10 +--
.../factories/mixed/MixedCatalogFactory.java | 71 ----------------------
.../factories/mixed/MixedHiveCatalogFactory.java | 2 +-
.../mixed/MixedIcebergCatalogFactory.java | 42 ++++++++++++-
.../flink/table/MixedDynamicTableFactory.java | 6 +-
.../amoro/flink/table/MixedFormatTableLoader.java | 8 +--
.../org.apache.flink.table.factories.Factory | 1 -
.../org/apache/amoro/flink/FlinkTableTestBase.java | 5 +-
.../java/org/apache/amoro/flink/FlinkTestBase.java | 12 ++--
.../amoro/flink/catalog/FlinkCatalogContext.java | 4 +-
.../flink/catalog/FlinkUnifiedCatalogITCase.java | 4 +-
.../amoro/flink/catalog/TestMixedCatalog.java | 64 +------------------
.../hybrid/reader/MixedIncrementalLoaderTest.java | 6 +-
.../org/apache/amoro/flink/table/LookupITCase.java | 6 +-
.../amoro/flink/table/TestLookupSecondary.java | 8 +--
.../amoro/flink/write/FlinkTaskWriterBaseTest.java | 4 +-
.../org.apache.flink.table.factories.Factory | 4 +-
.../amoro/spark/SparkUnifiedCatalogProperties.java | 4 +-
.../apache/amoro/spark/mixed/ArcticDataSource.java | 33 ----------
.../amoro/spark/mixed/MixedSparkCatalogBase.java | 11 ++--
...org.apache.spark.sql.sources.DataSourceRegister | 1 -
.../amoro/spark/test/MixedTableTestBase.java | 7 ---
.../org/apache/amoro/spark/test/SparkTestBase.java | 6 +-
.../apache/amoro/spark/test/SparkTestContext.java | 4 +-
.../test/unified/UnifiedCatalogTestSuites.java | 2 +-
.../analysis/QueryWithConstraintCheck.scala | 27 ++++----
.../suites/api/TestKeyedTableDataFrameAPI.java | 2 +-
.../suites/api/TestUnKeyedTableDataFrameAPI.java | 8 +--
.../test/suites/catalog/TestSessionCatalog.java | 2 +-
.../suites/sql/TestMixedFormatSessionCatalog.java | 16 ++---
.../analysis/QueryWithConstraintCheck.scala | 25 ++++----
.../suites/api/TestKeyedTableDataFrameAPI.java | 2 +-
.../suites/api/TestUnKeyedTableDataFrameAPI.java | 8 +--
.../test/suites/catalog/TestSessionCatalog.java | 2 +-
.../suites/sql/TestMixedFormatSessionCatalog.java | 16 ++---
docs/engines/flink/flink-cdc-ingestion.md | 2 +-
docs/engines/flink/flink-ddl.md | 8 +--
docs/engines/flink/flink-dml.md | 2 +-
docs/engines/spark/spark-conf.md | 10 +--
docs/engines/spark/spark-get-started.md | 4 +-
48 files changed, 209 insertions(+), 335 deletions(-)
diff --git
a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
index 07dafacea..233fb6e69 100644
--- a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
+++ b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
@@ -40,7 +40,6 @@ public class AmsThriftUrl {
public static final String PARAM_SOCKET_TIMEOUT = "socketTimeout";
public static final int DEFAULT_SOCKET_TIMEOUT = 5000;
public static final String ZOOKEEPER_FLAG = "zookeeper";
- public static final String THRIFT_FLAG = "thrift";
public static final String THRIFT_URL_FORMAT = "thrift://%s:%d/%s%s";
public static final int MAX_RETRIES = 3;
private static final Logger logger =
LoggerFactory.getLogger(AmsThriftUrl.class);
diff --git
a/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
b/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
index c56fcc3f9..31fb1ffb3 100644
---
a/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
+++
b/amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java
@@ -69,7 +69,6 @@ public class CatalogMetaProperties {
public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
- // only used for unified catalog
public static final String AMS_URI = "ams.uri";
// only used for engine properties
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
index b285fec01..1afc5e31f 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java
@@ -38,7 +38,7 @@ import org.apache.iceberg.common.DynConstructors;
import java.util.Map;
import java.util.Set;
-/** Catalogs, create mixed-format catalog from metastore thrift url. */
+/** Catalogs, create mixed-format catalog from metastore thrift uri. */
public class CatalogLoader {
public static final String INTERNAL_CATALOG_IMPL =
InternalMixedIcebergCatalog.class.getName();
@@ -48,27 +48,27 @@ public class CatalogLoader {
/**
* Entrypoint for loading Catalog.
*
- * @param catalogUrl mixed-format catalog url, thrift://ams-host:port/catalog_name
+ * @param catalogUri mixed-format catalog uri, thrift://ams-host:port/catalog_name
* @param properties client side catalog configs
* @return mixed-format catalog object
*/
- public static MixedFormatCatalog load(String catalogUrl, Map<String, String> properties) {
- AmsThriftUrl url = AmsThriftUrl.parse(catalogUrl, Constants.THRIFT_TABLE_SERVICE_NAME);
+ public static MixedFormatCatalog load(String catalogUri, Map<String, String> properties) {
+ AmsThriftUrl url = AmsThriftUrl.parse(catalogUri, Constants.THRIFT_TABLE_SERVICE_NAME);
if (url.catalogName() == null || url.catalogName().contains("/")) {
throw new IllegalArgumentException("invalid catalog name " +
url.catalogName());
}
- return loadCatalog(catalogUrl, url.catalogName(), properties);
+ return loadCatalog(catalogUri, url.catalogName(), properties);
}
/**
* Entrypoint for loading catalog.
*
- * @param catalogUrl mixed-format catalog url, thrift://ams-host:port/catalog_name
+ * @param catalogUri mixed-format catalog uri, thrift://ams-host:port/catalog_name
* @return mixed-format catalog object
*/
- public static MixedFormatCatalog load(String catalogUrl) {
- return load(catalogUrl, Maps.newHashMap());
+ public static MixedFormatCatalog load(String catalogUri) {
+ return load(catalogUri, Maps.newHashMap());
}
/**
@@ -106,15 +106,15 @@ public class CatalogLoader {
/**
* Load catalog meta from metastore.
*
- * @param catalogUrl - catalog url
+ * @param catalogUri - catalog uri
* @return catalog meta
*/
- public static CatalogMeta loadMeta(String catalogUrl) {
- AmsThriftUrl url = AmsThriftUrl.parse(catalogUrl, Constants.THRIFT_TABLE_SERVICE_NAME);
+ public static CatalogMeta loadMeta(String catalogUri) {
+ AmsThriftUrl url = AmsThriftUrl.parse(catalogUri, Constants.THRIFT_TABLE_SERVICE_NAME);
if (url.catalogName() == null || url.catalogName().contains("/")) {
throw new IllegalArgumentException("invalid catalog name " +
url.catalogName());
}
- AmsClient client = new PooledAmsClient(catalogUrl);
+ AmsClient client = new PooledAmsClient(catalogUri);
try {
return client.getCatalog(url.catalogName());
} catch (TException e) {
@@ -125,18 +125,18 @@ public class CatalogLoader {
/**
* Entrypoint for loading catalog
*
- * @param metaStoreUrl mixed-format metastore url
+ * @param metaStoreUri mixed-format metastore uri
* @param catalogName mixed-format catalog name
* @param properties client side catalog configs
* @return mixed-format catalog object
*/
private static MixedFormatCatalog loadCatalog(
- String metaStoreUrl, String catalogName, Map<String, String> properties) {
- AmsClient client = new PooledAmsClient(metaStoreUrl);
+ String metaStoreUri, String catalogName, Map<String, String> properties) {
+ AmsClient client = new PooledAmsClient(metaStoreUri);
try {
CatalogMeta catalogMeta = client.getCatalog(catalogName);
String type = catalogMeta.getCatalogType();
- catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, metaStoreUrl);
+ catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, metaStoreUri);
MixedFormatCatalogUtil.mergeCatalogProperties(catalogMeta, properties);
return createCatalog(
catalogName,
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
index e39af36cf..1ccbc3496 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java
@@ -74,16 +74,16 @@ public abstract class CatalogTestBase {
protected MixedFormatCatalog getMixedFormatCatalog() {
if (mixedFormatCatalog == null) {
- mixedFormatCatalog = CatalogLoader.load(getCatalogUrl());
+ mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
}
return mixedFormatCatalog;
}
protected void refreshMixedFormatCatalog() {
- this.mixedFormatCatalog = CatalogLoader.load(getCatalogUrl());
+ this.mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
}
- protected String getCatalogUrl() {
+ protected String getCatalogUri() {
return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
index 089255df5..e31bc6605 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java
@@ -50,13 +50,13 @@ import java.util.Map;
public class InternalCatalogBuilder implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(InternalCatalogBuilder.class);
- private String metastoreUrl;
+ private String amsUri;
private Map<String, String> properties = new HashMap<>(0);
private String catalogName;
private MixedFormatCatalog createMixedFormatCatalog() {
- if (metastoreUrl != null) {
- return CatalogLoader.load(metastoreUrl, properties);
+ if (amsUri != null) {
+ return CatalogLoader.load(amsUri, properties);
} else {
Preconditions.checkArgument(catalogName != null, "Catalog name cannot be
empty");
String metastoreType =
properties.get(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE);
@@ -101,23 +101,25 @@ public class InternalCatalogBuilder implements
Serializable {
}
if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+ java.nio.file.Path hdfsSiteFile = Paths.get(hadoopConfDir,
"hdfs-site.xml");
Preconditions.checkState(
- Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+ Files.exists(hdfsSiteFile),
"Failed to load Hadoop configuration: missing %s",
- Paths.get(hadoopConfDir, "hdfs-site.xml"));
+ hdfsSiteFile);
newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+ java.nio.file.Path coreSiteFile = Paths.get(hadoopConfDir,
"core-site.xml");
Preconditions.checkState(
- Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+ Files.exists(coreSiteFile),
"Failed to load Hadoop configuration: missing %s",
- Paths.get(hadoopConfDir, "core-site.xml"));
+ coreSiteFile);
newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
}
return newConf;
}
- public String getMetastoreUrl() {
- return metastoreUrl;
+ public String getAmsUri() {
+ return amsUri;
}
public Map<String, String> getProperties() {
@@ -134,8 +136,8 @@ public class InternalCatalogBuilder implements Serializable
{
return createMixedFormatCatalog();
}
- public InternalCatalogBuilder metastoreUrl(String metastoreUrl) {
- this.metastoreUrl = metastoreUrl;
+ public InternalCatalogBuilder amsUri(String amsUri) {
+ this.amsUri = amsUri;
return this;
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
index 0b135d308..c56bb6c94 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
@@ -31,7 +31,8 @@ import org.apache.amoro.client.AmsThriftUrl;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
import org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory;
import
org.apache.amoro.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory;
-import org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory;
+import org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory;
+import
org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory;
import
org.apache.amoro.flink.catalog.factories.paimon.PaimonFlinkCatalogFactory;
import org.apache.amoro.flink.table.UnifiedDynamicTableFactory;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -493,8 +494,10 @@ public class FlinkUnifiedCatalog extends AbstractCatalog {
private AbstractCatalog createOriginalCatalog(
TableIdentifier tableIdentifier, TableFormat tableFormat) {
CatalogFactory catalogFactory;
- if (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
- catalogFactory = new MixedCatalogFactory();
+ if (tableFormat.equals(TableFormat.MIXED_ICEBERG)) {
+ catalogFactory = new MixedIcebergCatalogFactory();
+ } else if (tableFormat.equals(TableFormat.MIXED_HIVE)) {
+ catalogFactory = new MixedHiveCatalogFactory();
} else if (tableFormat.equals(TableFormat.ICEBERG)) {
catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf);
} else if (tableFormat.equals(TableFormat.PAIMON)) {
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
index 25f5d9586..3026f608b 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
@@ -102,8 +102,8 @@ public class MixedCatalog extends AbstractCatalog {
public static final String DEFAULT_DB = "default";
/**
- * To distinguish 'CREATE TABLE LIKE' by checking stack {@link
- * org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable}
+ * To distinguish 'CREATE TABLE LIKE' by checking stack
+ * org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable
*/
public static final String SQL_LIKE_METHOD = "lookupLikeSourceTable";
@@ -242,7 +242,7 @@ public class MixedCatalog extends AbstractCatalog {
properties.put(MixedFormatValidator.MIXED_FORMAT_CATALOG.key(),
tableIdentifier.getCatalog());
properties.put(MixedFormatValidator.MIXED_FORMAT_TABLE.key(),
tableIdentifier.getTableName());
properties.put(MixedFormatValidator.MIXED_FORMAT_DATABASE.key(),
tableIdentifier.getDatabase());
- properties.put(CatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl());
+ properties.put(CatalogFactoryOptions.AMS_URI.key(), catalogBuilder.getAmsUri());
}
private static List<String> toPartitionKeys(PartitionSpec spec, Schema
icebergSchema) {
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
index 464e32eb0..95e5888e7 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java
@@ -22,6 +22,7 @@ import static
org.apache.amoro.properties.CatalogMetaProperties.TABLE_FORMATS;
import org.apache.amoro.flink.catalog.FlinkUnifiedCatalog;
import org.apache.amoro.flink.catalog.MixedCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -31,11 +32,10 @@ import org.apache.flink.configuration.ConfigOptions;
public class CatalogFactoryOptions {
public static final String MIXED_ICEBERG_IDENTIFIER = "mixed_iceberg";
public static final String MIXED_HIVE_IDENTIFIER = "mixed_hive";
- @Deprecated public static final String LEGACY_MIXED_IDENTIFIER = "arctic";
public static final String UNIFIED_IDENTIFIER = "unified";
- public static final ConfigOption<String> METASTORE_URL =
- ConfigOptions.key("metastore.url").stringType().noDefaultValue();
+ public static final ConfigOption<String> AMS_URI =
+ ConfigOptions.key(CatalogMetaProperties.AMS_URI).stringType().noDefaultValue();
public static final ConfigOption<String> FLINK_TABLE_FORMATS =
ConfigOptions.key(TABLE_FORMATS)
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
index 5b1cfcf86..063666b17 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java
@@ -77,16 +77,16 @@ public class FlinkUnifiedCatalogFactory implements
CatalogFactory {
context
.getOptions()
.getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY,
MixedCatalog.DEFAULT_DB);
- final String metastoreUrl = context.getOptions().get(CatalogFactoryOptions.METASTORE_URL.key());
+ final String metastoreUri = context.getOptions().get(CatalogFactoryOptions.AMS_URI.key());
final Map<String, String> catalogProperties =
getCatalogProperties(context.getOptions());
UnifiedCatalog unifiedCatalog;
- if (metastoreUrl != null) {
+ if (metastoreUri != null) {
String amoroCatalogName =
- AmsThriftUrl.parse(metastoreUrl, THRIFT_TABLE_SERVICE_NAME).catalogName();
+ AmsThriftUrl.parse(metastoreUri, THRIFT_TABLE_SERVICE_NAME).catalogName();
unifiedCatalog =
UnifiedCatalogLoader.loadUnifiedCatalog(
- metastoreUrl, amoroCatalogName, catalogProperties);
+ metastoreUri, amoroCatalogName, catalogProperties);
} else {
String metastoreType =
catalogProperties.get(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE);
Preconditions.checkArgument(metastoreType != null, "Catalog type cannot
be empty");
@@ -105,7 +105,7 @@ public class FlinkUnifiedCatalogFactory implements
CatalogFactory {
validate(tableFormats);
return new FlinkUnifiedCatalog(
- metastoreUrl, defaultDatabase, unifiedCatalog, context, hadoopConf);
+ metastoreUri, defaultDatabase, unifiedCatalog, context, hadoopConf);
}
private void validate(Set<TableFormat> expectedFormats) {
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java
deleted file mode 100644
index 2298c859b..000000000
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.flink.catalog.factories.mixed;
-
-import static org.apache.amoro.flink.table.OptionsUtil.getCatalogProperties;
-
-import org.apache.amoro.flink.InternalCatalogBuilder;
-import org.apache.amoro.flink.catalog.MixedCatalog;
-import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
-import org.apache.flink.table.factories.CatalogFactory;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-/** Factory for {@link MixedCatalog} */
-public class MixedCatalogFactory implements CatalogFactory {
-
- @Override
- public String factoryIdentifier() {
- return CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER;
- }
-
- @Override
- public Catalog createCatalog(Context context) {
-
- final String defaultDatabase =
- context
- .getOptions()
- .getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY,
MixedCatalog.DEFAULT_DB);
- final String metastoreUrl =
context.getOptions().get(CatalogFactoryOptions.METASTORE_URL.key());
- final Map<String, String> catalogProperties =
getCatalogProperties(context.getOptions());
-
- final InternalCatalogBuilder catalogBuilder =
- InternalCatalogBuilder.builder()
- .metastoreUrl(metastoreUrl)
- .catalogName(context.getName())
- .properties(catalogProperties);
-
- return new MixedCatalog(context.getName(), defaultDatabase,
catalogBuilder);
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- return Collections.emptySet();
- }
-}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
index 21e9664d8..d55eff1a2 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java
@@ -25,7 +25,7 @@ import
org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
* The factory to create {@link MixedCatalog} with {@link
* CatalogFactoryOptions#MIXED_HIVE_IDENTIFIER} identifier.
*/
-public class MixedHiveCatalogFactory extends MixedCatalogFactory {
+public class MixedHiveCatalogFactory extends MixedIcebergCatalogFactory {
@Override
public String factoryIdentifier() {
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
index 1d74a2208..b394e1eaa 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java
@@ -18,17 +18,57 @@
package org.apache.amoro.flink.catalog.factories.mixed;
+import static org.apache.amoro.flink.table.OptionsUtil.getCatalogProperties;
+
+import org.apache.amoro.flink.InternalCatalogBuilder;
import org.apache.amoro.flink.catalog.MixedCatalog;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.CatalogFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
/**
* The factory to create {@link MixedCatalog} with {@link
* CatalogFactoryOptions#MIXED_ICEBERG_IDENTIFIER} identifier.
*/
-public class MixedIcebergCatalogFactory extends MixedCatalogFactory {
+public class MixedIcebergCatalogFactory implements CatalogFactory {
@Override
public String factoryIdentifier() {
return CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER;
}
+
+ @Override
+ public Catalog createCatalog(Context context) {
+
+ final String defaultDatabase =
+ context
+ .getOptions()
+ .getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY,
MixedCatalog.DEFAULT_DB);
+ final String amsUri =
context.getOptions().get(CatalogFactoryOptions.AMS_URI.key());
+ final Map<String, String> catalogProperties =
getCatalogProperties(context.getOptions());
+
+ final InternalCatalogBuilder catalogBuilder =
+ InternalCatalogBuilder.builder()
+ .amsUri(amsUri)
+ .catalogName(context.getName())
+ .properties(catalogProperties);
+
+ return new MixedCatalog(context.getName(), defaultDatabase,
catalogBuilder);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
index 60f8f05a6..d9ca8ea5b 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedDynamicTableFactory.java
@@ -101,10 +101,10 @@ public class MixedDynamicTableFactory
Preconditions.checkNotNull(
actualCatalogName,
String.format("%s should be set",
MixedFormatValidator.MIXED_FORMAT_CATALOG.key()));
- String metastoreUrl = options.get(CatalogFactoryOptions.METASTORE_URL);
+ String amsUri = options.get(CatalogFactoryOptions.AMS_URI);
actualBuilder =
InternalCatalogBuilder.builder()
- .metastoreUrl(metastoreUrl)
+ .amsUri(amsUri)
.catalogName(actualCatalogName)
.properties(options.toMap());
}
@@ -230,7 +230,7 @@ public class MixedDynamicTableFactory
options.add(MixedFormatValidator.MIXED_FORMAT_TABLE);
options.add(MixedFormatValidator.MIXED_FORMAT_DATABASE);
options.add(MixedFormatValidator.DIM_TABLE_ENABLE);
- options.add(CatalogFactoryOptions.METASTORE_URL);
+ options.add(CatalogFactoryOptions.AMS_URI);
// lookup
options.add(MixedFormatValidator.LOOKUP_CACHE_MAX_ROWS);
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
index 831efa5c6..d7282739f 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
@@ -62,20 +62,20 @@ public class MixedFormatTableLoader implements TableLoader {
public static MixedFormatTableLoader of(
TableIdentifier tableIdentifier, Map<String, String>
flinkTableProperties) {
- String metastoreUrl =
flinkTableProperties.get(CatalogFactoryOptions.METASTORE_URL.key());
+ String metastoreUri =
flinkTableProperties.get(CatalogFactoryOptions.AMS_URI.key());
return new MixedFormatTableLoader(
tableIdentifier,
- InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl),
+ InternalCatalogBuilder.builder().amsUri(metastoreUri),
flinkTableProperties);
}
public static MixedFormatTableLoader of(
TableIdentifier tableIdentifier,
- String metastoreUrl,
+ String metastoreUri,
Map<String, String> flinkTableProperties) {
return new MixedFormatTableLoader(
tableIdentifier,
- InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl),
+ InternalCatalogBuilder.builder().amsUri(metastoreUri),
flinkTableProperties);
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 3125d8dfa..ff59f0f44 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -18,6 +18,5 @@
org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory
org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory
-org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory
org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory
org.apache.amoro.flink.table.MixedDynamicTableFactory
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
index c9e1a60f4..de8d1be1d 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTableTestBase.java
@@ -98,12 +98,11 @@ public interface FlinkTableTestBase {
}
default MixedFormatTableLoader getTableLoader(
- String catalogName, String metastoreUrl, MixedTable mixedTable) {
+ String catalogName, String amsUri, MixedTable mixedTable) {
TableIdentifier identifier =
TableIdentifier.of(
catalogName, mixedTable.id().getDatabase(),
mixedTable.id().getTableName());
- InternalCatalogBuilder internalCatalogBuilder =
- InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl);
+ InternalCatalogBuilder internalCatalogBuilder = InternalCatalogBuilder.builder().amsUri(amsUri);
return MixedFormatTableLoader.of(identifier, internalCatalogBuilder,
mixedTable.properties());
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
index 9a5504bdf..cd3501a07 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
@@ -18,7 +18,7 @@
package org.apache.amoro.flink;
-import static org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER;
+import static org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER;
import static
org.apache.amoro.flink.kafka.testutils.KafkaContainerTest.KAFKA_CONTAINER;
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;
@@ -94,7 +94,7 @@ public class FlinkTestBase extends TableTestBase {
@Rule public TestName name = new TestName();
- public static String metastoreUrl;
+ public static String metastoreUri;
protected static final int KAFKA_PARTITION_NUMS = 1;
@@ -120,14 +120,14 @@ public class FlinkTestBase extends TableTestBase {
@Before
public void before() throws Exception {
- metastoreUrl = getCatalogUrl();
- catalogBuilder = InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl);
+ metastoreUri = getCatalogUri();
+ catalogBuilder = InternalCatalogBuilder.builder().amsUri(metastoreUri);
}
public void config() {
props = Maps.newHashMap();
- props.put("type", LEGACY_MIXED_IDENTIFIER);
- props.put(CatalogFactoryOptions.METASTORE_URL.key(), metastoreUrl);
+ props.put("type", MIXED_ICEBERG_IDENTIFIER);
+ props.put(CatalogFactoryOptions.AMS_URI.key(), metastoreUri);
}
public static void prepare() throws Exception {
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
index 9e369266d..86d134322 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkCatalogContext.java
@@ -18,7 +18,7 @@
package org.apache.amoro.flink.catalog;
-import static org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.METASTORE_URL;
+import static org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions.AMS_URI;
import static
org.apache.amoro.flink.table.descriptors.MixedFormatValidator.TABLE_FORMAT;
import org.apache.amoro.TableFormat;
@@ -114,7 +114,7 @@ public class FlinkCatalogContext {
TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName());
TEST_AMS.getAmsHandler().createCatalog(meta);
- factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName());
+ factoryOptions.put(AMS_URI.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName());
final FactoryUtil.DefaultCatalogContext context =
new FactoryUtil.DefaultCatalogContext(
"FLINK_" + tableFormat,
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
index 879e312df..6e9a654bf 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
@@ -73,9 +73,7 @@ public class FlinkUnifiedCatalogITCase extends
CatalogITCaseBase {
@Before
public void setup() throws Exception {
String catalog = "unified_catalog";
- exec(
- "CREATE CATALOG %s WITH ('type'='unified', 'metastore.url'='%s')",
- catalog, getCatalogUrl());
+ exec("CREATE CATALOG %s WITH ('type'='unified', 'ams.uri'='%s')", catalog, getCatalogUri());
exec("USE CATALOG %s", catalog);
exec("USE %s", tableTestHelper().id().getDatabase());
Optional<Catalog> catalogOptional = getTableEnv().getCatalog(catalog);
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
index b9195a2e5..5e4bb582b 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
@@ -34,7 +34,6 @@ import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestBase;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
-import org.apache.amoro.flink.table.MixedDynamicTableFactory;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
@@ -92,9 +91,7 @@ public class TestMixedCatalog extends CatalogTestBase {
@Parameterized.Parameters(name = "catalogFactoryType = {0}")
public static Object[] parameters() {
return new Object[] {
- CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
- CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER,
- CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER, CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
};
}
@@ -110,7 +107,7 @@ public class TestMixedCatalog extends CatalogTestBase {
public void before() throws Exception {
props = Maps.newHashMap();
props.put("type", catalogFactoryType);
- props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl());
+ props.put(CatalogFactoryOptions.AMS_URI.key(), getCatalogUri());
sql("CREATE CATALOG " + catalogName + " WITH %s", toWithClause(props));
sql("USE CATALOG " + catalogName);
sql("CREATE DATABASE " + catalogName + "." + DB);
@@ -380,63 +377,6 @@ public class TestMixedCatalog extends CatalogTestBase {
sql("DROP TABLE default_catalog.default_database." + TABLE);
}
- @Test
- public void testDefaultCatalogDDLWithVirtualColumn() {
- // this test only for LEGACY_MIXED_IDENTIFIER
- if (catalogFactoryType.equals(CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER)) {
- // create mixed-format table with only physical columns
- sql(
- "CREATE TABLE "
- + catalogName
- + "."
- + DB
- + "."
- + TABLE
- + " ("
- + " id INT,"
- + " t TIMESTAMP(6),"
- + " PRIMARY KEY (id) NOT ENFORCED "
- + ") PARTITIONED BY(t) "
- + " WITH ("
- + " 'connector' = 'mixed-format'"
- + ")");
-
- // insert values into mixed-format table
- insertValue();
-
- // create Table with compute columns under default catalog
- props = Maps.newHashMap();
- props.put("connector", MixedDynamicTableFactory.IDENTIFIER);
- props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl());
- props.put(MixedDynamicTableFactory.IDENTIFIER + ".catalog", catalogName);
- props.put(MixedDynamicTableFactory.IDENTIFIER + ".database", DB);
- props.put(MixedDynamicTableFactory.IDENTIFIER + ".table", TABLE);
-
- sql(
- "CREATE TABLE default_catalog.default_database."
- + TABLE
- + " ("
- + " id INT,"
- + " t TIMESTAMP(6),"
- + " compute_id as id+5 ,"
- + " proc as PROCTIME(), "
- + " PRIMARY KEY (id) NOT ENFORCED "
- + ") PARTITIONED BY(t) "
- + "WITH %s",
- toWithClause(props));
-
- // select from mixed-format table with compute columns under default catalog
- List<Row> rows =
- sql(
- "SELECT * FROM default_catalog.default_database."
- + TABLE
- + " /*+ OPTIONS("
- + "'streaming'='false'"
- + ") */");
- checkRows(rows);
- }
- }
-
private void checkRows(List<Row> rows) {
Assert.assertEquals(1, rows.size());
int id = (int) rows.get(0).getField("id");
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
index b9d13e4cd..61da6d3e7 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
@@ -116,7 +116,7 @@ public class MixedIncrementalLoaderTest extends
TableTestBase implements FlinkTa
Lists.newArrayList(Expressions.greaterThan("op_time",
"2022-06-20T10:10:11.0"));
ContinuousSplitPlanner morPlanner =
new MergeOnReadIncrementalPlanner(
- getTableLoader(getCatalogName(), getMetastoreUrl(), keyedTable));
+ getTableLoader(getCatalogName(), getMetastoreUri(), keyedTable));
FlinkKeyedMORDataReader flinkKeyedMORDataReader =
new FlinkKeyedMORDataReader(
@@ -161,8 +161,8 @@ public class MixedIncrementalLoaderTest extends
TableTestBase implements FlinkTa
}
@Override
- public String getMetastoreUrl() {
- return getCatalogUrl();
+ public String getMetastoreUri() {
+ return getCatalogUri();
}
@Override
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
index bff63d3ae..9a1a21e09 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
@@ -65,7 +65,7 @@ public class LookupITCase extends CatalogITCaseBase
implements FlinkTaskWriterBa
}
exec(
"create catalog mixed_catalog with ('type'='arctic', 'metastore.url'='%s')",
- getCatalogUrl());
+ getCatalogUri());
exec(
"create table mixed_catalog.%s.L (id int) "
+ "with ('scan.startup.mode'='earliest', 'monitor-interval'='1
s','streaming'='true')",
@@ -141,8 +141,8 @@ public class LookupITCase extends CatalogITCaseBase
implements FlinkTaskWriterBa
}
@Override
- public String getMetastoreUrl() {
- return getCatalogUrl();
+ public String getMetastoreUri() {
+ return getCatalogUri();
}
@Override
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
index 2795ca911..659f4e955 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestLookupSecondary.java
@@ -67,8 +67,8 @@ public class TestLookupSecondary extends CatalogITCaseBase
implements FlinkTaskW
db = dbs.get(0);
}
exec(
- "create catalog mixed_catalog with ('type'='arctic', 'metastore.url'='%s')",
- getCatalogUrl());
+ "create catalog mixed_catalog with ('type'='mixed_iceberg', 'ams.uri'='%s')",
+ getCatalogUri());
exec(
"create table mixed_catalog.%s.L (id int) "
+ "with ('scan.startup.mode'='earliest', 'monitor-interval'='1
s')",
@@ -143,8 +143,8 @@ public class TestLookupSecondary extends CatalogITCaseBase
implements FlinkTaskW
}
@Override
- public String getMetastoreUrl() {
- return getCatalogUrl();
+ public String getMetastoreUri() {
+ return getCatalogUri();
}
@Override
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
index 93bc98269..610a7854b 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/FlinkTaskWriterBaseTest.java
@@ -92,7 +92,7 @@ public interface FlinkTaskWriterBaseTest extends
FlinkTableTestBase {
} else {
records =
recordsOfUnkeyedTable(
- getTableLoader(getCatalogName(), getMetastoreUrl(), mixedTable),
+ getTableLoader(getCatalogName(), getMetastoreUri(), mixedTable),
selectedSchema,
flinkTableSchema);
}
@@ -101,7 +101,7 @@ public interface FlinkTaskWriterBaseTest extends
FlinkTableTestBase {
}
/** For asserting unkeyed table records. */
- String getMetastoreUrl();
+ String getMetastoreUri();
/** For asserting unkeyed table records. */
String getCatalogName();
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 2a8b595d8..8105e2025 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,5 +16,7 @@
# limitations under the License.
#
-org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory
+org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory
+org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory
+org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory
org.apache.amoro.flink.table.MixedDynamicTableFactory
\ No newline at end of file
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
index c4f90e921..4cd337ec6 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogProperties.java
@@ -18,9 +18,11 @@
package org.apache.amoro.spark;
+import org.apache.amoro.properties.CatalogMetaProperties;
+
/** Unified catalog properties. */
public class SparkUnifiedCatalogProperties {
/** AMS URI, to load unified catalog information. */
- public static final String URI = "uri";
+ public static final String URI = CatalogMetaProperties.AMS_URI;
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/ArcticDataSource.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/ArcticDataSource.java
deleted file mode 100644
index a54ec70ba..000000000
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/ArcticDataSource.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.spark.mixed;
-
-/**
- * Mixed-format DataSource register
- *
- * @deprecated using {@link MixedHiveDataSource} or {@link MixedIcebergDataSource} instead.
- */
-@Deprecated
-public class ArcticDataSource extends MixedSourceBase {
-
- @Override
- public String shortName() {
- return "arctic";
- }
-}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
index b0d9f1cdf..5620b2f5f 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java
@@ -25,6 +25,7 @@ import static
org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import org.apache.amoro.mixed.CatalogLoader;
import org.apache.amoro.mixed.MixedFormatCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
@@ -71,12 +72,12 @@ public abstract class MixedSparkCatalogBase
this.catalogName = name;
Map<String, String> properties = Maps.newHashMap(options);
if (tableMetaStore == null) {
- String catalogUrl = options.get("url");
- if (StringUtils.isBlank(catalogUrl)) {
- catalogUrl = options.get("uri");
+ String catalogUri = options.get(CatalogMetaProperties.AMS_URI);
+ if (StringUtils.isBlank(catalogUri)) {
+ catalogUri = options.get("uri");
}
- if (catalogUrl != null) {
- catalog = CatalogLoader.load(catalogUrl, properties);
+ if (catalogUri != null) {
+ catalog = CatalogLoader.load(catalogUri, properties);
} else {
Configuration localConfiguration =
SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name);
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 47f0d565e..9410c0dc0 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -16,6 +16,5 @@
# limitations under the License.
#
-org.apache.amoro.spark.mixed.ArcticDataSource
org.apache.amoro.spark.mixed.MixedHiveDataSource
org.apache.amoro.spark.mixed.MixedIcebergDataSource
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
index 33b89a5e2..b0ea31bae 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/MixedTableTestBase.java
@@ -21,7 +21,6 @@ package org.apache.amoro.spark.test;
import org.apache.amoro.TableFormat;
import org.apache.amoro.mixed.CatalogLoader;
import org.apache.amoro.mixed.MixedFormatCatalog;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.PrimaryKeySpec;
@@ -67,12 +66,6 @@ public class MixedTableTestBase extends SparkTestBase {
return catalog().loadTable(target().toAmoroIdentifier());
}
- public String provider(TableFormat format) {
- Preconditions.checkArgument(
- format == TableFormat.MIXED_HIVE || format == TableFormat.MIXED_ICEBERG);
- return "arctic";
- }
-
public MixedTable createMixedFormatSource(Schema schema,
Consumer<TableBuilder> consumer) {
TestIdentifier identifier =
TestIdentifier.ofDataLake(currentCatalog, catalog().name(),
database(), sourceTable, true);
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
index 03478b197..d2a86bf39 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestBase.java
@@ -82,7 +82,7 @@ public class SparkTestBase {
return ImmutableMap.of(
"spark.sql.catalog.spark_catalog",
SparkTestContext.SESSION_CATALOG_IMPL,
- "spark.sql.catalog.spark_catalog.uri",
+ "spark.sql.catalog.spark_catalog.ams.uri",
CONTEXT.amsCatalogUrl(TableFormat.MIXED_HIVE));
}
@@ -129,9 +129,9 @@ public class SparkTestBase {
protected String sparkCatalogToAMSCatalog(String sparkCatalog) {
String uri = null;
try {
- uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".uri");
+ uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".ams.uri");
} catch (NoSuchElementException e) {
- uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".url");
+ uri = spark().conf().get("spark.sql.catalog." + sparkCatalog + ".uri");
}
AmsThriftUrl catalogUri = AmsThriftUrl.parse(uri,
Constants.THRIFT_TABLE_SERVICE_NAME);
return catalogUri.catalogName();
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 20bdfa004..0edfd3ec0 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -219,13 +219,13 @@ public class SparkTestContext {
private void addMixedSparkCatalog(
Map<String, String> configs, String catalogName, TableFormat format) {
configs.put("spark.sql.catalog." + catalogName, MIXED_CATALOG_IMPL);
- configs.put("spark.sql.catalog." + catalogName + ".uri", amsCatalogUrl(format));
+ configs.put("spark.sql.catalog." + catalogName + ".ams.uri", amsCatalogUrl(format));
}
private void addUnifiedSparkCatalog(
Map<String, String> configs, String catalogName, TableFormat format) {
configs.put("spark.sql.catalog." + catalogName, UNIFIED_CATALOG_IMP);
- configs.put("spark.sql.catalog." + catalogName + ".uri", amsCatalogUrl(format));
+ configs.put("spark.sql.catalog." + catalogName + ".ams.uri", amsCatalogUrl(format));
}
private boolean isSameSparkConf(Map<String, String> sparkConf) {
diff --git
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
index d87fcf890..1fe08e132 100644
---
a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
+++
b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
@@ -45,7 +45,7 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
return ImmutableMap.of(
"spark.sql.catalog.spark_catalog",
"org.apache.amoro.spark.SparkUnifiedSessionCatalog",
- "spark.sql.catalog.spark_catalog.uri",
+ "spark.sql.catalog.spark_catalog.ams.uri",
CONTEXT.amsCatalogUrl(null));
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
index c5990c743..0a3a1feaa 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
@@ -69,26 +69,27 @@ case class QueryWithConstraintCheck(spark: SparkSession)
extends Rule[LogicalPla
c.copy(query = checkDataQuery)
}
- def checkDuplicatesEnabled(): Boolean = {
+ private def checkDuplicatesEnabled(): Boolean = {
java.lang.Boolean.valueOf(spark.sessionState.conf.getConfString(
SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE,
SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE_DEFAULT))
}
- def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: TableSpec): Boolean = {
+ private def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: TableSpec): Boolean = {
catalog match {
- case _: MixedFormatSparkCatalog =>
- tableSpec.provider.isDefined && tableSpec.provider.get.equalsIgnoreCase(
- "arctic") && tableSpec.properties.contains("primary.keys")
- case _: MixedFormatSparkSessionCatalog[_] =>
- tableSpec.provider.isDefined && tableSpec.provider.get.equalsIgnoreCase(
- "arctic") && tableSpec.properties.contains("primary.keys")
- case _ =>
- false
+ case _: MixedFormatSparkCatalog | _: MixedFormatSparkSessionCatalog[_]
+ if tableSpec.provider.exists(isMixedFormatProvider) &&
+ tableSpec.properties.contains("primary.keys") => true
+ case _ => false
}
}
- def buildValidatePrimaryKeyDuplication(
+ private def isMixedFormatProvider(provider: String): Boolean = {
+ val providerLower = provider.toLowerCase
+ providerLower == "mixed_iceberg" || providerLower == "mixed_hive"
+ }
+
+ private def buildValidatePrimaryKeyDuplication(
r: DataSourceV2Relation,
query: LogicalPlan): LogicalPlan = {
r.table match {
@@ -109,7 +110,7 @@ case class QueryWithConstraintCheck(spark: SparkSession)
extends Rule[LogicalPla
}
}
- def buildValidatePrimaryKeyDuplicationByPrimaries(
+ private def buildValidatePrimaryKeyDuplicationByPrimaries(
primaries: Array[String],
query: LogicalPlan): LogicalPlan = {
val attributes = query.output.filter(p => primaries.contains(p.name))
@@ -122,7 +123,7 @@ case class QueryWithConstraintCheck(spark: SparkSession)
extends Rule[LogicalPla
Filter(havingExpr, aggPlan)
}
- protected def findOutputAttr(attrs: Seq[Attribute], attrName: String): Attribute = {
+ private def findOutputAttr(attrs: Seq[Attribute], attrName: String): Attribute = {
attrs.find(attr => resolver(attr.name, attrName)).getOrElse {
throw new UnsupportedOperationException(s"Cannot find $attrName in
$attrs")
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
index a63c161f2..827cf93dd 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
@@ -141,7 +141,7 @@ public class TestKeyedTableDataFrameAPI extends
MixedTableTestBase {
.option("overwrite-mode", "dynamic")
.mode(SaveMode.Overwrite)
.save(tablePath);
- df = spark().read().format("arctic").load(tablePath);
+ df = spark().read().format(provider(format)).load(tablePath);
Assert.assertEquals(3, df.count());
df =
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
index 508de67a6..77ec05a1d 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
@@ -122,8 +122,8 @@ public class TestUnKeyedTableDataFrameAPI extends
MixedTableTestBase {
RowFactory.create(2, "bbb", "bbb"),
RowFactory.create(3, "ccc", "ccc")),
structType);
- df.write().format("arctic").partitionBy("day").save(tablePath);
- df = spark().read().format("arctic").load(tablePath);
+ df.write().format(provider(format)).partitionBy("day").save(tablePath);
+ df = spark().read().format(format.name()).load(tablePath);
Assertions.assertEquals(3, df.count());
// test overwrite dynamic
@@ -136,12 +136,12 @@ public class TestUnKeyedTableDataFrameAPI extends
MixedTableTestBase {
RowFactory.create(6, "ccc", "ccc")),
structType);
df.write()
- .format("arctic")
+ .format(provider(format))
.partitionBy("day")
.option("overwrite-mode", "dynamic")
.mode(SaveMode.Overwrite)
.save(tablePath);
- df = spark().read().format("arctic").load(tablePath);
+ df = spark().read().format(provider(format)).load(tablePath);
Assertions.assertEquals(5, df.count());
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
index 5785abec3..ae2a64eaf 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
@@ -52,7 +52,7 @@ public class TestSessionCatalog extends MixedTableTestBase {
return ImmutableMap.of(
"spark.sql.catalog.spark_catalog",
SparkTestContext.SESSION_CATALOG_IMPL,
- "spark.sql.catalog.spark_catalog.url",
+ "spark.sql.catalog.spark_catalog.ams.uri",
CONTEXT.amsCatalogUrl(TableFormat.MIXED_ICEBERG));
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
index f4945d5b6..85a309f21 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
@@ -58,9 +58,9 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
public static Stream<Arguments> testCreateTable() {
return Stream.of(
- Arguments.arguments("arctic", true, "", "hive comment"),
- Arguments.arguments("arctic", false, "pt", "hive comment"),
- Arguments.arguments("arctic", true, "pt", "hive comment"),
+ Arguments.arguments("mixed_iceberg", true, "", "hive comment"),
+ Arguments.arguments("mixed_iceberg", false, "pt", "hive comment"),
+ Arguments.arguments("mixed_iceberg", true, "pt", "hive comment"),
Arguments.arguments("parquet", false, "pt", null),
Arguments.arguments("parquet", false, "dt string", null));
}
@@ -85,7 +85,7 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
sql(sqlText);
- if ("arctic".equalsIgnoreCase(provider)) {
+ if ("mixed_iceberg".equalsIgnoreCase(provider)) {
Assertions.assertTrue(tableExists());
}
@@ -113,9 +113,9 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
public static Stream<Arguments> testCreateTableAsSelect() {
return Stream.of(
- Arguments.arguments("arctic", true, "", true, "hive comment"),
- Arguments.arguments("arctic", false, "pt", true, "hive comment"),
- Arguments.arguments("arctic", true, "pt", false, "hive comment"),
+ Arguments.arguments("mixed_iceberg", true, "", true, "hive comment"),
+ Arguments.arguments("mixed_iceberg", false, "pt", true, "hive
comment"),
+ Arguments.arguments("mixed_iceberg", true, "pt", false, "hive
comment"),
Arguments.arguments("parquet", false, "pt", false, "hive comment"),
Arguments.arguments("parquet", false, "", false, "hive comment"));
}
@@ -142,7 +142,7 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
sqlText += " AS SELECT * FROM " + source();
sql(sqlText);
- if ("arctic".equalsIgnoreCase(provider)) {
+ if ("mixed_iceberg".equalsIgnoreCase(provider)) {
Assertions.assertTrue(tableExists());
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
index b5389e445..9cb38c73a 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/QueryWithConstraintCheck.scala
@@ -71,26 +71,27 @@ case class QueryWithConstraintCheck(spark: SparkSession)
extends Rule[LogicalPla
c.copy(query = checkDataQuery)
}
- def checkDuplicatesEnabled(): Boolean = {
+ private def checkDuplicatesEnabled(): Boolean = {
java.lang.Boolean.valueOf(spark.sessionState.conf.getConfString(
SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE,
SparkSQLProperties.CHECK_SOURCE_DUPLICATES_ENABLE_DEFAULT))
}
- def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec: TableSpec):
Boolean = {
+ private def isCreateKeyedTable(catalog: CatalogPlugin, tableSpec:
TableSpec): Boolean = {
catalog match {
- case _: MixedFormatSparkCatalog =>
- tableSpec.provider.isDefined &&
tableSpec.provider.get.equalsIgnoreCase(
- "arctic") && tableSpec.properties.contains("primary.keys")
- case _: MixedFormatSparkSessionCatalog[_] =>
- tableSpec.provider.isDefined &&
tableSpec.provider.get.equalsIgnoreCase(
- "arctic") && tableSpec.properties.contains("primary.keys")
- case _ =>
- false
+ case _: MixedFormatSparkCatalog | _: MixedFormatSparkSessionCatalog[_]
+ if tableSpec.provider.exists(isMixedFormatProvider) &&
+ tableSpec.properties.contains("primary.keys") => true
+ case _ => false
}
}
- def buildValidatePrimaryKeyDuplication(
+ private def isMixedFormatProvider(provider: String): Boolean = {
+ // Lower-case with Locale.ROOT: the default-locale toLowerCase maps "I" to dotless "ı" under tr/az.
+ Set("mixed_iceberg", "mixed_hive").contains(provider.toLowerCase(java.util.Locale.ROOT))
+ }
+
+ private def buildValidatePrimaryKeyDuplication(
r: DataSourceV2Relation,
query: LogicalPlan): LogicalPlan = {
r.table match {
@@ -111,7 +112,7 @@ case class QueryWithConstraintCheck(spark: SparkSession)
extends Rule[LogicalPla
}
}
- def buildValidatePrimaryKeyDuplicationByPrimaries(
+ private def buildValidatePrimaryKeyDuplicationByPrimaries(
primaries: Array[String],
query: LogicalPlan): LogicalPlan = {
val attributes = query.output.filter(p => primaries.contains(p.name))
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
index a63c161f2..827cf93dd 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestKeyedTableDataFrameAPI.java
@@ -141,7 +141,7 @@ public class TestKeyedTableDataFrameAPI extends
MixedTableTestBase {
.option("overwrite-mode", "dynamic")
.mode(SaveMode.Overwrite)
.save(tablePath);
- df = spark().read().format("arctic").load(tablePath);
+ df = spark().read().format(provider(format)).load(tablePath);
Assert.assertEquals(3, df.count());
df =
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
index 508de67a6..5b2dc805b 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/api/TestUnKeyedTableDataFrameAPI.java
@@ -122,8 +122,8 @@ public class TestUnKeyedTableDataFrameAPI extends
MixedTableTestBase {
RowFactory.create(2, "bbb", "bbb"),
RowFactory.create(3, "ccc", "ccc")),
structType);
- df.write().format("arctic").partitionBy("day").save(tablePath);
- df = spark().read().format("arctic").load(tablePath);
+ df.write().format(provider(format)).partitionBy("day").save(tablePath);
+ df = spark().read().format(provider(format)).load(tablePath);
Assertions.assertEquals(3, df.count());
// test overwrite dynamic
@@ -136,12 +136,12 @@ public class TestUnKeyedTableDataFrameAPI extends
MixedTableTestBase {
RowFactory.create(6, "ccc", "ccc")),
structType);
df.write()
- .format("arctic")
+ .format(provider(format))
.partitionBy("day")
.option("overwrite-mode", "dynamic")
.mode(SaveMode.Overwrite)
.save(tablePath);
- df = spark().read().format("arctic").load(tablePath);
+ df = spark().read().format(provider(format)).load(tablePath);
Assertions.assertEquals(5, df.count());
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
index 5785abec3..ae2a64eaf 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/catalog/TestSessionCatalog.java
@@ -52,7 +52,7 @@ public class TestSessionCatalog extends MixedTableTestBase {
return ImmutableMap.of(
"spark.sql.catalog.spark_catalog",
SparkTestContext.SESSION_CATALOG_IMPL,
- "spark.sql.catalog.spark_catalog.url",
+ "spark.sql.catalog.spark_catalog.ams.uri",
CONTEXT.amsCatalogUrl(TableFormat.MIXED_ICEBERG));
}
diff --git
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
index b12e78c1e..352168549 100644
---
a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/test/java/org/apache/amoro/spark/test/suites/sql/TestMixedFormatSessionCatalog.java
@@ -57,9 +57,9 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
public static Stream<Arguments> testCreateTable() {
return Stream.of(
- Arguments.arguments("arctic", true, ""),
- Arguments.arguments("arctic", false, "pt"),
- Arguments.arguments("arctic", true, "pt"),
+ Arguments.arguments("mixed_iceberg", true, ""),
+ Arguments.arguments("mixed_iceberg", false, "pt"),
+ Arguments.arguments("mixed_iceberg", true, "pt"),
Arguments.arguments("parquet", false, "pt"),
Arguments.arguments("parquet", false, "dt string"));
}
@@ -80,7 +80,7 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
sql(sqlText);
- if ("arctic".equalsIgnoreCase(provider)) {
+ if ("mixed_iceberg".equalsIgnoreCase(provider)) {
Assertions.assertTrue(tableExists());
}
@@ -105,9 +105,9 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
public static Stream<Arguments> testCreateTableAsSelect() {
return Stream.of(
- Arguments.arguments("arctic", true, "", true),
- Arguments.arguments("arctic", false, "pt", true),
- Arguments.arguments("arctic", true, "pt", false),
+ Arguments.arguments("mixed_iceberg", true, "", true),
+ Arguments.arguments("mixed_iceberg", false, "pt", true),
+ Arguments.arguments("mixed_iceberg", true, "pt", false),
Arguments.arguments("parquet", false, "pt", false),
Arguments.arguments("parquet", false, "", false));
}
@@ -129,7 +129,7 @@ public class TestMixedFormatSessionCatalog extends
MixedTableTestBase {
sqlText += " AS SELECT * FROM " + source();
sql(sqlText);
- if ("arctic".equalsIgnoreCase(provider)) {
+ if ("mixed_iceberg".equalsIgnoreCase(provider)) {
Assertions.assertTrue(tableExists());
}
diff --git a/docs/engines/flink/flink-cdc-ingestion.md
b/docs/engines/flink/flink-cdc-ingestion.md
index c79e311c8..bf1df9b11 100644
--- a/docs/engines/flink/flink-cdc-ingestion.md
+++ b/docs/engines/flink/flink-cdc-ingestion.md
@@ -94,7 +94,7 @@ CREATE TABLE products (
CREATE CATALOG amoro_catalog WITH (
'type'='amoro',
- 'metastore.url'='thrift://<ip>:<port>/<catalog_name_in_metastore>'
+ 'ams.uri'='thrift://<ip>:<port>/<catalog_name_in_metastore>'
);
CREATE TABLE IF NOT EXISTS `amoro_catalog`.`db`.`test_tb`(
diff --git a/docs/engines/flink/flink-ddl.md b/docs/engines/flink/flink-ddl.md
index 3978e417a..220a68f3f 100644
--- a/docs/engines/flink/flink-ddl.md
+++ b/docs/engines/flink/flink-ddl.md
@@ -43,7 +43,7 @@ Where `<catalog_name>` is the user-defined name of the Flink
catalog, and `<conf
| Key | Default Value | Type | Required | Description
|
|------------------|---------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| type | N/A | String | Yes | Catalog type,
validate values are mixed_iceberg and mixed_hive
|
-| metastore.url | (none) | String | No | The URL for Amoro
Metastore is thrift://`<ip>`:`<port>`/`<catalog_name_in_metastore>`.<br>If high
availability is enabled for AMS, it can also be specified in the form of
zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}. |
+| ams.uri | (none) | String | No | The URI for Amoro
Metastore is thrift://`<ip>`:`<port>`/`<catalog_name_in_metastore>`.<br>If high
availability is enabled for AMS, it can also be specified in the form of
zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}. |
| default-database | default | String | No | The default database
to use
|
| property-version | 1 | Integer | No | Catalog properties
version, this option is for future backward compatibility
|
| catalog-type | N/A | String | No | Metastore type of
the catalog, validate values are hadoop, hive, rest, custom
|
@@ -68,7 +68,7 @@ Modify the `conf/sql-client-defaults.yaml` file in the Flink
directory.
catalogs:
- name: <catalog_name>
type: mixed_iceberg
- metastore.url: ...
+ ams.uri: ...
...
```
@@ -130,13 +130,13 @@ CREATE TABLE `test_table` (
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mixed-format',
- 'metastore.url' = '',
+ 'ams.uri' = '',
'mixed_format.catalog' = '',
'mixed_format.database' = '',
'mixed_format.table' = ''
);
```
-where `<metastore.url>` is the URL of the Amoro Metastore, and
`mixed_format.catalog`, `mixed_format.database` and `mixed_format.table` are
the catalog name, database name and table name of this table under the AMS,
respectively.
+where `<ams.uri>` is the URI of the Amoro Metastore, and
`mixed_format.catalog`, `mixed_format.database` and `mixed_format.table` are
the catalog name, database name and table name of this table under the AMS,
respectively.
### CREATE TABLE LIKE
Create a table with the same table structure, partitions, and table properties
as an existing table. This can be achieved using CREATE TABLE LIKE.
diff --git a/docs/engines/flink/flink-dml.md b/docs/engines/flink/flink-dml.md
index bdc9f7f21..e84425971 100644
--- a/docs/engines/flink/flink-dml.md
+++ b/docs/engines/flink/flink-dml.md
@@ -222,7 +222,7 @@ CREATE TEMPORARY TABLE Customers (
zip STRING
) WITH (
'connector' = 'mixed-format',
- 'metastore.url' = '',
+ 'ams.uri' = '',
'mixed-format.catalog' = '',
'mixed-format.database' = '',
'mixed-format.table' = '',
diff --git a/docs/engines/spark/spark-conf.md b/docs/engines/spark/spark-conf.md
index 6afdb65bc..3e99dcff1 100644
--- a/docs/engines/spark/spark-conf.md
+++ b/docs/engines/spark/spark-conf.md
@@ -36,7 +36,7 @@ metadata from AMS with following properties:
```properties
spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog
-spark.sql.catalog.mixed_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
+spark.sql.catalog.mixed_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
```
Or create a mixed_catalog with local configurations with following properties:
@@ -62,7 +62,7 @@ In this way, you don't need to use the `use {catalog}`
command to switch the def
```properties
spark.sql.defaultCatalog=mixed_catalog
spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog
-spark.sql.catalog.mixed_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
+spark.sql.catalog.mixed_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
```
In a standalone AmoroSparkCatalog scenario, only Mixed-Format tables can be
created and accessed in the corresponding
@@ -76,7 +76,7 @@ The configuration method is as follows.
```properties
spark.sql.catalog.spark_catalog=org.apache.amoro.spark.MixedFormatSparkSessionCatalog
-spark.sql.catalog.spark_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
+spark.sql.catalog.spark_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE}
```
When using the `MixedFormatSparkSessionCatalog` as the implementation of the
`spark_catalog`, it behaves as follows
@@ -97,12 +97,12 @@ When using the `MixedFormatSparkSessionCatalog`, there are
several points to kee
## The high availability configuration
-If AMS is configured with high availability, you can configure the
`spark.sql.catalog.{catalog_name}.url` property in
+If AMS is configured with high availability, you can configure the
`spark.sql.catalog.{catalog_name}.ams.uri` property in
the following way to achieve higher availability.
```properties
spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog
-spark.sql.catalog.mixed_catalog.url=zookeeper://{zookeeper-endpoint-list}/{cluster-name}/{catalog-name}
+spark.sql.catalog.mixed_catalog.ams.uri=zookeeper://{zookeeper-endpoint-list}/{cluster-name}/{catalog-name}
```
Among above:
diff --git a/docs/engines/spark/spark-get-started.md
b/docs/engines/spark/spark-get-started.md
index 7bc7dd3f4..6caa611f0 100644
--- a/docs/engines/spark/spark-get-started.md
+++ b/docs/engines/spark/spark-get-started.md
@@ -55,10 +55,10 @@ spark-shell --packages
org.apache.amoro:amoro-mixed-spark-3.3-runtime:0.7.0
${SPARK_HOME}/bin/spark-sql \
--conf
spark.sql.extensions=org.apache.amoro.spark.MixedFormatSparkExtensions \
--conf
spark.sql.catalog.local_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog \
- --conf
spark.sql.catalog.local_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}
+ --conf
spark.sql.catalog.local_catalog.ams.uri=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}
```
-> Amoro manages the Catalog through AMS, and Spark catalog needs to be mapped
to Amoro Catalog via URL,
+> Amoro manages the Catalog through AMS, and Spark catalog needs to be mapped
to Amoro Catalog via URI,
> in the following format:
> `thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME}`,
> The mixed-format-spark-connector will automatically download the Hadoop site
> configuration file through