This is an automated email from the ASF dual-hosted git repository.
taowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 61827a7f1 [AMORO-3716]Enhance FlinkUnifiedCatalog getTable efficiency
(#3717)
61827a7f1 is described below
commit 61827a7f1a03e501813eb20143a38a40de3a91c7
Author: Nico CHen <[email protected]>
AuthorDate: Fri Aug 8 16:15:30 2025 +0800
[AMORO-3716]Enhance FlinkUnifiedCatalog getTable efficiency (#3717)
[AMORO-3716]Enhance FlinkUnifiedCatalog getTable efficiency, especially for
mixed_format
---
.../amoro/flink/catalog/FlinkUnifiedCatalog.java | 57 ++++++++++++++++------
.../apache/amoro/flink/catalog/MixedCatalog.java | 31 ++++++------
2 files changed, 57 insertions(+), 31 deletions(-)
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
index 2c5c61f7c..c3400a175 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
@@ -37,6 +37,8 @@ import
org.apache.amoro.flink.table.UnifiedDynamicTableFactory;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.utils.CatalogUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -66,7 +68,9 @@ import org.apache.flink.table.factories.Factory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
/** This is a Flink catalog wrap a unified catalog. */
public class FlinkUnifiedCatalog extends AbstractCatalog {
@@ -169,22 +173,43 @@ public class FlinkUnifiedCatalog extends AbstractCatalog {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
- AmoroTable<?> amoroTable;
- try {
- amoroTable = unifiedCatalog.loadTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
- } catch (NoSuchTableException e) {
- throw new TableNotExistException(getName(), tablePath, e);
- }
- AbstractCatalog catalog = originalCatalog(amoroTable);
- CatalogTable catalogTable = (CatalogTable) catalog.getTable(tablePath);
- final Map<String, String> flinkProperties =
Maps.newHashMap(catalogTable.getOptions());
- flinkProperties.put(TABLE_FORMAT.key(), amoroTable.format().toString());
-
- return CatalogTable.of(
- catalogTable.getUnresolvedSchema(),
- catalogTable.getComment(),
- catalogTable.getPartitionKeys(),
- flinkProperties);
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(
+ this.amoroCatalogName, tablePath.getDatabaseName(),
tablePath.getObjectName());
+ Set<TableFormat> formats =
+ CatalogUtil.tableFormats(unifiedCatalog.metastoreType(),
unifiedCatalog.properties());
+
+ TableMetaStore tableMetaStore = unifiedCatalog.authenticationContext();
+ return formats.stream()
+ .map(
+ f -> {
+ try {
+ AbstractCatalog catalog =
+ getOriginalCatalog(f)
+ .orElseGet(() ->
createOriginalCatalog(tableIdentifier, f));
+ CatalogTable catalogTable =
+ (CatalogTable) tableMetaStore.doAs(() ->
catalog.getTable(tablePath));
+ final Map<String, String> flinkProperties =
+ Maps.newHashMap(catalogTable.getOptions());
+ flinkProperties.put(TABLE_FORMAT.key(), f.toString());
+ return CatalogTable.of(
+ catalogTable.getUnresolvedSchema(),
+ catalogTable.getComment(),
+ catalogTable.getPartitionKeys(),
+ flinkProperties);
+ } catch (RuntimeException e) {
+ // only handle no such table case
+ if (e.getCause() instanceof TableNotExistException
+ || e.getCause() instanceof NoSuchTableException) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ })
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElseThrow(() -> new TableNotExistException(getName(), tablePath));
}
@Override
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
index d8dba8a50..25f5d9586 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
@@ -195,23 +195,24 @@ public class MixedCatalog extends AbstractCatalog {
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
- if (!internalCatalog.tableExists(tableIdentifier)) {
+ try {
+ MixedTable table = internalCatalog.loadTable(tableIdentifier);
+ Schema mixedTableSchema = table.schema();
+
+ Map<String, String> mixedTableProperties =
Maps.newHashMap(table.properties());
+ fillTableProperties(mixedTableProperties);
+ fillTableMetaPropertiesIfLookupLike(mixedTableProperties,
tableIdentifier);
+
+ List<String> partitionKeys = toPartitionKeys(table.spec(),
table.schema());
+ return CatalogTable.of(
+ toSchema(mixedTableSchema, MixedFormatUtils.getPrimaryKeys(table),
mixedTableProperties)
+ .toSchema(),
+ null,
+ partitionKeys,
+ mixedTableProperties);
+ } catch (NoSuchTableException e) {
throw new TableNotExistException(this.getName(), tablePath);
}
- MixedTable table = internalCatalog.loadTable(tableIdentifier);
- Schema mixedTableSchema = table.schema();
-
- Map<String, String> mixedTableProperties =
Maps.newHashMap(table.properties());
- fillTableProperties(mixedTableProperties);
- fillTableMetaPropertiesIfLookupLike(mixedTableProperties, tableIdentifier);
-
- List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
- return CatalogTable.of(
- toSchema(mixedTableSchema, MixedFormatUtils.getPrimaryKeys(table),
mixedTableProperties)
- .toSchema(),
- null,
- partitionKeys,
- mixedTableProperties);
}
/**