SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1108051902


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -141,49 +93,37 @@ public void run() {
                    new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
          }

-          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-
-          checkInterrupt();
-
-          List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
-
-          checkInterrupt();
-
-          if (!readyToClean.isEmpty()) {
-            long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
-            final long cleanerWaterMark =
-                minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
-
-            LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
-            List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
-            // For checking which compaction can be cleaned we can use the minOpenTxnId
-            // However findReadyToClean will return all records that were compacted with old version of HMS
-            // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
-            // to the clean method, to avoid cleaning up deltas needed for running queries
-            // when min_history_level is finally dropped, than every HMS will commit compaction the new way
-            // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-            for (CompactionInfo compactionInfo : readyToClean) {
-
-              //Check for interruption before scheduling each compactionInfo and return if necessary
+          for (Handler handler : handlers) {
+            try {
+              List<CleaningRequest> readyToClean = handler.findReadyToClean();
               checkInterrupt();

-              CompletableFuture<Void> asyncJob =
-                  CompletableFuture.runAsync(
-                          ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
-                          cleanerExecutor)
-                      .exceptionally(t -> {
-                        LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
-                        return null;
-                      });
-              cleanerList.add(asyncJob);
+              if (!readyToClean.isEmpty()) {
+                List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
+                for (CleaningRequest cr : readyToClean) {
+
+                  //Check for interruption before scheduling each cleaning request and return if necessary
+                  checkInterrupt();
+
+                  CompletableFuture<Void> asyncJob = CompletableFuture.runAsync(
+                                  ThrowingRunnable.unchecked(new FSRemover(handler, cr)), cleanerExecutor)
+                          .exceptionally(t -> {
+                            LOG.error("Error clearing: {}", cr.getFullPartitionName(), t);
+                            return null;
+                          });

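For context, the added loop above fans out one FSRemover task per CleaningRequest on the cleaner executor, attaches an exceptionally handler so a single failed request is only logged rather than aborting the whole cycle, and collects the futures so the batch can be awaited together. Below is a minimal, self-contained sketch of that scheduling pattern; the plain strings and the inline removal task are simplified stand-ins for Hive's CleaningRequest and FSRemover, not the actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCleaningSketch {
  public static void main(String[] args) {
    ExecutorService cleanerExecutor = Executors.newFixedThreadPool(2);

    // Plain strings stand in for CleaningRequest objects; the real requests
    // carry the paths and metadata needed by the removal task.
    List<String> readyToClean = List.of("default.t1/p=1", "default.t2/p=2");

    List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
    for (String request : readyToClean) {
      // Schedule one removal task per request; a failure is logged for that
      // request only, so the rest of the batch still runs.
      CompletableFuture<Void> asyncJob = CompletableFuture
          .runAsync(() -> System.out.println("Removing obsolete files for " + request), cleanerExecutor)
          .exceptionally(t -> {
            System.err.println("Error clearing: " + request + " " + t);
            return null;
          });
      cleanerList.add(asyncJob);
    }

    // Wait for the whole batch to finish before the cleaner moves on.
    CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
    cleanerExecutor.shutdown();
  }
}
```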
Review Comment:
   Implemented it in a similar way. Done.
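One clarifying note on the block being removed in the diff above: the cleaner watermark there is minOpenTxnId unless an older open transaction id was recorded (minTxnIdSeenOpen >= 0), in which case the smaller of the two is used so deltas still needed by running queries are not cleaned. A tiny illustrative sketch with made-up transaction ids, using a hypothetical helper that mirrors the removed ternary:

```java
public class CleanerWaterMarkSketch {
  // Hypothetical helper mirroring the ternary removed in the diff above:
  // fall back to minOpenTxnId when minTxnIdSeenOpen is negative (no open txn seen).
  static long cleanerWaterMark(long minOpenTxnId, long minTxnIdSeenOpen) {
    return minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
  }

  public static void main(String[] args) {
    System.out.println(cleanerWaterMark(100, -1)); // 100: nothing open was seen, use minOpenTxnId
    System.out.println(cleanerWaterMark(100, 95)); // 95: clean only below the oldest open txn seen
  }
}
```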



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

