This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new db17a73e4 [AMORO-2635] Enhance table partition files list performance (#4003)
db17a73e4 is described below

commit db17a73e47c1146c8bc725a327ebcbc6bf79d4c4
Author: Fei Wang <[email protected]>
AuthorDate: Fri Dec 12 00:59:27 2025 -0800

    [AMORO-2635] Enhance table partition files list performance (#4003)
    
    * [AMORO-2635]
    
    * UT
    
    * UT for fallback
    
    * address comments
    
    * TODO
---
 .../dashboard/MixedAndIcebergTableDescriptor.java  | 186 ++++++++++++++++++---
 .../TestIcebergServerTableDescriptor.java          |  80 +++++++++
 2 files changed, 244 insertions(+), 22 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index df035ee0b..19627f86f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -44,6 +44,7 @@ import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
 import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.process.TableProcessMeta;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -74,15 +75,21 @@ import org.apache.amoro.utils.MixedTableUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.IcebergFindFiles;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
@@ -445,35 +452,170 @@ public class MixedAndIcebergTableDescriptor extends PersistentBase
     if (mixedTable.spec().isUnpartitioned()) {
       return new ArrayList<>();
     }
-    Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
 
-    CloseableIterable<PartitionFileBaseInfo> tableFiles =
-        getTableFilesInternal(amoroTable, null, null);
+    List<PartitionBaseInfo> result = new ArrayList<>();
+
+    // For keyed tables, we need to collect partitions from both change and 
base tables
+    if (mixedTable.isKeyedTable()) {
+      Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+      // Collect from change table
+      List<PartitionBaseInfo> changePartitions =
+          collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable());
+      for (PartitionBaseInfo partition : changePartitions) {
+        partitionMap.put(partition.getPartition(), partition);
+      }
+
+      // Collect from base table and merge
+      List<PartitionBaseInfo> basePartitions =
+          collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable());
+      for (PartitionBaseInfo basePartition : basePartitions) {
+        if (partitionMap.containsKey(basePartition.getPartition())) {
+          PartitionBaseInfo existing = 
partitionMap.get(basePartition.getPartition());
+          existing.setFileCount(existing.getFileCount() + 
basePartition.getFileCount());
+          existing.setFileSize(existing.getFileSize() + 
basePartition.getFileSize());
+        } else {
+          partitionMap.put(basePartition.getPartition(), basePartition);
+        }
+      }
+
+      result.addAll(partitionMap.values());
+    } else {
+      result = collectPartitionsFromTable(mixedTable.asUnkeyedTable());
+    }
+
+    return result;
+  }
+
+  /**
+   * Collect partition information from an Iceberg table using the PARTITIONS metadata table. This
+   * is much more efficient than scanning all data files, especially for tables with many files.
+   *
+   * @param table The Iceberg table to collect partitions from
+   * @return List of partition information
+   */
+  private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) {
+    List<PartitionBaseInfo> partitions = new ArrayList<>();
+
     try {
-      for (PartitionFileBaseInfo fileInfo : tableFiles) {
-        if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
-          PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
-          partitionBaseInfo.setPartition(fileInfo.getPartition());
-          partitionBaseInfo.setSpecId(fileInfo.getSpecId());
-          partitionBaseInfoHashMap.put(fileInfo.getPartition(), 
partitionBaseInfo);
+      Preconditions.checkArgument(
+          table instanceof HasTableOperations, "table must support table 
operations");
+      TableOperations ops = ((HasTableOperations) table).operations();
+
+      // Use PARTITIONS metadata table for efficient partition statistics
+      Table partitionsTable =
+          MetadataTableUtils.createMetadataTableInstance(
+              ops,
+              table.name(),
+              table.name() + "#" + MetadataTableType.PARTITIONS.name(),
+              MetadataTableType.PARTITIONS);
+
+      TableScan scan = partitionsTable.newScan();
+      try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+        CloseableIterable<CloseableIterable<StructLike>> transform =
+            CloseableIterable.transform(tasks, task -> 
task.asDataTask().rows());
+
+        try (CloseableIterable<StructLike> rows = 
CloseableIterable.concat(transform)) {
+          for (StructLike row : rows) {
+            PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+
+            // Get partition field - it's a struct
+            StructLike partition = row.get(0, StructLike.class);
+            int specId = row.get(1, Integer.class);
+
+            // Convert partition struct to path string
+            PartitionSpec spec = table.specs().get(specId);
+            String partitionPath = spec.partitionToPath(partition);
+
+            partitionInfo.setPartition(partitionPath);
+            partitionInfo.setSpecId(specId);
+
+            // Get file statistics from the partition metadata table
+            // Schema: partition, spec_id, record_count, file_count,
+            // total_data_file_size_in_bytes,
+            // position_delete_record_count, position_delete_file_count,
+            // equality_delete_record_count, equality_delete_file_count,
+            // last_updated_at, last_updated_snapshot_id
+            Integer dataFileCount = row.get(3, Integer.class);
+            Long totalDataFileSize = row.get(4, Long.class);
+            Integer posDeleteFileCount = row.get(6, Integer.class);
+            Integer eqDeleteFileCount = row.get(8, Integer.class);
+            Long lastUpdatedAt = row.get(9, Long.class);
+
+            // Total file count = data files + position delete files + 
equality delete files
+            int totalFileCount =
+                (dataFileCount != null ? dataFileCount : 0)
+                    + (posDeleteFileCount != null ? posDeleteFileCount : 0)
+                    + (eqDeleteFileCount != null ? eqDeleteFileCount : 0);
+            partitionInfo.setFileCount(totalFileCount);
+            // TODO: Iceberg partitions table currently only reports data file 
sizes, not delete
+            // file sizes.
+            // This will be updated once Iceberg supports reporting delete 
file sizes.
+            // See: https://github.com/apache/iceberg/issues/14803
+            partitionInfo.setFileSize(totalDataFileSize != null ? 
totalDataFileSize : 0L);
+            partitionInfo.setLastCommitTime(lastUpdatedAt != null ? 
lastUpdatedAt : 0L);
+
+            partitions.add(partitionInfo);
+          }
         }
-        PartitionBaseInfo partitionInfo = 
partitionBaseInfoHashMap.get(fileInfo.getPartition());
-        partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
-        partitionInfo.setFileSize(partitionInfo.getFileSize() + 
fileInfo.getFileSize());
-        partitionInfo.setLastCommitTime(
-            partitionInfo.getLastCommitTime() > fileInfo.getCommitTime()
-                ? partitionInfo.getLastCommitTime()
-                : fileInfo.getCommitTime());
       }
-    } finally {
-      try {
-        tableFiles.close();
-      } catch (IOException e) {
-        LOG.warn("Failed to close the manifest reader.", e);
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to use PARTITIONS metadata table, falling back to file 
scanning. Error: {}",
+          e.getMessage());
+      // Fallback to the old approach if metadata table access fails
+      return collectPartitionsFromFileScan(table);
+    }
+
+    return partitions;
+  }
+
+  /**
+   * Fallback method to collect partitions by scanning all files. This is kept for backward
+   * compatibility and error cases.
+   *
+   * @param table The Iceberg table to scan
+   * @return List of partition information
+   */
+  protected List<PartitionBaseInfo> collectPartitionsFromFileScan(Table table) 
{
+    Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+    IcebergFindFiles manifestReader =
+        new IcebergFindFiles(table).ignoreDeleted().planWith(executorService);
+
+    try (CloseableIterable<IcebergFindFiles.IcebergManifestEntry> entries =
+        manifestReader.entries()) {
+
+      for (IcebergFindFiles.IcebergManifestEntry entry : entries) {
+        ContentFile<?> file = entry.getFile();
+        PartitionSpec spec = table.specs().get(file.specId());
+        String partitionPath = spec.partitionToPath(file.partition());
+
+        if (!partitionMap.containsKey(partitionPath)) {
+          PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+          partitionInfo.setPartition(partitionPath);
+          partitionInfo.setSpecId(file.specId());
+          partitionInfo.setFileCount(0);
+          partitionInfo.setFileSize(0L);
+          partitionInfo.setLastCommitTime(0L);
+          partitionMap.put(partitionPath, partitionInfo);
+        }
+
+        PartitionBaseInfo partitionInfo = partitionMap.get(partitionPath);
+        partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
+        partitionInfo.setFileSize(partitionInfo.getFileSize() + 
file.fileSizeInBytes());
+
+        long snapshotId = entry.getSnapshotId();
+        if (table.snapshot(snapshotId) != null) {
+          long commitTime = table.snapshot(snapshotId).timestampMillis();
+          
partitionInfo.setLastCommitTime(Math.max(partitionInfo.getLastCommitTime(), 
commitTime));
+        }
       }
+    } catch (IOException e) {
+      LOG.error("Failed to scan files for partition information", e);
     }
 
-    return new ArrayList<>(partitionBaseInfoHashMap.values());
+    return new ArrayList<>(partitionMap.values());
   }
 
   @Override
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
index 99d7a29ae..265a5700d 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -77,6 +77,81 @@ public class TestIcebergServerTableDescriptor extends TestServerTableDescriptor
         .commit();
   }
 
