This is an automated email from the ASF dual-hosted git repository.

taowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 61827a7f1 [AMORO-3716]Enhance FlinkUnifiedCatalog getTable efficiency 
(#3717)
61827a7f1 is described below

commit 61827a7f1a03e501813eb20143a38a40de3a91c7
Author: Nico CHen <[email protected]>
AuthorDate: Fri Aug 8 16:15:30 2025 +0800

    [AMORO-3716]Enhance FlinkUnifiedCatalog getTable efficiency (#3717)
    
    [AMORO-3716]Enhance FlinkUnifiedCatalog getTable efficiency,especially for 
mixed_format
---
 .../amoro/flink/catalog/FlinkUnifiedCatalog.java   | 57 ++++++++++++++++------
 .../apache/amoro/flink/catalog/MixedCatalog.java   | 31 ++++++------
 2 files changed, 57 insertions(+), 31 deletions(-)

diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
index 2c5c61f7c..c3400a175 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java
@@ -37,6 +37,8 @@ import 
org.apache.amoro.flink.table.UnifiedDynamicTableFactory;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.utils.CatalogUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -66,7 +68,9 @@ import org.apache.flink.table.factories.Factory;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 
 /** This is a Flink catalog wrap a unified catalog. */
 public class FlinkUnifiedCatalog extends AbstractCatalog {
@@ -169,22 +173,43 @@ public class FlinkUnifiedCatalog extends AbstractCatalog {
   @Override
   public CatalogBaseTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
-    AmoroTable<?> amoroTable;
-    try {
-      amoroTable = unifiedCatalog.loadTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
-    } catch (NoSuchTableException e) {
-      throw new TableNotExistException(getName(), tablePath, e);
-    }
-    AbstractCatalog catalog = originalCatalog(amoroTable);
-    CatalogTable catalogTable = (CatalogTable) catalog.getTable(tablePath);
-    final Map<String, String> flinkProperties = 
Maps.newHashMap(catalogTable.getOptions());
-    flinkProperties.put(TABLE_FORMAT.key(), amoroTable.format().toString());
-
-    return CatalogTable.of(
-        catalogTable.getUnresolvedSchema(),
-        catalogTable.getComment(),
-        catalogTable.getPartitionKeys(),
-        flinkProperties);
+    TableIdentifier tableIdentifier =
+        TableIdentifier.of(
+            this.amoroCatalogName, tablePath.getDatabaseName(), 
tablePath.getObjectName());
+    Set<TableFormat> formats =
+        CatalogUtil.tableFormats(unifiedCatalog.metastoreType(), 
unifiedCatalog.properties());
+
+    TableMetaStore tableMetaStore = unifiedCatalog.authenticationContext();
+    return formats.stream()
+        .map(
+            f -> {
+              try {
+                AbstractCatalog catalog =
+                    getOriginalCatalog(f)
+                        .orElseGet(() -> 
createOriginalCatalog(tableIdentifier, f));
+                CatalogTable catalogTable =
+                    (CatalogTable) tableMetaStore.doAs(() -> 
catalog.getTable(tablePath));
+                final Map<String, String> flinkProperties =
+                    Maps.newHashMap(catalogTable.getOptions());
+                flinkProperties.put(TABLE_FORMAT.key(), f.toString());
+                return CatalogTable.of(
+                    catalogTable.getUnresolvedSchema(),
+                    catalogTable.getComment(),
+                    catalogTable.getPartitionKeys(),
+                    flinkProperties);
+              } catch (RuntimeException e) {
+                // only handle no such table case
+                if (e.getCause() instanceof TableNotExistException
+                    || e.getCause() instanceof NoSuchTableException) {
+                  return null;
+                } else {
+                  throw e;
+                }
+              }
+            })
+        .filter(Objects::nonNull)
+        .findFirst()
+        .orElseThrow(() -> new TableNotExistException(getName(), tablePath));
   }
 
   @Override
diff --git 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
index d8dba8a50..25f5d9586 100644
--- 
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
+++ 
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java
@@ -195,23 +195,24 @@ public class MixedCatalog extends AbstractCatalog {
   public CatalogBaseTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
     TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
-    if (!internalCatalog.tableExists(tableIdentifier)) {
+    try {
+      MixedTable table = internalCatalog.loadTable(tableIdentifier);
+      Schema mixedTableSchema = table.schema();
+
+      Map<String, String> mixedTableProperties = 
Maps.newHashMap(table.properties());
+      fillTableProperties(mixedTableProperties);
+      fillTableMetaPropertiesIfLookupLike(mixedTableProperties, 
tableIdentifier);
+
+      List<String> partitionKeys = toPartitionKeys(table.spec(), 
table.schema());
+      return CatalogTable.of(
+          toSchema(mixedTableSchema, MixedFormatUtils.getPrimaryKeys(table), 
mixedTableProperties)
+              .toSchema(),
+          null,
+          partitionKeys,
+          mixedTableProperties);
+    } catch (NoSuchTableException e) {
       throw new TableNotExistException(this.getName(), tablePath);
     }
-    MixedTable table = internalCatalog.loadTable(tableIdentifier);
-    Schema mixedTableSchema = table.schema();
-
-    Map<String, String> mixedTableProperties = 
Maps.newHashMap(table.properties());
-    fillTableProperties(mixedTableProperties);
-    fillTableMetaPropertiesIfLookupLike(mixedTableProperties, tableIdentifier);
-
-    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
-    return CatalogTable.of(
-        toSchema(mixedTableSchema, MixedFormatUtils.getPrimaryKeys(table), 
mixedTableProperties)
-            .toSchema(),
-        null,
-        partitionKeys,
-        mixedTableProperties);
   }
 
   /**

Reply via email to