deniskuzZ commented on code in PR #5957:
URL: https://github.com/apache/hive/pull/5957#discussion_r2207083625
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java:
##########
@@ -165,4 +168,73 @@ private void addCompactionTargetIfEligible(Table table,
org.apache.iceberg.Table
ci.type = compactionEvaluator.determineCompactionType();
compactions.add(ci);
}
+
+ /**
+ * Collects the partitions touched by data or delete files that were added
+ * or removed in non-compaction snapshots committed after a given past
+ * snapshot, up to and including the table's current (latest) snapshot.
+ * Snapshots are selected by commit timestamp; those whose summary contains
+ * {@code IcebergTableUtil.SNAPSHOT_SOURCE_PROP} are skipped.
+ * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table}
+ * used to convert the partition names into metastore partitions.
+ * @param icebergTable The Iceberg table whose snapshots are scanned.
+ * @param pastSnapshotId The ID of the older snapshot (exclusive). When it
+ * is null, or resolves to no snapshot, every snapshot of the table is
+ * considered.
+ * @param latestSpecOnly when True, returns partitions with the current spec
+ * only;
+ * False - older specs only;
+ * Null - any spec
+ * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition}
+ * built from the unique modified partition names.
+ * @throws IllegalArgumentException if the table has no current snapshot.
+ */
+ private List<Partition>
findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+ org.apache.iceberg.Table icebergTable, Long pastSnapshotId, Boolean
latestSpecOnly) {
+ Snapshot currentSnapshot = icebergTable.currentSnapshot();
+ if (currentSnapshot == null) {
+ throw new IllegalArgumentException(String.format("Table %s has no
current snapshot. Cannot determine range.",
+ icebergTable.name()));
+ }
+ // pastSnapshot stays null when no past snapshot ID was supplied.
+ Snapshot pastSnapshot = pastSnapshotId != null ?
icebergTable.snapshot(pastSnapshotId) : null;
+
+ // && binds tighter than ||, so a null pastSnapshot lets every snapshot
+ // through; otherwise only those in (pastSnapshot, currentSnapshot] by
+ // timestamp are kept.
+ List<Snapshot> relevantSnapshots =
StreamSupport.stream(icebergTable.snapshots().spliterator(), false)
+ .filter(s -> pastSnapshot == null || s.timestampMillis() >
pastSnapshot.timestampMillis() &&
+ s.timestampMillis() <= currentSnapshot.timestampMillis())
+ // Keep only snapshots without a source property in their summary
+ // (presumably the marker left by compaction commits - confirm).
+ .filter(s -> s.summary().get(IcebergTableUtil.SNAPSHOT_SOURCE_PROP) ==
null)
+ .sorted(Comparator.comparingLong(Snapshot::timestampMillis))
+ .toList();
+
+ Set<String> modifiedPartitions = Sets.newHashSet();
+
+ // Gather partition names from all four kinds of file change recorded
+ // by each snapshot; the set removes duplicates across snapshots.
+ for (Snapshot snapshot : relevantSnapshots) {
+ FileIO io = icebergTable.io();
+
modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable,
snapshot.addedDataFiles(io),
+ latestSpecOnly));
+
modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable,
snapshot.removedDataFiles(io),
+ latestSpecOnly));
+
modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable,
snapshot.addedDeleteFiles(io),
+ latestSpecOnly));
+
modifiedPartitions.addAll(IcebergTableUtil.getPartitionNames(icebergTable,
snapshot.removedDeleteFiles(io),
+ latestSpecOnly));
+ }
+
+ return IcebergTableUtil.convertNameToMetastorePartition(hiveTable,
modifiedPartitions);
+ }
+
+ /**
+ * Checks if a table has had new commits since a given snapshot that were
not caused by compaction.
+ * @param icebergTable The Iceberg table to check.
+ * @param pastSnapshotId The ID of the snapshot to check from (exclusive).
+ * @return true if at least one non-compaction snapshot exists since the
pastSnapshotId
+ * whose source is not compaction, false otherwise.
+ * @throws IllegalArgumentException if the table has no current snapshot.
+ */
+ private boolean hasNonCompactionCommits(org.apache.iceberg.Table
icebergTable, Long pastSnapshotId) {
+ Snapshot currentSnapshot = icebergTable.currentSnapshot();
+ if (currentSnapshot == null) {
+ throw new IllegalArgumentException(String.format("Table %s has no
current snapshot. Cannot determine range.",
+ icebergTable.name()));
+ }
+ Snapshot pastSnapshot = icebergTable.snapshot(pastSnapshotId);
+
+ return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)
+ .filter(s -> pastSnapshot == null || s.timestampMillis() >
pastSnapshot.timestampMillis() &&
+ s.timestampMillis() <= currentSnapshot.timestampMillis())
Review Comment:
Why do we need this check?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]