deniskuzZ commented on code in PR #6292:
URL: https://github.com/apache/hive/pull/6292#discussion_r2781493032


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java:
##########
@@ -212,6 +202,77 @@ private List<Partition> 
findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
     }
   }
 
+  /**
+   * Checks if there are any modified partitions (with added or deleted files) 
between a given past
+   * snapshot ID and the table's current (latest) snapshot. This method 
short-circuits as soon as
+   * it finds any modified partition, making it more efficient than 
findModifiedPartitions when
+   * only checking for existence.
+   * @param icebergTable The Iceberg table to inspect.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot 
to check from (exclusive).
+   * @param latestSpecOnly when True, checks partitions with the current spec 
only;
+   *                       False - older specs only;
+   *                       Null - any spec
+   * @return true if at least one modified partition exists, false otherwise.
+   */
+  private boolean hasModifiedPartitions(org.apache.iceberg.Table icebergTable, 
Long pastSnapshotTimeMil,
+                                        Boolean latestSpecOnly) {
+    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, 
pastSnapshotTimeMil).toList();
+    if (relevantSnapshots.isEmpty()) {
+      return false;
+    }
+
+    try (ExecutorService executor = 
Executors.newVirtualThreadPerTaskExecutor()) {
+      // Submit a task for each snapshot and collect the Futures
+      List<Future<Set<String>>> futures = createPartitionNameFutures(
+          executor, icebergTable, relevantSnapshots, latestSpecOnly);
+
+      // Check results as they complete and short-circuit on first non-empty 
result
+      for (Future<Set<String>> future : futures) {

Review Comment:
   that's not good, blocks in submission order
   
   ````
   private <R> R findModifiedPartitionsInternal(
       org.apache.iceberg.Table icebergTable,
       Long pastSnapshotTimeMil,
       Boolean latestSpecOnly,
       Supplier<R> resultSupplier,
       BiFunction<R, Set<String>, Boolean> resultConsumer
   ) {
   
     List<Snapshot> relevantSnapshots =
         getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
   
     R result = resultSupplier.get();
     if (relevantSnapshots.isEmpty()) {
       return result;
     }
   
     List<Callable<Set<String>>> tasks =
         createPartitionNameTasks(icebergTable, relevantSnapshots, 
latestSpecOnly);
   
     try (ExecutorService executor = 
Executors.newVirtualThreadPerTaskExecutor()) {
       CompletionService<Set<String>> cs = new 
ExecutorCompletionService<>(executor);
   
       // submit tasks
       for (Callable<Set<String>> task : tasks) {
         cs.submit(task);
       }
   
       // process results
       for (int i = 0; i < tasks.size(); i++) {
         if (resultConsumer.apply(result, cs.take().get())) {
           return (R) Boolean.TRUE; // short-circuit
         }
       }
   
       return result;
   
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeMetaException(
           e, "Interrupted while processing modified partitions");
     } catch (ExecutionException e) {
       throw new RuntimeMetaException(
           e, "Failed to process modified partitions in parallel");
     }
   }
   
   private boolean hasModifiedPartitions(
       org.apache.iceberg.Table icebergTable,
       Long pastSnapshotTimeMil,
       Boolean latestSpecOnly) {
   
     return findModifiedPartitionsInternal(
         icebergTable,
         pastSnapshotTimeMil,
         latestSpecOnly,
         () -> false,
         (ignored, partitions) -> !partitions.isEmpty()
     );
   }
   
   private List<Partition> findModifiedPartitions(
       org.apache.hadoop.hive.ql.metadata.Table hiveTable,
       org.apache.iceberg.Table icebergTable,
       Long pastSnapshotTimeMil,
       Boolean latestSpecOnly) {
   
     Set<String> modifiedPartitions = findModifiedPartitionsInternal(
         icebergTable,
         pastSnapshotTimeMil,
         latestSpecOnly,
         Sets::newHashSet,
         (acc, partitions) -> { 
             acc.addAll(partitions); 
             return false; // never short-circuit
         }
     );
   
     return IcebergTableUtil.convertNameToMetastorePartition(
         hiveTable, modifiedPartitions);
   }
   
   private List<Callable<Set<String>>> createPartitionNameTasks(
       org.apache.iceberg.Table icebergTable,
       List<Snapshot> relevantSnapshots,
       Boolean latestSpecOnly) {
   
     return relevantSnapshots.stream()
         .map(snapshot -> (Callable<Set<String>>) () ->
             IcebergTableUtil.getPartitionNames(
                 icebergTable,
                 getAffectedFiles(snapshot, icebergTable.io()),
                 latestSpecOnly))
         .toList();
   }
   
   private List<ContentFile<?>> getAffectedFiles(Snapshot snapshot, FileIO io) {
       return FluentIterable.<ContentFile<?>>concat(
               snapshot.addedDataFiles(io),
               snapshot.removedDataFiles(io),
               snapshot.addedDeleteFiles(io),
               snapshot.removedDeleteFiles(io))
           .toList();
   }
   
   ````



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to