[
https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=845779&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845779
]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Feb/23 06:11
Start Date: 16/Feb/23 06:11
Worklog Time Spent: 10m
Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1108051403
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/Handler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.thrift.TBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * An abstract class that defines utility methods for performing cleanup activities.
+ */
+public abstract class Handler<T extends CleaningRequest> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Handler.class.getName());
+  protected final TxnStore txnHandler;
+  protected final HiveConf conf;
+  protected final boolean metricsEnabled;
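+  // Soft-value cache of metastore objects, keyed by name; empty when the
+  // compactor table cache is switched off.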
+  private Optional<Cache<String, TBase>> metaCache;
+
+  Handler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
+    this.conf = conf;
+    this.txnHandler = txnHandler;
+    boolean tableCacheOn = MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
+    this.metaCache = initializeCache(tableCacheOn);
+    this.metricsEnabled = metricsEnabled;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public TxnStore getTxnHandler() {
+    return txnHandler;
+  }
+
+  public boolean isMetricsEnabled() {
+    return metricsEnabled;
+  }
+
+  /**
+   * Find the list of objects which are ready for cleaning.
+   * @return Cleaning requests
+   */
+  public abstract List<T> findReadyToClean() throws MetaException;
+
+  /**
+   * Execute just before cleanup.
+   * @param cleaningRequest Cleaning request
+   */
+  public abstract void beforeExecutingCleaningRequest(T cleaningRequest) throws MetaException;
+
+  /**
+   * Execute just after cleanup.
+   * @param cleaningRequest Cleaning request
+   * @param deletedFiles List of deleted files
+   * @return True if cleanup was successful, false otherwise
+   * @throws MetaException
+   */
+  public abstract boolean afterExecutingCleaningRequest(T cleaningRequest, List<Path> deletedFiles) throws MetaException;
+
+  /**
+   * Execute in the event of failure.
+   * @param cleaningRequest Cleaning request
+   * @param ex Failure exception
+   * @throws MetaException
+   */
+  public abstract void failureExecutingCleaningRequest(T cleaningRequest, Exception ex) throws MetaException;
+
+  public Table resolveTable(String dbName, String tableName) throws MetaException {
+    try {
+      return getMSForConf(conf).getTable(getDefaultCatalog(conf), dbName, tableName);
+    } catch (MetaException e) {
+      LOG.error("Unable to find table {}.{}, {}", dbName, tableName, e.getMessage());
+      throw e;
+    }
+  }
+
+  protected Partition resolvePartition(String dbName, String tableName, String partName) throws MetaException {
+    if (partName != null) {
+      List<Partition> parts;
+      try {
+        parts = CompactorUtil.getPartitionsByNames(conf, dbName, tableName, partName);
+        if (parts == null || parts.isEmpty()) {
+          // The partition got dropped before we went looking for it.
+          return null;
+        }
+      } catch (Exception e) {
+        LOG.error("Unable to find partition: {}.{}.{}", dbName, tableName, partName, e);
+        throw e;
+      }
+      if (parts.size() != 1) {
+        LOG.error("{}.{}.{} does not refer to a single partition. {}", dbName, tableName, partName,
+            Arrays.toString(parts.toArray()));
+        throw new MetaException("Too many partitions for : " + String.join(".", dbName, tableName, partName));
+      }
+      return parts.get(0);
+    } else {
+      return null;
+    }
+  }
+
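+  /**
+   * Returns the value cached for {@code key}, computing and caching it via
+   * {@code callable} on a miss; when the table cache is disabled, the callable
+   * is invoked directly.
+   */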
+  public <B extends TBase<B,?>> B computeIfAbsent(String key, Callable<B> callable) throws Exception {
+    if (metaCache.isPresent()) {
+      try {
+        return (B) metaCache.get().get(key, callable);
+      } catch (ExecutionException e) {
+        throw (Exception) e.getCause();
+      }
+    }
+    return callable.call();
+  }
+
+  Optional<Cache<String, TBase>> initializeCache(boolean tableCacheOn) {
+    if (tableCacheOn) {
+      metaCache = Optional.of(CacheBuilder.newBuilder().softValues().build());
+    } else {
+      // Keep the Optional empty rather than null so callers can safely probe it
+      // when the table cache is switched off.
+      metaCache = Optional.empty();
+    }
+    return metaCache;
+  }
+
+  public void invalidateMetaCache() {
+    metaCache.ifPresent(Cache::invalidateAll);
+  }
+
+  protected List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
+    List<Path> obsoleteDirs = dir.getObsolete();
+    /*
+     * Add anything in 'dir' that only has data from aborted transactions - no one should be
+     * trying to read anything in that dir (except getAcidState() that only reads the name of
+     * this dir itself). So this may run ahead of {@link CompactionInfo#highestWriteId}, but
+     * it's OK (suppose there are no active txns when the cleaner runs). The key is to not
+     * delete metadata about aborted txns with write IDs > {@link CompactionInfo#highestWriteId}.
+     * See {@link TxnStore#markCleaned(CompactionInfo)}.
+     */
+    obsoleteDirs.addAll(dir.getAbortedDirectories());
+    if (isDynPartAbort) {
+      // In the event of an aborted DP operation, we should only consider the aborted
+      // directories for cleanup. Including obsolete directories for partitioned tables
+      // can result in data loss.
+      obsoleteDirs = dir.getAbortedDirectories();
+    }
+    return obsoleteDirs;
+  }
Review Comment:
Moved `getObsoleteDirs` to CompactorUtil since it uses AcidDirectory, which might not be generic to all handlers. However, caching is now done at the handler level, so the caching functions are kept here. `resolvePartition` seems generic to most handlers, hence it is also kept here.
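To make the contract of the four lifecycle hooks concrete, here is a minimal sketch of a subclass (illustrative only; the no-op class name and comments are assumptions, not code from this PR):

```java
package org.apache.hadoop.hive.ql.txn.compactor.handler;

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;

/** Illustrative no-op handler showing where each lifecycle hook fires. */
public class NoOpHandler extends Handler<CleaningRequest> {

  NoOpHandler(HiveConf conf, TxnStore txnHandler, boolean metricsEnabled) {
    super(conf, txnHandler, metricsEnabled);
  }

  @Override
  public List<CleaningRequest> findReadyToClean() throws MetaException {
    // A real handler would query the metastore here and translate each
    // ready-to-clean entity into a CleaningRequest.
    return Collections.emptyList();
  }

  @Override
  public void beforeExecutingCleaningRequest(CleaningRequest cleaningRequest) throws MetaException {
    // Runs just before the filesystem cleanup, e.g. to mark the entry as in progress.
  }

  @Override
  public boolean afterExecutingCleaningRequest(CleaningRequest cleaningRequest,
      List<Path> deletedFiles) throws MetaException {
    // Runs after a successful cleanup, e.g. to mark the entry cleaned.
    return true;
  }

  @Override
  public void failureExecutingCleaningRequest(CleaningRequest cleaningRequest,
      Exception ex) throws MetaException {
    // Runs when cleanup fails, e.g. to record the failure for a later retry.
  }
}
```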
Issue Time Tracking
-------------------
Worklog Id: (was: 845779)
Time Spent: 5h (was: 4h 50m)
> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
> Key: HIVE-27019
> URL: https://issues.apache.org/jira/browse/HIVE-27019
> Project: Hive
> Issue Type: Sub-task
> Reporter: Sourabh Badhya
> Assignee: Sourabh Badhya
> Priority: Major
> Labels: pull-request-available
> Time Spent: 5h
> Remaining Estimate: 0h
>
> As described by the parent task, Cleaner can be divided into separate
> entities -
> *1) Handler* - This entity fetches data from the relevant metastore DB tables
> and converts it into a request entity called CleaningRequest. It also performs
> the SQL operations that follow cleanup (postprocessing). Every type of
> cleaning request is provided by a separate handler.
> *2) Filesystem remover* - This entity fetches the cleaning requests from the
> various handlers and deletes the corresponding files as specified by each
> cleaning request.
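To illustrate how the two entities could interact, here is a rough sketch of a remover loop driving the handler hooks (the class name and the `deleteObsoleteFiles` placeholder are assumptions for illustration, not this PR's actual API):

```java
package org.apache.hadoop.hive.ql.txn.compactor;

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.txn.compactor.handler.Handler;

/** Illustrative sketch of the "filesystem remover" half of the split. */
public class FileSystemRemoverSketch {

  private final List<Handler<? extends CleaningRequest>> handlers;

  public FileSystemRemoverSketch(List<Handler<? extends CleaningRequest>> handlers) {
    this.handlers = handlers;
  }

  /** One cleaner cycle: drain every handler and act on its cleaning requests. */
  public void run() throws MetaException {
    for (Handler<? extends CleaningRequest> handler : handlers) {
      processHandler(handler);
    }
  }

  // Helper with its own type variable so the wildcard above can be captured.
  private <T extends CleaningRequest> void processHandler(Handler<T> handler) throws MetaException {
    for (T request : handler.findReadyToClean()) {
      try {
        handler.beforeExecutingCleaningRequest(request);
        List<Path> deletedFiles = deleteObsoleteFiles(request);
        handler.afterExecutingCleaningRequest(request, deletedFiles);
      } catch (Exception e) {
        handler.failureExecutingCleaningRequest(request, e);
      }
    }
  }

  private List<Path> deleteObsoleteFiles(CleaningRequest request) {
    // Placeholder: a real remover would resolve the paths carried by the request
    // and delete them through the Hadoop FileSystem API.
    return Collections.emptyList();
  }
}
```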