[ https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=847514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847514 ]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Feb/23 13:05
Start Date: 24/Feb/23 13:05
Worklog Time Spent: 10m
Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1116973210
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CacheContainer;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest.CleaningRequestBuilder;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static java.util.Objects.isNull;
+
+/**
+ * A compaction-based implementation of RequestHandler.
+ * Provides the implementation for finding ready-to-clean items, preprocessing of cleaning requests,
+ * postprocessing of cleaning requests and failure handling of cleaning requests.
+ */
+class CompactionHandler extends RequestHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler,
+                                           CacheContainer cacheContainer, boolean metricsEnabled,
+                                           FSRemover fsRemover, ExecutorService cleanerExecutor) {
+    super(conf, txnHandler, cacheContainer, metricsEnabled, fsRemover, cleanerExecutor);
+ }
+
+ @Override
+ protected void processInternal() throws Exception {
+ long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+          ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+          : 0;
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+ if (!readyToClean.isEmpty()) {
+ long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+          minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+
+ LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compactions can be cleaned we can use minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed by running queries.
+      // Once min_history_level is finally dropped, every HMS will commit compactions the new way,
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId used instead.
+ List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
+ for (CompactionInfo ci : readyToClean) {
+        CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
+                CompactorUtil.ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled)), cleanerExecutor)
+            .exceptionally(t -> {
+              LOG.error("Error clearing {} due to :", ci.getFullPartitionName(), t);
+              return null;
+            });
+ asyncTasks.add(asyncTask);
+ }
+      // Use get instead of join, so we can receive InterruptedException and shut down gracefully
+      CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+ }
+ }
+
+  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+ LOG.info("Starting cleaning for {}", ci);
+ PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+ String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+ (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+ try {
+ if (metricsEnabled) {
+        perfLogger.perfLogBegin(CompactionHandler.class.getName(), cleanerMetric);
+ }
+ final String location = ci.getProperty("location");
+
+ Table t = null;
+ Partition p = null;
+
+ if (isNull(location)) {
+        t = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+ if (isNull(t)) {
+ // The table was dropped before we got around to cleaning it.
+ LOG.info("Unable to find table {}, assuming it was dropped. {}",
ci.getFullTableName(),
+ idWatermark(ci));
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+          // The table was marked with NO_CLEANUP set to true.
+          LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (!isNull(ci.partName)) {
+ p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+ if (isNull(p)) {
+ // The partition was dropped before we got around to cleaning it.
+ LOG.info("Unable to find partition {}, assuming it was dropped.
{}",
+ ci.getFullPartitionName(), idWatermark(ci));
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+            // The partition was marked with NO_CLEANUP set to true.
+            LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ }
+ }
+ txnHandler.markCleanerStart(ci);
+
+ if (!isNull(t) || !isNull(ci.partName)) {
+ String path = isNull(location)
+ ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+ : location;
+ boolean dropPartition = !isNull(ci.partName) && isNull(p);
+
+        // check that the partition wasn't re-created
+        if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
+ cleanUsingLocation(ci, path, true);
+ } else {
+ cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+ }
+ } else {
+ cleanUsingLocation(ci, location, false);
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception when cleaning, unable to complete cleaning
of {} due to {}", ci,
+ e.getMessage());
+ ci.errorMessage = e.getMessage();
+ if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+ }
+ handleCleanerAttemptFailure(ci);
+ } finally {
+ if (metricsEnabled) {
+        perfLogger.perfLogEnd(CompactionHandler.class.getName(), cleanerMetric);
+ }
+ }
+ }
+
+  private void cleanUsingLocation(CompactionInfo ci, String path, boolean requiresLock) throws MetaException {
+ List<Path> deleted;
+ if (requiresLock) {
+      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+ LockResponse res = null;
+ try {
+ res = txnHandler.lock(lockRequest);
+ deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+ } catch (NoSuchTxnException | TxnAbortedException e) {
+ LOG.error("Error while trying to acquire exclusive write lock: {}",
e.getMessage());
+ throw new MetaException(e.getMessage());
+ } finally {
+ if (res != null) {
+ try {
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+ } catch (NoSuchLockException | TxnOpenException e) {
+ LOG.error("Error while trying to release exclusive write lock:
{}", e.getMessage());
+ }
+ }
+ }
+ } else {
+ deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+ }
+ if (!deleted.isEmpty()) {
+ txnHandler.markCleaned(ci);
+ } else {
+ txnHandler.clearCleanerStart(ci);
+ }
+ }
+
+  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+    ValidTxnList validTxnList =
+        TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+ //save it so that getAcidState() sees it
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    /*
+     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+     * produced by a compactor, that means every reader that could be active right now sees it
+     * as well. That means if this base/delta shadows some earlier base/delta, then it will be
+     * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
+     *
+     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+     * For example, given partition P1, txnid:150 starts and sees txnid:149 as open.
+     * Say the compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+     * writeId:17. The compactor will produce base_17_c160.
+     * Suppose txnid:149 writes delta_18_18 to P1 and aborts. The compactor can only remove
+     * TXN_COMPONENTS entries up to (inclusive) writeId:17, since delta_18_18 may be on disk
+     * (and perhaps corrupted) but not visible based on 'validTxnList' capped at minOpenTxn,
+     * so it will not be cleaned by
+     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and we must keep the
+     * metadata that says that 18 is aborted.
+     * In a slightly different case, whatever txn created delta_18 (and all other txns) may have
+     * committed by the time the cleaner runs, in which case the cleaner will indeed see delta_18_18
+     * and remove it (since it has nothing but aborted data). But we can't tell which actually
+     * happened in markCleaned(), so make sure it doesn't delete metadata above CQ_HIGHEST_WRITE_ID.
+     *
+     * We could perhaps combine cleaning of aborted and obsolete dirs and remove all aborted files
+     * up to the current Min Open Write Id; this way the aborted TXN_COMPONENTS metadata could be
+     * removed as well up to that point, which may be higher than CQ_HIGHEST_WRITE_ID. This could
+     * be useful if there is all of a sudden a flood of aborted txns. (For another day.)
+     */
+
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+ LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+ Path path = new Path(location);
+ FileSystem fs = path.getFileSystem(conf);
+
+ // Collect all the files/dirs
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+        dirSnapshots);
+    Table table = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+ boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+ if (isDynPartAbort || dir.hasUncompactedAborts()) {
+ ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+ }
+
+    List<Path> deleted = fsRemover.clean(new CleaningRequestBuilder().setLocation(location).setRunAs(ci.runAs)
+        .setObsoleteDirs(obsoleteDirs).setPurge(true).setFullPartitionName(ci.getFullPartitionName())
+        .build());
+
+ if (deleted.size() > 0) {
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+          txnHandler);
+ }
+
+ // Make sure there are no leftovers below the compacted watermark
+ boolean success = false;
+ conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+ dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+            ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+        Ref.from(false), false, dirSnapshots);
+
+    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+    if (!remained.isEmpty()) {
+      LOG.warn("{} Remained {} obsolete directories from {}. {}",
+          idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
+ } else {
+      LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
+ success = true;
+ }
+ if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
+ txnHandler.markCleaned(ci);
+ } else {
+ txnHandler.clearCleanerStart(ci);
+ LOG.warn("No files were removed. Leaving queue entry {} in ready for
cleaning state.", ci);
+ }
+ }
+
+  protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {
Review Comment:
Makes sense. Removed them. Done.
Issue Time Tracking
-------------------
Worklog Id: (was: 847514)
Time Spent: 9h 40m (was: 9.5h)
> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
> Key: HIVE-27019
> URL: https://issues.apache.org/jira/browse/HIVE-27019
> Project: Hive
> Issue Type: Sub-task
> Reporter: Sourabh Badhya
> Assignee: Sourabh Badhya
> Priority: Major
> Labels: pull-request-available
> Time Spent: 9h 40m
> Remaining Estimate: 0h
>
> As described by the parent task, the Cleaner can be divided into separate entities:
> *1) Handler* - This entity fetches data from the relevant metastore DB tables
> and converts it into a request entity called CleaningRequest. It also performs
> the SQL operations that follow cleanup (postprocessing). Every type of
> cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity receives the cleaning requests from the
> various handlers and deletes the files/directories specified by each request.
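
For illustration only, the following is a minimal sketch of the handler / filesystem-remover split described above. The names SimpleCleaningRequest, SimpleHandler, SimpleFSRemover and SimpleCleanerLoop are hypothetical simplifications, not the actual Hive classes in this PR; the sketch only shows how the responsibilities could be divided.

import java.util.List;

// Hypothetical, simplified request entity: a handler builds one of these from
// metastore state; the remover only needs the information carried here.
class SimpleCleaningRequest {
  final String location;
  final List<String> obsoleteDirs;

  SimpleCleaningRequest(String location, List<String> obsoleteDirs) {
    this.location = location;
    this.obsoleteDirs = obsoleteDirs;
  }
}

// Hypothetical handler contract: reads the relevant metastore tables, builds
// cleaning requests, and runs the SQL postprocessing once deletion has finished.
interface SimpleHandler {
  List<SimpleCleaningRequest> findReadyToClean();             // metastore DB reads
  void postprocess(SimpleCleaningRequest request);            // e.g. mark the queue entry cleaned
  void onFailure(SimpleCleaningRequest request, Exception e); // retry / failure bookkeeping
}

// Hypothetical filesystem remover: deletes only what a request names; no SQL here.
class SimpleFSRemover {
  List<String> clean(SimpleCleaningRequest request) {
    // A real implementation would delete request.obsoleteDirs under
    // request.location and return the paths it actually removed.
    return request.obsoleteDirs;
  }
}

// The cleaner loop then only wires the two entities together.
class SimpleCleanerLoop {
  void runOnce(SimpleHandler handler, SimpleFSRemover fsRemover) {
    for (SimpleCleaningRequest request : handler.findReadyToClean()) {
      try {
        fsRemover.clean(request);
        handler.postprocess(request);
      } catch (Exception e) {
        handler.onFailure(request, e);
      }
    }
  }
}

The point of the split is that only the handler talks to the metastore DB, while the remover performs filesystem deletions driven entirely by the request it receives.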
--
This message was sent by Atlassian Jira
(v8.20.10#820010)