xxubai commented on code in PR #4003:
URL: https://github.com/apache/amoro/pull/4003#discussion_r2601815503
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -445,35 +452,160 @@ public List<PartitionBaseInfo>
getTablePartitions(AmoroTable<?> amoroTable) {
if (mixedTable.spec().isUnpartitioned()) {
return new ArrayList<>();
}
- Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
- CloseableIterable<PartitionFileBaseInfo> tableFiles =
- getTableFilesInternal(amoroTable, null, null);
+ List<PartitionBaseInfo> result = new ArrayList<>();
+
+ // For keyed tables, we need to collect partitions from both change and
base tables
+ if (mixedTable.isKeyedTable()) {
+ Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+ // Collect from change table
+ List<PartitionBaseInfo> changePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable());
+ for (PartitionBaseInfo partition : changePartitions) {
+ partitionMap.put(partition.getPartition(), partition);
+ }
+
+ // Collect from base table and merge
+ List<PartitionBaseInfo> basePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable());
+ for (PartitionBaseInfo basePartition : basePartitions) {
+ if (partitionMap.containsKey(basePartition.getPartition())) {
+ PartitionBaseInfo existing =
partitionMap.get(basePartition.getPartition());
+ existing.setFileCount(existing.getFileCount() +
basePartition.getFileCount());
+ existing.setFileSize(existing.getFileSize() +
basePartition.getFileSize());
+ } else {
+ partitionMap.put(basePartition.getPartition(), basePartition);
+ }
+ }
+
+ result.addAll(partitionMap.values());
+ } else {
+ result = collectPartitionsFromTable(mixedTable.asUnkeyedTable());
+ }
+
+ return result;
+ }
+
+ /**
+ * Collect partition information from an Iceberg table using the PARTITIONS
metadata table. This
+ * is much more efficient than scanning all data files, especially for
tables with many files.
+ *
+ * @param table The Iceberg table to collect partitions from
+ * @return List of partition information
+ */
+ private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) {
+ List<PartitionBaseInfo> partitions = new ArrayList<>();
+
try {
- for (PartitionFileBaseInfo fileInfo : tableFiles) {
- if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
- PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
- partitionBaseInfo.setPartition(fileInfo.getPartition());
- partitionBaseInfo.setSpecId(fileInfo.getSpecId());
- partitionBaseInfoHashMap.put(fileInfo.getPartition(),
partitionBaseInfo);
+ Preconditions.checkArgument(
+ table instanceof HasTableOperations, "table must support table
operations");
+ TableOperations ops = ((HasTableOperations) table).operations();
+
+ // Use PARTITIONS metadata table for efficient partition statistics
+ Table partitionsTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ ops,
+ table.name(),
+ table.name() + "#" + MetadataTableType.PARTITIONS.name(),
+ MetadataTableType.PARTITIONS);
+
+ TableScan scan = partitionsTable.newScan();
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ CloseableIterable<CloseableIterable<StructLike>> transform =
+ CloseableIterable.transform(tasks, task ->
task.asDataTask().rows());
+
+ try (CloseableIterable<StructLike> rows =
CloseableIterable.concat(transform)) {
+ for (StructLike row : rows) {
+ PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+
+ // Get partition field - it's a struct
+ StructLike partition = row.get(0, StructLike.class);
+ int specId = row.get(1, Integer.class);
+
+ // Convert partition struct to path string
+ PartitionSpec spec = table.specs().get(specId);
+ String partitionPath = spec.partitionToPath(partition);
+
+ partitionInfo.setPartition(partitionPath);
+ partitionInfo.setSpecId(specId);
+
+ // Get file statistics from the partition metadata table
+ // record_count: index 2
+ // file_count: index 3
+ Long recordCount = row.get(2, Long.class);
+ Integer dataFileCount = row.get(3, Integer.class);
+
+ partitionInfo.setFileCount(dataFileCount != null ? dataFileCount :
0);
Review Comment:
Is the delete file count missing here?
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -445,35 +452,160 @@ public List<PartitionBaseInfo>
getTablePartitions(AmoroTable<?> amoroTable) {
if (mixedTable.spec().isUnpartitioned()) {
return new ArrayList<>();
}
- Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
- CloseableIterable<PartitionFileBaseInfo> tableFiles =
- getTableFilesInternal(amoroTable, null, null);
+ List<PartitionBaseInfo> result = new ArrayList<>();
+
+ // For keyed tables, we need to collect partitions from both change and
base tables
+ if (mixedTable.isKeyedTable()) {
+ Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+ // Collect from change table
+ List<PartitionBaseInfo> changePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable());
+ for (PartitionBaseInfo partition : changePartitions) {
+ partitionMap.put(partition.getPartition(), partition);
+ }
+
+ // Collect from base table and merge
+ List<PartitionBaseInfo> basePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable());
+ for (PartitionBaseInfo basePartition : basePartitions) {
+ if (partitionMap.containsKey(basePartition.getPartition())) {
+ PartitionBaseInfo existing =
partitionMap.get(basePartition.getPartition());
+ existing.setFileCount(existing.getFileCount() +
basePartition.getFileCount());
+ existing.setFileSize(existing.getFileSize() +
basePartition.getFileSize());
+ } else {
+ partitionMap.put(basePartition.getPartition(), basePartition);
+ }
+ }
+
+ result.addAll(partitionMap.values());
+ } else {
+ result = collectPartitionsFromTable(mixedTable.asUnkeyedTable());
+ }
+
+ return result;
+ }
+
+ /**
+ * Collect partition information from an Iceberg table using the PARTITIONS
metadata table. This
+ * is much more efficient than scanning all data files, especially for
tables with many files.
+ *
+ * @param table The Iceberg table to collect partitions from
+ * @return List of partition information
+ */
+ private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) {
+ List<PartitionBaseInfo> partitions = new ArrayList<>();
+
try {
- for (PartitionFileBaseInfo fileInfo : tableFiles) {
- if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
- PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
- partitionBaseInfo.setPartition(fileInfo.getPartition());
- partitionBaseInfo.setSpecId(fileInfo.getSpecId());
- partitionBaseInfoHashMap.put(fileInfo.getPartition(),
partitionBaseInfo);
+ Preconditions.checkArgument(
+ table instanceof HasTableOperations, "table must support table
operations");
+ TableOperations ops = ((HasTableOperations) table).operations();
+
+ // Use PARTITIONS metadata table for efficient partition statistics
+ Table partitionsTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ ops,
+ table.name(),
+ table.name() + "#" + MetadataTableType.PARTITIONS.name(),
+ MetadataTableType.PARTITIONS);
+
+ TableScan scan = partitionsTable.newScan();
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ CloseableIterable<CloseableIterable<StructLike>> transform =
+ CloseableIterable.transform(tasks, task ->
task.asDataTask().rows());
+
+ try (CloseableIterable<StructLike> rows =
CloseableIterable.concat(transform)) {
+ for (StructLike row : rows) {
+ PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+
+ // Get partition field - it's a struct
+ StructLike partition = row.get(0, StructLike.class);
+ int specId = row.get(1, Integer.class);
+
+ // Convert partition struct to path string
+ PartitionSpec spec = table.specs().get(specId);
+ String partitionPath = spec.partitionToPath(partition);
+
+ partitionInfo.setPartition(partitionPath);
+ partitionInfo.setSpecId(specId);
+
+ // Get file statistics from the partition metadata table
+ // record_count: index 2
+ // file_count: index 3
+ Long recordCount = row.get(2, Long.class);
+ Integer dataFileCount = row.get(3, Integer.class);
+
+ partitionInfo.setFileCount(dataFileCount != null ? dataFileCount :
0);
+
+ // Note: The PARTITIONS metadata table doesn't include total file
size or commit time
+ // For file size, we would need to sum from files metadata which
defeats the performance
+ // benefit
+ // Setting to 0 as a trade-off for performance
+ partitionInfo.setFileSize(0L);
Review Comment:
Should we use the total data file size in bytes (`total_data_file_size_in_bytes`) here?
##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java:
##########
@@ -445,35 +452,160 @@ public List<PartitionBaseInfo>
getTablePartitions(AmoroTable<?> amoroTable) {
if (mixedTable.spec().isUnpartitioned()) {
return new ArrayList<>();
}
- Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
- CloseableIterable<PartitionFileBaseInfo> tableFiles =
- getTableFilesInternal(amoroTable, null, null);
+ List<PartitionBaseInfo> result = new ArrayList<>();
+
+ // For keyed tables, we need to collect partitions from both change and
base tables
+ if (mixedTable.isKeyedTable()) {
+ Map<String, PartitionBaseInfo> partitionMap = new HashMap<>();
+
+ // Collect from change table
+ List<PartitionBaseInfo> changePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().changeTable());
+ for (PartitionBaseInfo partition : changePartitions) {
+ partitionMap.put(partition.getPartition(), partition);
+ }
+
+ // Collect from base table and merge
+ List<PartitionBaseInfo> basePartitions =
+ collectPartitionsFromTable(mixedTable.asKeyedTable().baseTable());
+ for (PartitionBaseInfo basePartition : basePartitions) {
+ if (partitionMap.containsKey(basePartition.getPartition())) {
+ PartitionBaseInfo existing =
partitionMap.get(basePartition.getPartition());
+ existing.setFileCount(existing.getFileCount() +
basePartition.getFileCount());
+ existing.setFileSize(existing.getFileSize() +
basePartition.getFileSize());
+ } else {
+ partitionMap.put(basePartition.getPartition(), basePartition);
+ }
+ }
+
+ result.addAll(partitionMap.values());
+ } else {
+ result = collectPartitionsFromTable(mixedTable.asUnkeyedTable());
+ }
+
+ return result;
+ }
+
+ /**
+ * Collect partition information from an Iceberg table using the PARTITIONS
metadata table. This
+ * is much more efficient than scanning all data files, especially for
tables with many files.
+ *
+ * @param table The Iceberg table to collect partitions from
+ * @return List of partition information
+ */
+ private List<PartitionBaseInfo> collectPartitionsFromTable(Table table) {
+ List<PartitionBaseInfo> partitions = new ArrayList<>();
+
try {
- for (PartitionFileBaseInfo fileInfo : tableFiles) {
- if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
- PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
- partitionBaseInfo.setPartition(fileInfo.getPartition());
- partitionBaseInfo.setSpecId(fileInfo.getSpecId());
- partitionBaseInfoHashMap.put(fileInfo.getPartition(),
partitionBaseInfo);
+ Preconditions.checkArgument(
+ table instanceof HasTableOperations, "table must support table
operations");
+ TableOperations ops = ((HasTableOperations) table).operations();
+
+ // Use PARTITIONS metadata table for efficient partition statistics
+ Table partitionsTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ ops,
+ table.name(),
+ table.name() + "#" + MetadataTableType.PARTITIONS.name(),
+ MetadataTableType.PARTITIONS);
+
+ TableScan scan = partitionsTable.newScan();
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ CloseableIterable<CloseableIterable<StructLike>> transform =
+ CloseableIterable.transform(tasks, task ->
task.asDataTask().rows());
+
+ try (CloseableIterable<StructLike> rows =
CloseableIterable.concat(transform)) {
+ for (StructLike row : rows) {
+ PartitionBaseInfo partitionInfo = new PartitionBaseInfo();
+
+ // Get partition field - it's a struct
+ StructLike partition = row.get(0, StructLike.class);
+ int specId = row.get(1, Integer.class);
+
+ // Convert partition struct to path string
+ PartitionSpec spec = table.specs().get(specId);
+ String partitionPath = spec.partitionToPath(partition);
+
+ partitionInfo.setPartition(partitionPath);
+ partitionInfo.setSpecId(specId);
+
+ // Get file statistics from the partition metadata table
+ // record_count: index 2
+ // file_count: index 3
+ Long recordCount = row.get(2, Long.class);
+ Integer dataFileCount = row.get(3, Integer.class);
+
+ partitionInfo.setFileCount(dataFileCount != null ? dataFileCount :
0);
+
+ // Note: The PARTITIONS metadata table doesn't include total file
size or commit time
+ // For file size, we would need to sum from files metadata which
defeats the performance
+ // benefit
+ // Setting to 0 as a trade-off for performance
+ partitionInfo.setFileSize(0L);
+ partitionInfo.setLastCommitTime(0L);
Review Comment:
The row's field at index 9 can be used to obtain the last update time: `last_updated_at`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]