veghlaci05 commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1116855904


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionHandler.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CacheContainer;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CleaningRequest.CleaningRequestBuilder;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static java.util.Objects.isNull;
+
+/**
+ * A compaction-based implementation of RequestHandler.
+ * Provides the implementation for finding ready-to-clean items, and for the preprocessing,
+ * postprocessing and failure handling of cleaning requests.
+ */
+class CompactionHandler extends RequestHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionHandler.class.getName());
+
+  public CompactionHandler(HiveConf conf, TxnStore txnHandler,
+                                          CacheContainer cacheContainer, boolean metricsEnabled,
+                                          FSRemover fsRemover, ExecutorService cleanerExecutor) {
+    super(conf, txnHandler, cacheContainer, metricsEnabled, fsRemover, cleanerExecutor);
+  }
+
+  @Override
+  protected void processInternal() throws Exception {
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+            ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+            : 0;
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      final long cleanerWaterMark =
+              minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+
+      LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+      // For checking which compaction can be cleaned we can use the minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running queries.
+      // When min_history_level is finally dropped, every HMS will commit compaction the new way,
+      // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
+      List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
+      for (CompactionInfo ci : readyToClean) {
+        CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
+                CompactorUtil.ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled)), cleanerExecutor)
+                .exceptionally(t -> {
+                  LOG.error("Error clearing {} due to :", ci.getFullPartitionName(), t);
+                  return null;
+                });
+        asyncTasks.add(asyncTask);
+      }
+      //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
+      CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+    }
+  }
+
+  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+    LOG.info("Starting cleaning for {}", ci);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+            (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(CompactionHandler.class.getName(), cleanerMetric);
+      }
+      final String location = ci.getProperty("location");
+
+      Table t = null;
+      Partition p = null;
+
+      if (isNull(location)) {
+        t = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+        if (isNull(t)) {
+          // The table was dropped before we got around to cleaning it.
+          LOG.info("Unable to find table {}, assuming it was dropped. {}", 
ci.getFullTableName(),
+                  idWatermark(ci));
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+          // The table was marked no clean up true.
+          LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", 
ci.getFullTableName());
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (!isNull(ci.partName)) {
+          p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+          if (isNull(p)) {
+            // The partition was dropped before we got around to cleaning it.
+            LOG.info("Unable to find partition {}, assuming it was dropped. 
{}",
+                    ci.getFullPartitionName(), idWatermark(ci));
+            txnHandler.markCleaned(ci);
+            return;
+          }
+          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+            // The partition was marked no clean up true.
+            LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to 
true", ci.getFullPartitionName());
+            txnHandler.markCleaned(ci);
+            return;
+          }
+        }
+      }
+      txnHandler.markCleanerStart(ci);
+
+      if (!isNull(t) || !isNull(ci.partName)) {
+        String path = isNull(location)
+                ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+                : location;
+        boolean dropPartition = !isNull(ci.partName) && isNull(p);
+
+        //check if partition wasn't re-created
+        if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
+          cleanUsingLocation(ci, path, true);
+        } else {
+          cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+        }
+      } else {
+        cleanUsingLocation(ci, location, false);
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning 
of {} due to {}", ci,
+              e.getMessage());
+      ci.errorMessage = e.getMessage();
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+      }
+      handleCleanerAttemptFailure(ci);
+    }  finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(CompactionHandler.class.getName(), cleanerMetric);
+      }
+    }
+  }
+
+  private void cleanUsingLocation(CompactionInfo ci, String path, boolean requiresLock) throws MetaException {
+    List<Path> deleted;
+    if (requiresLock) {
+      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+      LockResponse res = null;
+      try {
+        res = txnHandler.lock(lockRequest);
+        deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+      } catch (NoSuchTxnException | TxnAbortedException e) {
+        LOG.error("Error while trying to acquire exclusive write lock: {}", 
e.getMessage());
+        throw new MetaException(e.getMessage());
+      } finally {
+        if (res != null) {
+          try {
+            txnHandler.unlock(new UnlockRequest(res.getLockid()));
+          } catch (NoSuchLockException | TxnOpenException e) {
+            LOG.error("Error while trying to release exclusive write lock: 
{}", e.getMessage());
+          }
+        }
+      }
+    } else {
+      deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+    }
+    if (!deleted.isEmpty()) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+    }
+  }
+
+  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+    ValidTxnList validTxnList =
+            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    /*
+     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+     * produced by a compactor, that means every reader that could be active right now sees it
+     * as well.  That means if this base/delta shadows some earlier base/delta, then it will be
+     * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
+     *
+     *
+     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+     * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+     * writeId:17.  Compactor will produce base_17_c160.
+     * Suppose txnid:149 writes delta_18_18
+     * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
+     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+     * not visible based on 'validTxnList' capped at minOpenTxn, so it will not be cleaned by
+     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+     * metadata that says that 18 is aborted.
+     * In a slightly different case, whatever txn created delta_18 (and all other txns) may have
+     * committed by the time the cleaner runs and so the cleaner will indeed see delta_18_18 and
+     * remove it (since it has nothing but aborted data).  But we can't tell which actually happened
+     * in markCleaned() so make sure it doesn't delete meta above CQ_HIGHEST_WRITE_ID.
+     *
+     * We could perhaps combine cleaning of aborted and obsolete files and remove all aborted files up
+     * to the current Min Open Write Id; this way aborted TXN_COMPONENTS meta can be removed
+     * as well up to that point, which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
+     * useful if there is all of a sudden a flood of aborted txns.  (For another day).
+     */
+
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+    Path path = new Path(location);
+    FileSystem fs = path.getFileSystem(conf);
+
+    // Collect all the files/dirs
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+            dirSnapshots);
+    Table table = cacheContainer.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+    boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+    if (isDynPartAbort || dir.hasUncompactedAborts()) {
+      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+    }
+
+    List<Path> deleted = fsRemover.clean(new CleaningRequestBuilder().setLocation(location).setRunAs(ci.runAs)
+            .setObsoleteDirs(obsoleteDirs).setPurge(true).setFullPartitionName(ci.getFullPartitionName())
+            .build());
+
+    if (deleted.size() > 0) {
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+              txnHandler);
+    }
+
+    // Make sure there are no leftovers below the compacted watermark
+    boolean success = false;
+    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+                    ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+            Ref.from(false), false, dirSnapshots);
+
+    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+    if (!remained.isEmpty()) {
+      LOG.warn("{} Remained {} obsolete directories from {}. {}",
+              idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
+    } else {
+      LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
+      success = true;
+    }
+    if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+      LOG.warn("No files were removed. Leaving queue entry {} in ready for 
cleaning state.", ci);
+    }
+  }
+
+  protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {

Review Comment:
   This method has a single usage; all parameters except `ci` can be removed, since the only caller (in `cleanUsingLocation`) always passes the same constants.
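
   For illustration only, a rough sketch of what the simplified signature could look like. The method body is not visible in this hunk, so the builder calls below are guessed from the usual LockRequestBuilder/LockComponentBuilder pattern and would need to be checked against the actual implementation:

   ```java
   // Hypothetical sketch, not tested: the single caller always passes txnId = 0,
   // LockType.EXCL_WRITE and DataOperationType.DELETE, so those values can be inlined
   // and the extra parameters dropped.
   protected LockRequest createLockRequest(CompactionInfo ci) {
     // agentInfo choice is an assumption; use whatever the current implementation uses.
     LockRequestBuilder requestBuilder = new LockRequestBuilder(Thread.currentThread().getName());
     requestBuilder.setUser(ci.runAs);
     requestBuilder.setTransactionId(0);

     LockComponentBuilder lockCompBuilder = new LockComponentBuilder();
     lockCompBuilder.setLock(LockType.EXCL_WRITE);
     lockCompBuilder.setOperationType(DataOperationType.DELETE);
     lockCompBuilder.setDbName(ci.dbname);
     lockCompBuilder.setTableName(ci.tableName);
     lockCompBuilder.setIsTransactional(true);
     if (ci.partName != null) {
       lockCompBuilder.setPartitionName(ci.partName);
     }
     requestBuilder.addLockComponent(lockCompBuilder.build());
     return requestBuilder.build();
   }
   ```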


