turboFei commented on code in PR #4003:
URL: https://github.com/apache/amoro/pull/4003#discussion_r2613080096
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -445,35 +452,166 @@ public List<PartitionBaseInfo>
getTablePartitions(AmoroTable<?> amoroTable) {
if (mixedTable.spec().isUnpartitioned()) {
return new ArrayList<>();
}
- Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
- CloseableIterable<PartitionFileBaseInfo> tableFiles =
- getTableFilesInternal(amoroTable, null, null);
+ List<PartitionBaseInfo> result = new ArrayList<>();
+
+ // For keyed tables, we need to collect partitions from both change and
base tables
+ if (mixedTable.isKeyedTable()) {
+ Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+ // Collect from change table
+ List<PartitionBaseInfo> changePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable());
+ for (PartitionBaseInfo partition : changePartitions) {
+ partitionMap.put(partition.getPartition(), partition);
+ }
+
+ // Collect from base table and merge
+ List<PartitionBaseInfo> basePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable());
+ for (PartitionBaseInfo basePartition : basePartitions) {
+ if (partitionMap.containsKey(basePartition.getPartition())) {
+ PartitionBaseInfo existing =
partitionMap.get(basePartition.getPartition());
+ existing.setFileCount(existing.getFileCount() +
basePartition.getFileCount());
+ existing.setFileSize(existing.getFileSize() +
basePartition.getFileSize());
+ } else {
+ partitionMap.put(basePartition.getPartition(), basePartition);
+ }
+ }
+
+ result.addAll(partitionMap.values());
+ } else {
+ result = collectPartitionsFromTable(mixedTable.asUnkeyedTable());
+ }
+
+ return result;
+ }
+
+ /**
+ * Collect partition information from an Iceberg table using the PARTITIONS
metadata table. This
+ * is much more efficient than scanning all data files, especially for
tables with many files.
+ *
+ * @param table The Iceberg table to collect partitions from
+ * @return List of partition information
+ */
+ private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) {
+ List<PartitionBaseInfo> partitions = new ArrayList<>();
+
try {
- for (PartitionFileBaseInfo fileInfo : tableFiles) {
- if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
- PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
- partitionBaseInfo.setPartition(fileInfo.getPartition());
- partitionBaseInfo.setSpecId(fileInfo.getSpecId());
- partitionBaseInfoHashMap.put(fileInfo.getPartition(),
partitionBaseInfo);
+ Preconditions.checkArgument(
+ table instanceof HasTableOperations, "table must support table
operations");
+ TableOperations ops = ((HasTableOperations) table).operations();
+
+ // Use PARTITIONS metadata table for efficient partition statistics
+ Table partitionsTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ ops,
+ table.name(),
+ table.name() + "#" + MetadataTableType.PARTITIONS.name(),
+ MetadataTableType.PARTITIONS);
+
+ TableScan scan = partitionsTable.newScan();
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ CloseableIterable<CloseableIterable<StructLike>> transform =
+ CloseableIterable.transform(tasks, task ->
task.asDataTask().rows());
+
+ try (CloseableIterable<StructLike> rows =
CloseableIterable.concat(transform)) {
+ for (StructLike row : rows) {
+ PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+
+ // Get partition field - it's a struct
+ StructLike partition = row.get(0, StructLike.class);
+ int specId = row.get(1, Integer.class);
+
+ // Convert partition struct to path string
+ PartitionSpec spec = table.specs().get(specId);
+ String partitionPath = spec.partitionToPath(partition);
+
+ partitionInfo.setPartition(partitionPath);
+ partitionInfo.setSpecId(specId);
+
+ // Get file statistics from the partition metadata table
+ // Schema: partition, spec_id, record_count, file_count,
+ // total_data_file_size_in_bytes,
+ // position_delete_record_count, position_delete_file_count,
+ // equality_delete_record_count, equality_delete_file_count,
+ // last_updated_at, last_updated_snapshot_id
+ Integer dataFileCount = row.get(3, Integer.class);
+ Long totalDataFileSize = row.get(4, Long.class);
+ Integer posDeleteFileCount = row.get(6, Integer.class);
+ Integer eqDeleteFileCount = row.get(8, Integer.class);
+ Long lastUpdatedAt = row.get(9, Long.class);
+
+ // Total file count = data files + position delete files +
equality delete files
+ int totalFileCount =
+ (dataFileCount != null ? dataFileCount : 0)
+ + (posDeleteFileCount != null ? posDeleteFileCount : 0)
+ + (eqDeleteFileCount != null ? eqDeleteFileCount : 0);
+ partitionInfo.setFileCount(totalFileCount);
+ partitionInfo.setFileSize(totalDataFileSize != null ?
totalDataFileSize : 0L);
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]