[
https://issues.apache.org/jira/browse/HIVE-27020?focusedWorklogId=851984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-851984
]
ASF GitHub Bot logged work on HIVE-27020:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Mar/23 11:39
Start Date: 21/Mar/23 11:39
Worklog Time Spent: 10m
Work Description: akshat0395 commented on code in PR #4091:
URL: https://github.com/apache/hive/pull/4091#discussion_r1143245882
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.AcidTxnInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.isNull;
+
+/**
+ * Abort-cleanup based implementation of TaskHandler.
+ * Provides implementation of creation of abort clean tasks.
+ */
+class AbortedTxnCleaner extends AcidTxnCleaner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
+
+ public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
+ MetadataCache metadataCache, boolean metricsEnabled,
+ FSRemover fsRemover) {
+ super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+ }
+
+ /**
+ The following cleanup is based on the following idea - <br>
+ 1. Aborted cleanup is independent of compaction. This is because
directories which are written by
+ aborted txns are not visible by any open txns. It is only visible while
determining the AcidState (which
+ only sees the aborted deltas and does not read the file).<br><br>
+
+ The following algorithm is used to clean the set of aborted directories -
<br>
+ a. Find the list of entries which are suitable for cleanup (This is done
in {@link TxnStore#findReadyToCleanForAborts(long, int)}).<br>
+ b. If the table/partition does not exist, then remove the associated
aborted entry in TXN_COMPONENTS table. <br>
+ c. Get the AcidState of the table by using the min open txnID, database
name, tableName, partition name, highest write ID <br>
+ d. Fetch the aborted directories and delete the directories. <br>
+ e. Fetch the aborted write IDs from the AcidState and use it to delete
the associated metadata in the TXN_COMPONENTS table.
+ **/
+ @Override
+ public List<Runnable> getTasks() throws MetaException {
+ int abortedThreshold = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+ long abortedTimeThreshold = HiveConf
+ .getTimeVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+ TimeUnit.MILLISECONDS);
+ List<AcidTxnInfo> readyToCleanAborts =
txnHandler.findReadyToCleanForAborts(abortedTimeThreshold, abortedThreshold);
+
+ if (!readyToCleanAborts.isEmpty()) {
+ return readyToCleanAborts.stream().map(ci ->
ThrowingRunnable.unchecked(() ->
+ clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE,
metricsEnabled)))
+ .collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }
+
+ private void clean(AcidTxnInfo info, long minOpenTxn, boolean
metricsEnabled) throws MetaException {
+ LOG.info("Starting cleaning for {}", info);
+ PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+ String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
+ try {
+ if (metricsEnabled) {
+ perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(),
cleanerMetric);
+ }
+ Table t;
+ Partition p = null;
+ t = metadataCache.computeIfAbsent(info.getFullTableName(), () ->
resolveTable(info.dbname, info.tableName));
+ if (isNull(t)) {
+ // The table was dropped before we got around to cleaning it.
+ LOG.info("Unable to find table {}, assuming it was dropped.",
info.getFullTableName());
+ txnHandler.markCleanedForAborts(info);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+ // The table was marked no clean up true.
+ LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true",
info.getFullTableName());
+ return;
+ }
+ if (!isNull(info.partName)) {
+ p = resolvePartition(info.dbname, info.tableName, info.partName);
+ if (isNull(p)) {
+ // The partition was dropped before we got around to cleaning it.
+ LOG.info("Unable to find partition {}, assuming it was dropped.",
+ info.getFullPartitionName());
+ txnHandler.markCleanedForAborts(info);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+ // The partition was marked no clean up true.
+ LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to
true", info.getFullPartitionName());
+ return;
+ }
+ }
+
+ String location =
CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
+ info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
+ abortCleanUsingAcidDir(info, location, minOpenTxn);
+
+ } catch (Exception e) {
+ LOG.error("Caught exception when cleaning, unable to complete cleaning
of {} due to {}", info,
+ e.getMessage());
+ throw new MetaException(e.getMessage());
+ } finally {
+ if (metricsEnabled) {
+ perfLogger.perfLogEnd(AbortedTxnCleaner.class.getName(),
cleanerMetric);
Review Comment:
@veghlaci05 My bad: when I mentioned retry, I had commented on the finally
block. What I meant to do was start a conversation about a retry mechanism for
abortCleanUsingAcidDir, not for the metrics.
Issue Time Tracking
-------------------
Worklog Id: (was: 851984)
Time Spent: 3h 20m (was: 3h 10m)
> Implement a separate handler to handle aborted transaction cleanup
> ------------------------------------------------------------------
>
> Key: HIVE-27020
> URL: https://issues.apache.org/jira/browse/HIVE-27020
> Project: Hive
> Issue Type: Sub-task
> Reporter: Sourabh Badhya
> Assignee: Sourabh Badhya
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> As described in the parent task, once the cleaner is separated into different
> entities, implement a separate handler which can create requests for aborted
> transactions cleanup. This would move the aborted transaction cleanup
> exclusively to the cleaner.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)