deniskuzZ commented on code in PR #6292:
URL: https://github.com/apache/hive/pull/6292#discussion_r2781493032


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java:
##########
@@ -212,6 +202,77 @@ private List<Partition> 
findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
     }
   }
 
+  /**
+   * Checks if there are any modified partitions (with added or deleted files) 
between a given past
+   * snapshot ID and the table's current (latest) snapshot. This method 
short-circuits as soon as
+   * it finds any modified partition, making it more efficient than 
findModifiedPartitions when
+   * only checking for existence.
+   * @param icebergTable The Iceberg table to inspect.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot 
to check from (exclusive).
+   * @param latestSpecOnly when True, checks partitions with the current spec 
only;
+   *                       False - older specs only;
+   *                       Null - any spec
+   * @return true if at least one modified partition exists, false otherwise.
+   */
+  private boolean hasModifiedPartitions(org.apache.iceberg.Table icebergTable, 
Long pastSnapshotTimeMil,
+                                        Boolean latestSpecOnly) {
+    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, 
pastSnapshotTimeMil).toList();
+    if (relevantSnapshots.isEmpty()) {
+      return false;
+    }
+
+    try (ExecutorService executor = 
Executors.newVirtualThreadPerTaskExecutor()) {
+      // Submit a task for each snapshot and collect the Futures
+      List<Future<Set<String>>> futures = createPartitionNameFutures(
+          executor, icebergTable, relevantSnapshots, latestSpecOnly);
+
+      // Check results as they complete and short-circuit on first non-empty 
result
+      for (Future<Set<String>> future : futures) {

Review Comment:
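   That's not good: the loop blocks on the futures in submission order, so a slow early task delays the short-circuit even when a later task has already found a modified partition. Consume results in completion order with a `CompletionService` instead: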
   
   ````
   /**
    * Runs all tasks on the given executor and feeds their results to the consumer in
    * completion order (not submission order), optionally stopping early.
    *
    * <p>The accumulator is created once by {@code resultSupplier} and that same instance
    * is returned, so the consumer can only report anything through it by mutating it;
    * an immutable accumulator (e.g. {@code Boolean}) comes back unchanged.
    *
    * <p>Tasks that are still pending when this method exits (early stop, task failure,
    * or interruption of the caller) are cancelled with interruption, so an enclosing
    * try-with-resources on the executor does not wait for work nobody will read.
    *
    * @param executor executor the tasks are submitted to
    * @param tasks tasks to run; an empty list yields the untouched accumulator
    * @param consumer receives (accumulator, task result); returns true to stop consuming
    * @param resultSupplier creates the accumulator
    * @return the accumulator created by {@code resultSupplier}
    * @throws InterruptedException if the calling thread is interrupted while waiting
    * @throws ExecutionException if a consumed task completed with an exception
    */
   private static <T, R> R executeAndConsume(
       ExecutorService executor,
       List<Callable<T>> tasks,
       BiFunction<R, T, Boolean> consumer,
       Supplier<R> resultSupplier
   ) throws InterruptedException, ExecutionException {

     CompletionService<T> cs = new ExecutorCompletionService<>(executor);
     // Keep the submitted futures so the leftovers can be cancelled on exit.
     List<Future<T>> submitted = new java.util.ArrayList<>(tasks.size());

     try {
       for (Callable<T> task : tasks) {
         submitted.add(cs.submit(task));
       }

       R result = resultSupplier.get();

       for (int i = 0; i < submitted.size(); i++) {
         // take() hands back whichever task finishes next.
         T value = cs.take().get();
         if (consumer.apply(result, value)) {
           break;
         }
       }

       return result;
     } finally {
       // No-op for tasks that already completed; interrupts the ones still running.
       for (Future<T> future : submitted) {
         future.cancel(true);
       }
     }
   }
   ````
   1. hasModifiedPartitions
   ````
   try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

     List<Callable<Set<String>>> tasks =
         createPartitionNameTasks(icebergTable, relevantSnapshots, latestSpecOnly);

     // executeAndConsume returns the accumulator produced by the supplier, so the flag
     // has to live in a mutable holder: with an immutable Boolean (() -> false) the
     // returned value would stay false even after a non-empty result stopped the loop.
     java.util.concurrent.atomic.AtomicBoolean found = executeAndConsume(
         executor,
         tasks,
         (flag, partitions) -> {
           if (partitions.isEmpty()) {
             return false; // keep waiting for the next completed task
           }
           flag.set(true);
           return true; // first non-empty result: stop consuming
         },
         java.util.concurrent.atomic.AtomicBoolean::new
     );

     return found.get();

   }
   ````
   
   2. findModifiedPartitions
   
   ````
   try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
     List<Callable<Set<String>>> partitionNameTasks =
         createPartitionNameTasks(icebergTable, relevantSnapshots, latestSpecOnly);

     // Merge the partition names from every task into one set; the consumer always
     // answers false, so no result is skipped.
     Set<String> allModified = executeAndConsume(
         pool,
         partitionNameTasks,
         (collected, names) -> {
           collected.addAll(names);
           return false;
         },
         Sets::newHashSet);

     return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, allModified);
   }
   ````



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to