tarak271 commented on code in PR #4859:
URL: https://github.com/apache/hive/pull/4859#discussion_r1510901271


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,230 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  private static CompactionResponse requestCompaction(CompactionInfo ci, 
String runAs, String hostname,
+      String runtimeVersion, TxnStore txnHandler) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, 
ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(hostname + "-" + 
Thread.currentThread().getId());
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  private static CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir,
+      Map<String, String> tblProperties, long baseSize, long deltaSize, 
HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? 
HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : 
Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > 
deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+            .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : 
Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug(
+          "Not enough deltas to initiate compaction for table=" + ci.tableName 
+ "partition=" + ci.partName
+              + ". Found: " + deltas.size() + " deltas, threshold is " + 
deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" 
: "has") + " base," + "requesting "
+        + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !isMinorCompactionSupported(conf, tblProperties,
+        dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  private static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) 
{
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  private static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) 
throws IOException {
+    return dir.getFiles(fs, 
Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+        .mapToLong(FileStatus::getLen).sum();
+  }
+
+  private static CompactionType checkForCompaction(final CompactionInfo ci, 
final ValidWriteIdList writeIds,
+      final StorageDescriptor sd, final Map<String, String> tblProperties, 
final String runAs, TxnStore txnHandler,
+      HiveConf conf) throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for "
+          + ci.getFullPartitionName() + ", " + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug(
+          "Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold "
+              + oldAbortedTimeoutProp + ": " + 
conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+              + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = AcidUtils.getAcidState(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, 
ci.partName, conf, txnHandler, baseSize,
+        deltaSizes, acidDirectory.getObsolete());
+
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, 
baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + 
ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, 
UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> 
determineCompactionType(ci, acidDirectory, tblProperties,
+                baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + 
" for " + ci.getFullPartitionName(),
+              exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  private static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore 
txnHandler, HiveConf conf)
+      throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new 
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), 
t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest =
+        new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(
+        
txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo 
ci, Table t, Partition p, String runAs,
+      boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf 
conf) throws MetaException {
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t, txnHandler, 
conf);
+
+      checkInterrupt(Initiator.class.getName());
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, 
t.getParameters(), runAs, txnHandler, conf);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs, hostName, ci.initiatorVersion, 
txnHandler);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org

Reply via email to