deniskuzZ commented on code in PR #5957: URL: https://github.com/apache/hive/pull/5957#discussion_r2207071989
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java: ########## @@ -165,4 +168,73 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table ci.type = compactionEvaluator.determineCompactionType(); compactions.add(ci); } + + /** + * Finds all unique non-compaction-modified partitions (with added or deleted files) between a given past + * snapshot ID and the table's current (latest) snapshot. + * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} instance to inspect. + * @param pastSnapshotId The ID of the older snapshot (exclusive). + * @param latestSpecOnly when True, returns partitions with the current spec only; + * False - older specs only; + * Null - any spec + * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified + * partition names. + * @throws IllegalArgumentException if snapshot IDs are invalid or out of order, or if the table has no current + * snapshot. + */ + private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable, + org.apache.iceberg.Table icebergTable, Long pastSnapshotId, Boolean latestSpecOnly) { + Snapshot currentSnapshot = icebergTable.currentSnapshot(); + if (currentSnapshot == null) { + throw new IllegalArgumentException(String.format("Table %s has no current snapshot. Cannot determine range.", + icebergTable.name())); + } + Snapshot pastSnapshot = pastSnapshotId != null ? icebergTable.snapshot(pastSnapshotId) : null; + + List<Snapshot> relevantSnapshots = StreamSupport.stream(icebergTable.snapshots().spliterator(), false) + .filter(s -> pastSnapshot == null || s.timestampMillis() > pastSnapshot.timestampMillis() && + s.timestampMillis() <= currentSnapshot.timestampMillis()) + .filter(s -> s.summary().get(IcebergTableUtil.SNAPSHOT_SOURCE_PROP) == null) + .sorted(Comparator.comparingLong(Snapshot::timestampMillis)) + .toList(); + + Set<String> modifiedPartitions = Sets.newHashSet(); + + for (Snapshot snapshot : relevantSnapshots) { + FileIO io = icebergTable.io(); + modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable, snapshot.addedDataFiles(io), + latestSpecOnly)); + modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable, snapshot.removedDataFiles(io), + latestSpecOnly)); + modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable, snapshot.addedDeleteFiles(io), + latestSpecOnly)); + modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable, snapshot.removedDeleteFiles(io), + latestSpecOnly)); + } + + return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions); + } + + /** + * Checks if a table has had new commits since a given snapshot that were not caused by compaction. + * @param icebergTable The Iceberg table to check. + * @param pastSnapshotId The ID of the snapshot to check from (exclusive). + * @return true if at least one non-compaction snapshot exists since the pastSnapshotId + * whose source is not compaction, false otherwise. + * @throws IllegalArgumentException if the table has no current snapshot. + */ + private boolean hasNonCompactionCommits(org.apache.iceberg.Table icebergTable, Long pastSnapshotId) { Review Comment: maybe `hasNewCommits`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org