[
https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=845776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845776
]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Feb/23 06:10
Start Date: 16/Feb/23 06:10
Worklog Time Spent: 10m
Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1108051902
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -141,49 +93,37 @@ public void run() {
               new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
         }
-        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-
-        checkInterrupt();
-
-        List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
-
-        checkInterrupt();
-
-        if (!readyToClean.isEmpty()) {
-          long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
-          final long cleanerWaterMark = minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
-
-          LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
-          List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
-          // For checking which compaction can be cleaned we can use the minOpenTxnId
-          // However findReadyToClean will return all records that were compacted with old version of HMS
-          // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
-          // to the clean method, to avoid cleaning up deltas needed for running queries
-          // when min_history_level is finally dropped, than every HMS will commit compaction the new way
-          // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-          for (CompactionInfo compactionInfo : readyToClean) {
-
-            //Check for interruption before scheduling each compactionInfo and return if necessary
+        for (Handler handler : handlers) {
+          try {
+            List<CleaningRequest> readyToClean = handler.findReadyToClean();
             checkInterrupt();
-            CompletableFuture<Void> asyncJob =
-                CompletableFuture.runAsync(
-                        ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
-                        cleanerExecutor)
-                    .exceptionally(t -> {
-                      LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
-                      return null;
-                    });
-            cleanerList.add(asyncJob);
+            if (!readyToClean.isEmpty()) {
+              List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
+              for (CleaningRequest cr : readyToClean) {
+
+                //Check for interruption before scheduling each cleaning request and return if necessary
+                checkInterrupt();
+
+                CompletableFuture<Void> asyncJob = CompletableFuture.runAsync(
+                        ThrowingRunnable.unchecked(new FSRemover(handler, cr)), cleanerExecutor)
+                    .exceptionally(t -> {
+                      LOG.error("Error clearing: {}", cr.getFullPartitionName(), t);
+                      return null;
+                    });
Review Comment:
Implemented it in a similar way. Done.
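The scheduling pattern in the diff above — run each cleaning task asynchronously on a shared executor, and use `exceptionally` to log and swallow a failure so one bad task cannot abort the whole cycle — can be sketched outside Hive as follows (a minimal standalone illustration; everything except the `CompletableFuture`/`ExecutorService` APIs is made up for the example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the Cleaner's scheduling pattern: each task is submitted with
// runAsync on a shared executor, and exceptionally() turns a failed task
// into a normally-completed future so the cycle still finishes.
public class AsyncCleanSketch {

  // Returns {cleanedCount, failedCount} for a batch of partition names;
  // names starting with "bad" simulate a task that throws while cleaning.
  public static int[] runCycle(List<String> readyToClean) {
    AtomicInteger cleaned = new AtomicInteger();
    AtomicInteger failed = new AtomicInteger();
    ExecutorService cleanerExecutor = Executors.newFixedThreadPool(2);
    List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
    for (String partition : readyToClean) {
      CompletableFuture<Void> asyncJob =
          CompletableFuture.runAsync(() -> {
            if (partition.startsWith("bad")) {
              throw new RuntimeException("cannot clean " + partition);
            }
            cleaned.incrementAndGet();
          }, cleanerExecutor)
          .exceptionally(t -> {
            failed.incrementAndGet(); // Hive logs here via LOG.error
            return null;              // swallow: one failure must not fail the cycle
          });
      cleanerList.add(asyncJob);
    }
    // Wait for every scheduled job; safe because each future was made
    // non-exceptional by exceptionally() above.
    CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
    cleanerExecutor.shutdown();
    return new int[] {cleaned.get(), failed.get()};
  }

  public static void main(String[] args) {
    int[] r = runCycle(List.of("p1", "bad_p2", "p3"));
    System.out.println("cleaned=" + r[0] + " failed=" + r[1]);
  }
}
```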
Issue Time Tracking
-------------------
Worklog Id: (was: 845776)
Time Spent: 4h 40m (was: 4.5h)
> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
> Key: HIVE-27019
> URL: https://issues.apache.org/jira/browse/HIVE-27019
> Project: Hive
> Issue Type: Sub-task
> Reporter: Sourabh Badhya
> Assignee: Sourabh Badhya
> Priority: Major
> Labels: pull-request-available
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> As described by the parent task, the Cleaner can be divided into separate
> entities:
> *1) Handler* - This entity fetches data from the relevant metastore DB
> tables and converts it into a request entity called CleaningRequest. It
> also performs the SQL operations after cleanup (postprocess). Every type of
> cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity receives cleaning requests from the
> various handlers and deletes the files specified by each request.
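The Handler / filesystem-remover split described above can be sketched as follows. This is a hedged illustration of the design, not the actual Hive classes: the names `Handler`, `CleaningRequest`, and `FSRemover` follow the Jira description, but their members here (the `obsoleteDirs` field, the `deleted` sink standing in for real `FileSystem.delete()` calls) are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed split: a Handler reads metastore state and emits
// CleaningRequests; an FSRemover consumes requests and does the deletes,
// then hands control back to the handler for post-cleanup SQL.
public class CleanerSplitSketch {

  // What to delete; a real request would carry paths, table info, etc.
  record CleaningRequest(String fullPartitionName, List<String> obsoleteDirs) {}

  interface Handler {
    List<CleaningRequest> findReadyToClean();  // query metastore DB tables
    void postprocess(CleaningRequest cr);      // SQL cleanup after the delete
  }

  // Deletes whatever a request describes, independent of which handler
  // produced it; runs as a task on the Cleaner's executor.
  static class FSRemover implements Runnable {
    private final Handler handler;
    private final CleaningRequest cr;
    private final List<String> deleted; // stand-in for FileSystem.delete()

    FSRemover(Handler handler, CleaningRequest cr, List<String> deleted) {
      this.handler = handler;
      this.cr = cr;
      this.deleted = deleted;
    }

    @Override public void run() {
      deleted.addAll(cr.obsoleteDirs());
      handler.postprocess(cr); // e.g. mark the compaction cleaned in the DB
    }
  }

  // Wire one toy handler to the remover and return what got "deleted".
  public static List<String> demo() {
    List<String> deleted = new ArrayList<>();
    List<String> marked = new ArrayList<>();
    Handler compactionHandler = new Handler() {
      public List<CleaningRequest> findReadyToClean() {
        return List.of(new CleaningRequest("db.tbl/p=1",
            List.of("delta_1_5", "delta_6_9")));
      }
      public void postprocess(CleaningRequest cr) {
        marked.add(cr.fullPartitionName());
      }
    };
    for (CleaningRequest cr : compactionHandler.findReadyToClean()) {
      new FSRemover(compactionHandler, cr, deleted).run();
    }
    return deleted;
  }

  public static void main(String[] args) {
    System.out.println("deleted: " + demo());
  }
}
```

The point of the split is that new cleaning sources only need a new Handler; the filesystem-removal path stays shared and unchanged.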
--
This message was sent by Atlassian Jira
(v8.20.10#820010)