This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new db17a73e4 [AMORO-2635] Enhance table partition files list performance
(#4003)
db17a73e4 is described below
commit db17a73e47c1146c8bc725a327ebcbc6bf79d4c4
Author: Fei Wang <[email protected]>
AuthorDate: Fri Dec 12 00:59:27 2025 -0800
[AMORO-2635] Enhance table partition files list performance (#4003)
* [AMORO-2635]
* UT
* UT for fallback
* address comments
* TODO
---
.../dashboard/MixedAndIcebergTableDescriptor.java | 186 ++++++++++++++++++---
.../TestIcebergServerTableDescriptor.java | 80 +++++++++
2 files changed, 244 insertions(+), 22 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index df035ee0b..19627f86f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -44,6 +44,7 @@ import
org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -74,15 +75,21 @@ import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.IcebergFindFiles;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
@@ -445,35 +452,170 @@ public class MixedAndIcebergTableDescriptor extends
PersistentBase
if (mixedTable.spec().isUnpartitioned()) {
return new ArrayList<>();
}
- Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
- CloseableIterable<PartitionFileBaseInfo> tableFiles =
- getTableFilesInternal(amoroTable, null, null);
+ List<PartitionBaseInfo> result = new ArrayList<>();
+
+    // For keyed tables, we need to collect partitions from both change and base tables
+ if (mixedTable.isKeyedTable()) {
+ Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+ // Collect from change table
+ List<PartitionBaseInfo> changePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable());
+ for (PartitionBaseInfo partition : changePartitions) {
+ partitionMap.put(partition.getPartition(), partition);
+ }
+
+ // Collect from base table and merge
+ List<PartitionBaseInfo> basePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable());
+ for (PartitionBaseInfo basePartition : basePartitions) {
+ if (partitionMap.containsKey(basePartition.getPartition())) {
+        PartitionBaseInfo existing = partitionMap.get(basePartition.getPartition());
+        existing.setFileCount(existing.getFileCount() + basePartition.getFileCount());
+        existing.setFileSize(existing.getFileSize() + basePartition.getFileSize());
+ } else {
+ partitionMap.put(basePartition.getPartition(), basePartition);
+ }
+ }
+
+ result.addAll(partitionMap.values());
+ } else {
+ result = collectPartitionsFromTable(mixedTable.asUnkeyedTable());
+ }
+
+ return result;
+ }
+
+ /**
+ * Collect partition information from an Iceberg table using the PARTITIONS
metadata table. This
+ * is much more efficient than scanning all data files, especially for
tables with many files.
+ *
+ * @param table The Iceberg table to collect partitions from
+ * @return List of partition information
+ */
+ private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) {
+ List<PartitionBaseInfo> partitions = new ArrayList<>();
+
try {
- for (PartitionFileBaseInfo fileInfo : tableFiles) {
- if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
- PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
- partitionBaseInfo.setPartition(fileInfo.getPartition());
- partitionBaseInfo.setSpecId(fileInfo.getSpecId());
- partitionBaseInfoHashMap.put(fileInfo.getPartition(),
partitionBaseInfo);
+      Preconditions.checkArgument(
+          table instanceof HasTableOperations, "table must support table operations");
+ TableOperations ops = ((HasTableOperations) table).operations();
+
+ // Use PARTITIONS metadata table for efficient partition statistics
+ Table partitionsTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ ops,
+ table.name(),
+ table.name() + "#" + MetadataTableType.PARTITIONS.name(),
+ MetadataTableType.PARTITIONS);
+
+ TableScan scan = partitionsTable.newScan();
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ CloseableIterable<CloseableIterable<StructLike>> transform =
+          CloseableIterable.transform(tasks, task -> task.asDataTask().rows());
+
+      try (CloseableIterable<StructLike> rows = CloseableIterable.concat(transform)) {
+ for (StructLike row : rows) {
+ PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+
+ // Get partition field - it's a struct
+ StructLike partition = row.get(0, StructLike.class);
+ int specId = row.get(1, Integer.class);
+
+ // Convert partition struct to path string
+ PartitionSpec spec = table.specs().get(specId);
+ String partitionPath = spec.partitionToPath(partition);
+
+ partitionInfo.setPartition(partitionPath);
+ partitionInfo.setSpecId(specId);
+
+ // Get file statistics from the partition metadata table
+ // Schema: partition, spec_id, record_count, file_count,
+ // total_data_file_size_in_bytes,
+ // position_delete_record_count, position_delete_file_count,
+ // equality_delete_record_count, equality_delete_file_count,
+ // last_updated_at, last_updated_snapshot_id
+ Integer dataFileCount = row.get(3, Integer.class);
+ Long totalDataFileSize = row.get(4, Long.class);
+ Integer posDeleteFileCount = row.get(6, Integer.class);
+ Integer eqDeleteFileCount = row.get(8, Integer.class);
+ Long lastUpdatedAt = row.get(9, Long.class);
+
+ // Total file count = data files + position delete files +
equality delete files
+ int totalFileCount =
+ (dataFileCount != null ? dataFileCount : 0)
+ + (posDeleteFileCount != null ? posDeleteFileCount : 0)
+ + (eqDeleteFileCount != null ? eqDeleteFileCount : 0);
+ partitionInfo.setFileCount(totalFileCount);
+ // TODO: Iceberg partitions table currently only reports data file
sizes, not delete
+ // file sizes.
+ // This will be updated once Iceberg supports reporting delete
file sizes.
+ // See: https://github.com/apache/iceberg/issues/14803
+            partitionInfo.setFileSize(totalDataFileSize != null ? totalDataFileSize : 0L);
+            partitionInfo.setLastCommitTime(lastUpdatedAt != null ? lastUpdatedAt : 0L);
+
+ partitions.add(partitionInfo);
+ }
}
- PartitionBaseInfo partitionInfo =
partitionBaseInfoHashMap.get(fileInfo.getPartition());
- partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
- partitionInfo.setFileSize(partitionInfo.getFileSize() +
fileInfo.getFileSize());
- partitionInfo.setLastCommitTime(
- partitionInfo.getLastCommitTime() > fileInfo.getCommitTime()
- ? partitionInfo.getLastCommitTime()
- : fileInfo.getCommitTime());
}
- } finally {
- try {
- tableFiles.close();
- } catch (IOException e) {
- LOG.warn("Failed to close the manifest reader.", e);
+ } catch (Exception e) {
+ LOG.warn(
+          "Failed to use PARTITIONS metadata table, falling back to file scanning. Error: {}",
+ e.getMessage());
+ // Fallback to the old approach if metadata table access fails
+ return collectPartitionsFromFileScan(table);
+ }
+
+ return partitions;
+ }
+
+ /**
+ * Fallback method to collect partitions by scanning all files. This is kept
for backward
+ * compatibility and error cases.
+ *
+ * @param table The Iceberg table to scan
+ * @return List of partition information
+ */
+ protected List<PartitionBaseInfo> collectPartitionsFromFileScan(Table table)
{
+ Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+ IcebergFindFiles manifestReader =
+ new IcebergFindFiles(table).ignoreDeleted().planWith(executorService);
+
+ try (CloseableIterable<IcebergFindFiles.IcebergManifestEntry> entries =
+ manifestReader.entries()) {
+
+ for (IcebergFindFiles.IcebergManifestEntry entry : entries) {
+ ContentFile<?> file = entry.getFile();
+ PartitionSpec spec = table.specs().get(file.specId());
+ String partitionPath = spec.partitionToPath(file.partition());
+
+ if (!partitionMap.containsKey(partitionPath)) {
+ PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+ partitionInfo.setPartition(partitionPath);
+ partitionInfo.setSpecId(file.specId());
+ partitionInfo.setFileCount(0);
+ partitionInfo.setFileSize(0L);
+ partitionInfo.setLastCommitTime(0L);
+ partitionMap.put(partitionPath, partitionInfo);
+ }
+
+ PartitionBaseInfo partitionInfo = partitionMap.get(partitionPath);
+ partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
+        partitionInfo.setFileSize(partitionInfo.getFileSize() + file.fileSizeInBytes());
+
+ long snapshotId = entry.getSnapshotId();
+ if (table.snapshot(snapshotId) != null) {
+ long commitTime = table.snapshot(snapshotId).timestampMillis();
+          partitionInfo.setLastCommitTime(Math.max(partitionInfo.getLastCommitTime(), commitTime));
+ }
}
+ } catch (IOException e) {
+ LOG.error("Failed to scan files for partition information", e);
}
- return new ArrayList<>(partitionBaseInfoHashMap.values());
+ return new ArrayList<>(partitionMap.values());
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
index 99d7a29ae..265a5700d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -77,6 +77,81 @@ public class TestIcebergServerTableDescriptor extends
TestServerTableDescriptor
.commit();
}
+ @Test
+ public void testGetTablePartitions() {
+ // The default test table is partitioned by 'age' field (see
+ // IcebergHadoopCatalogTestHelper.SPEC)
+ org.apache.iceberg.Table icebergTable = getTable();
+ Assert.assertTrue("Test table should be partitioned",
icebergTable.spec().isPartitioned());
+
+ // Add some data files to create partitions
+ org.apache.iceberg.DataFile file1 =
+ org.apache.iceberg.DataFiles.builder(icebergTable.spec())
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(10)
+ .build();
+ icebergTable.newAppend().appendFile(file1).commit();
+
+ // Get partitions using descriptor
+ MixedAndIcebergTableDescriptor descriptor = new
MixedAndIcebergTableDescriptor();
+ List<org.apache.amoro.table.descriptor.PartitionBaseInfo> partitions =
+ descriptor.getTablePartitions(getAmoroCatalog().loadTable(TEST_DB,
TEST_TABLE));
+
+ // Verify we got partition info - should have at least 1 partition after
adding a file
+ Assert.assertNotNull(partitions);
+ Assert.assertTrue("Should have at least 1 partition", partitions.size() >
0);
+
+ // Verify file count is correct
+ long totalFiles = partitions.stream().mapToLong(p ->
p.getFileCount()).sum();
+ Assert.assertEquals("Should have 1 file total", 1, totalFiles);
+
+ // Verify we used PARTITIONS metadata table which provides fileSize and
lastCommitTime
+ for (org.apache.amoro.table.descriptor.PartitionBaseInfo partition :
partitions) {
+ // File size should be available from PARTITIONS metadata table
+ Assert.assertTrue(
+ "FileSize should be available from PARTITIONS metadata table",
+ partition.getFileSize() >= 0);
+ // Last commit time should be available from PARTITIONS metadata table
+ Assert.assertTrue(
+ "LastCommitTime should be available from PARTITIONS metadata table",
+ partition.getLastCommitTime() >= 0);
+ }
+ }
+
+ @Test
+ public void testGetTablePartitionsFallback() {
+ // Test the fallback path by using a custom descriptor that forces fallback
+ org.apache.iceberg.Table icebergTable = getTable();
+ Assert.assertTrue("Test table should be partitioned",
icebergTable.spec().isPartitioned());
+
+ // Add data file
+ org.apache.iceberg.DataFile file1 =
+ org.apache.iceberg.DataFiles.builder(icebergTable.spec())
+ .withPath("/path/to/fallback-data.parquet")
+ .withFileSizeInBytes(200)
+ .withRecordCount(20)
+ .build();
+ icebergTable.newAppend().appendFile(file1).commit();
+
+ // Use the fallback method directly
+ TestMixedAndIcebergTableDescriptor testDescriptor = new
TestMixedAndIcebergTableDescriptor();
+ List<org.apache.amoro.table.descriptor.PartitionBaseInfo> partitions =
+ testDescriptor.testCollectPartitionsFromFileScan(icebergTable);
+
+ // Verify fallback works correctly
+ Assert.assertNotNull(partitions);
+ Assert.assertTrue("Fallback should return at least 1 partition",
partitions.size() > 0);
+
+ // Verify file count
+ long totalFiles = partitions.stream().mapToLong(p ->
p.getFileCount()).sum();
+ Assert.assertEquals("Fallback should count files correctly", 1,
totalFiles);
+
+ // Verify fallback calculates actual values (not 0 like PARTITIONS table)
+ long totalSize = partitions.stream().mapToLong(p -> p.getFileSize()).sum();
+ Assert.assertTrue("Fallback should calculate actual file size (not 0)",
totalSize > 0);
+ }
+
@Test
public void testOptimizingProcess() {
TestMixedAndIcebergTableDescriptor descriptor = new
TestMixedAndIcebergTableDescriptor();
@@ -292,6 +367,11 @@ public class TestIcebergServerTableDescriptor extends
TestServerTableDescriptor
/** Test descriptor class, add insert table/optimizing process methods for
test. */
private static class TestMixedAndIcebergTableDescriptor extends
MixedAndIcebergTableDescriptor {
+ public List<org.apache.amoro.table.descriptor.PartitionBaseInfo>
+ testCollectPartitionsFromFileScan(Table table) {
+ return collectPartitionsFromFileScan(table);
+ }
+
public void insertTable(ServerTableIdentifier identifier) {
doAs(TableMetaMapper.class, mapper -> mapper.insertTable(identifier));
}