[
https://issues.apache.org/jira/browse/HIVE-27020?focusedWorklogId=857250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-857250
]
ASF GitHub Bot logged work on HIVE-27020:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Apr/23 18:12
Start Date: 16/Apr/23 18:12
Worklog Time Spent: 10m
Work Description: SourabhBadhya commented on code in PR #4091:
URL: https://github.com/apache/hive/pull/4091#discussion_r1167991380
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.AcidTxnInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.isNull;
+
+/**
+ * Abort-cleanup based implementation of TaskHandler.
+ * Provides implementation of creation of abort clean tasks.
+ */
+class AbortedTxnCleaner extends TaskHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
+
+  public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
+                           MetadataCache metadataCache, boolean metricsEnabled,
+                           FSRemover fsRemover) {
+    super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+  }
+
+  /**
+  The following cleanup is based on the following idea - <br>
+  1. Aborted cleanup is independent of compaction. This is because directories which are written by
+     aborted txns are not visible by any open txns. It is only visible while determining the AcidState (which
+     only sees the aborted deltas and does not read the file).<br><br>
+
+  The following algorithm is used to clean the set of aborted directories - <br>
+  a. Find the list of entries which are suitable for cleanup (This is done in {@link TxnStore#findReadyToCleanForAborts(long, int)}).<br>
+  b. If the table/partition does not exist, then remove the associated aborted entry in TXN_COMPONENTS table. <br>
+  c. Get the AcidState of the table by using the min open txnID, database name, tableName, partition name, highest write ID <br>
+  d. Fetch the aborted directories and delete the directories. <br>
+  e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
+  **/
+  @Override
+  public List<Runnable> getTasks() throws MetaException {
+    int abortedThreshold = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf
+        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+            TimeUnit.MILLISECONDS);
+    List<AcidTxnInfo> readyToCleanAborts = txnHandler.findReadyToCleanForAborts(abortedTimeThreshold, abortedThreshold);
+
+    if (!readyToCleanAborts.isEmpty()) {
+      return readyToCleanAborts.stream().map(ci -> ThrowingRunnable.unchecked(() ->
+              clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE, metricsEnabled)))
+          .collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
+
+  private void clean(AcidTxnInfo info, long minOpenTxn, boolean metricsEnabled) throws MetaException {
+    LOG.info("Starting cleaning for {}", info);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
+      }
+      Table t;
+      Partition p = null;
+      t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+      if (isNull(t)) {
+        // The table was dropped before we got around to cleaning it.
+        LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
+        txnHandler.markCleanedForAborts(info);
+        return;
+      }
+      if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+        // The table was marked no clean up true.
+        LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", info.getFullTableName());
+        return;
+      }
+      if (!isNull(info.partName)) {
+        p = resolvePartition(info.dbname, info.tableName, info.partName);
+        if (isNull(p)) {
+          // The partition was dropped before we got around to cleaning it.
+          LOG.info("Unable to find partition {}, assuming it was dropped.",
+              info.getFullPartitionName());
+          txnHandler.markCleanedForAborts(info);
+          return;
+        }
+        if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+          // The partition was marked no clean up true.
+          LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", info.getFullPartitionName());
+          return;
+        }
+      }
+
+      String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
+      info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
+      abortCleanUsingAcidDir(info, location, minOpenTxn);
+
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
+          e.getMessage());
+      throw new MetaException(e.getMessage());
+    } finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(AbortedTxnCleaner.class.getName(), cleanerMetric);
+      }
+    }
+  }
+
+  private void abortCleanUsingAcidDir(AcidTxnInfo info, String location, long minOpenTxn) throws Exception {
+    ValidTxnList validTxnList =
+        TxnUtils.createValidTxnListForAbortedTxnCleaner(txnHandler.getOpenTxns(), minOpenTxn);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
+
+    // Set the highestWriteId of the cleanup equal to the min(minOpenWriteId - 1, highWatermark).
+    // This is necessary for looking at the complete state of the table till the min open write Id
+    // (if there is an open txn on the table) or the highestWatermark.
+    // This is used later on while deleting the records in TXN_COMPONENTS table.
+    info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
+        Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
+    Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+    boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
+    if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
Review Comment:
This is part of the cleanup task when the config will be removed. Until then
this is supposed to be here.
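
As an aside on the quoted code above: the highestWriteId is computed as min(minOpenWriteId - 1, highWatermark). A small standalone illustration with made-up values (plain Java, not Hive code):

    public class HighestWriteIdExample {
      public static void main(String[] args) {
        Long minOpenWriteId = 17L;  // assume writeId 17 is still open on the table
        long highWatermark = 42L;   // assume 42 is the highest allocated writeId
        long highestWriteId = Math.min(
            minOpenWriteId == null ? Long.MAX_VALUE : minOpenWriteId - 1,
            highWatermark);
        // Prints 16: the cleanup never looks at or past an open writeId.
        // With no open write (minOpenWriteId == null) it would print the highWatermark, 42.
        System.out.println(highestWriteId);
      }
    }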
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java:
##########
+    boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
+    if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
+      txnHandler.markCleanedForAborts(info);
Review Comment:
Used `markCleaned(info, isAbortOnly)` for making it generic.
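
A rough sketch of the generic shape that comment describes (hypothetical; the exact method name and parameters in TxnStore may differ):

    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.txn.AcidTxnInfo;

    // Hypothetical sketch illustrating the "one generic markCleaned" idea;
    // not the signature actually committed to TxnStore.
    interface MarkCleanedSketch {
      // isAbortOnly = true  -> remove only the aborted-txn entries from TXN_COMPONENTS
      // isAbortOnly = false -> regular post-compaction cleanup bookkeeping
      void markCleaned(AcidTxnInfo info, boolean isAbortOnly) throws MetaException;
    }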
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java:
##########
@@ -337,18 +287,9 @@ private static String idWatermark(CompactionInfo ci) {
return " id=" + ci.id;
}
- private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci,
ValidTxnList validTxnList)
+ private ValidReaderWriteIdList
getValidCleanerWriteIdListForCompactionCleaner(CompactionInfo ci, ValidTxnList
validTxnList)
Review Comment:
Implemented it, done.
Issue Time Tracking
-------------------
Worklog Id: (was: 857250)
Time Spent: 12.5h (was: 12h 20m)
> Implement a separate handler to handle aborted transaction cleanup
> ------------------------------------------------------------------
>
> Key: HIVE-27020
> URL: https://issues.apache.org/jira/browse/HIVE-27020
> Project: Hive
> Issue Type: Sub-task
> Reporter: Sourabh Badhya
> Assignee: Sourabh Badhya
> Priority: Major
> Labels: pull-request-available
> Time Spent: 12.5h
> Remaining Estimate: 0h
>
> As described in the parent task, once the cleaner is separated into different
> entities, implement a separate handler which can create requests for aborted
> transactions cleanup. This would move the aborted transaction cleanup
> exclusively to the cleaner.
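
In outline, the separate handler described here takes the shape already visible in the AbortedTxnCleaner quoted in the review above. A condensed, hypothetical sketch (simplified names, not the committed API):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    // Condensed, hypothetical outline of the "separate handler" idea.
    abstract class AbortCleanupHandlerOutline {
      // Build one cleanup task per aborted entry that is ready to be cleaned;
      // the Cleaner only has to execute the returned Runnables.
      public List<Runnable> getTasks() throws MetaException {
        return findReadyToCleanAborts().stream()
            .map(entry -> (Runnable) () -> cleanAbortedDirsAndMetadata(entry))
            .collect(Collectors.toList());
      }

      // Query TXN_COMPONENTS for aborted entries past the count/time thresholds.
      protected abstract List<String> findReadyToCleanAborts() throws MetaException;

      // Delete the aborted delta directories and remove the matching metadata rows.
      protected abstract void cleanAbortedDirsAndMetadata(String entry);
    }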
--
This message was sent by Atlassian Jira
(v8.20.10#820010)