+  @Test
+  public void testGetTablePartitions() {
+    // The default test table is partitioned by 'age' field (see
+    // IcebergHadoopCatalogTestHelper.SPEC)
+    org.apache.iceberg.Table icebergTable = getTable();
+    Assert.assertTrue("Test table should be partitioned", 
icebergTable.spec().isPartitioned());
+
+    // Add some data files to create partitions
+    org.apache.iceberg.DataFile file1 =
+        org.apache.iceberg.DataFiles.builder(icebergTable.spec())
+            .withPath("/path/to/data-a.parquet")
+            .withFileSizeInBytes(100)
+            .withRecordCount(10)
+            .build();
+    icebergTable.newAppend().appendFile(file1).commit();
+
+    // Get partitions using descriptor
+    MixedAndIcebergTableDescriptor descriptor = new 
MixedAndIcebergTableDescriptor();
+    List<org.apache.amoro.table.descriptor.PartitionBaseInfo> partitions =
+        descriptor.getTablePartitions(getAmoroCatalog().loadTable(TEST_DB, 
TEST_TABLE));
+
+    // Verify we got partition info - should have at least 1 partition after 
adding a file
+    Assert.assertNotNull(partitions);
+    Assert.assertTrue("Should have at least 1 partition", partitions.size() > 
0);
+
+    // Verify file count is correct
+    long totalFiles = partitions.stream().mapToLong(p -> 
p.getFileCount()).sum();
+    Assert.assertEquals("Should have 1 file total", 1, totalFiles);
+
+    // Verify we used PARTITIONS metadata table which provides fileSize and 
lastCommitTime
+    for (org.apache.amoro.table.descriptor.PartitionBaseInfo partition : 
partitions) {
+      // File size should be available from PARTITIONS metadata table
+      Assert.assertTrue(
+          "FileSize should be available from PARTITIONS metadata table",
+          partition.getFileSize() >= 0);
+      // Last commit time should be available from PARTITIONS metadata table
+      Assert.assertTrue(
+          "LastCommitTime should be available from PARTITIONS metadata table",
+          partition.getLastCommitTime() >= 0);
+    }
+  }
+
+  @Test
+  public void testGetTablePartitionsFallback() {
+    // Test the fallback path by using a custom descriptor that forces fallback
+    org.apache.iceberg.Table icebergTable = getTable();
+    Assert.assertTrue("Test table should be partitioned", 
icebergTable.spec().isPartitioned());
+
+    // Add data file
+    org.apache.iceberg.DataFile file1 =
+        org.apache.iceberg.DataFiles.builder(icebergTable.spec())
+            .withPath("/path/to/fallback-data.parquet")
+            .withFileSizeInBytes(200)
+            .withRecordCount(20)
+            .build();
+    icebergTable.newAppend().appendFile(file1).commit();
+
+    // Use the fallback method directly
+    TestMixedAndIcebergTableDescriptor testDescriptor = new 
TestMixedAndIcebergTableDescriptor();
+    List<org.apache.amoro.table.descriptor.PartitionBaseInfo> partitions =
+        testDescriptor.testCollectPartitionsFromFileScan(icebergTable);
+
+    // Verify fallback works correctly
+    Assert.assertNotNull(partitions);
+    Assert.assertTrue("Fallback should return at least 1 partition", 
partitions.size() > 0);
+
+    // Verify file count
+    long totalFiles = partitions.stream().mapToLong(p -> 
p.getFileCount()).sum();
+    Assert.assertEquals("Fallback should count files correctly", 1, 
totalFiles);
+
+    // Verify fallback calculates actual values (not 0 like PARTITIONS table)
+    long totalSize = partitions.stream().mapToLong(p -> p.getFileSize()).sum();
+    Assert.assertTrue("Fallback should calculate actual file size (not 0)", 
totalSize > 0);
+  }
+
   @Test
   public void testOptimizingProcess() {
     TestMixedAndIcebergTableDescriptor descriptor = new 
TestMixedAndIcebergTableDescriptor();
@@ -292,6 +367,11 @@ public class TestIcebergServerTableDescriptor extends TestServerTableDescriptor
   /** Test descriptor class, add insert table/optimizing process methods for 
test. */
   private static class TestMixedAndIcebergTableDescriptor extends 
MixedAndIcebergTableDescriptor {
 
+    public List<org.apache.amoro.table.descriptor.PartitionBaseInfo>
+        testCollectPartitionsFromFileScan(Table table) {
+      return collectPartitionsFromFileScan(table);
+    }
+
     public void insertTable(ServerTableIdentifier identifier) {
       doAs(TableMetaMapper.class, mapper -> mapper.insertTable(identifier));
     }

Reply via email to