zabetak commented on code in PR #4859:
URL: https://github.com/apache/hive/pull/4859#discussion_r1503941565


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -59,6 +86,8 @@
 public class CompactorUtil {
  private static final Logger LOG = LoggerFactory.getLogger(CompactorUtil.class);
  public static final String COMPACTOR = "compactor";
+  public static final String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold.";

Review Comment:
   This field, along with most of the new methods added in this class, can be made `private static` to improve encapsulation.
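   For example, a possible sketch (assuming these members are not referenced outside `CompactorUtil`):
   ```
   // Sketch of the suggested visibility change; only members used by other classes stay public.
   private static final String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold.";

   private static CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
       Map<String, String> tblProperties, long baseSize, long deltaSize, HiveConf conf) {
     // ... body unchanged
   }
   ```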



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+
+  public static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname, String runtimeVersion, TxnStore txnHandler) throws MetaException {

Review Comment:
   This line and a few others that were added now exceed the 120-character limit suggested by the Hive coding conventions. Please reformat them accordingly.
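   For example (just one possible wrapping, not a prescription), the signature could be split to stay under 120 characters:
   ```
   public static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname,
       String runtimeVersion, TxnStore txnHandler) throws MetaException {
     // ... body unchanged
   }
   ```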



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+
+  public static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname, String runtimeVersion, TxnStore txnHandler) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId(),hostname));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  static AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
+  }

Review Comment:
   This method is a better fit for `AcidUtils` than for this class. Please move it there and rename it to `AcidUtils.getAcidState` to keep the APIs uniform.
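   Assuming the move is done as suggested (the exact overload and placement are up to you), the relocated helper could look roughly like this in `AcidUtils`; it only resolves the `FileSystem` from the storage descriptor location and delegates to the existing `getAcidState`:
   ```
   public static AcidDirectory getAcidState(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf)
       throws IOException {
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
     return getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
   }
   ```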



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+

Review Comment:
   The method declaration is longer than the code it wraps, and it is only used in a single place. Please remove the method and inline the code where it is used.
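   Inlined at its single call site in `requestCompaction`, that would amount to something like:
   ```
   if (StringUtils.isEmpty(ci.initiatorId)) {
     compactionRequest.setInitiatorId(hostname + "-" + Thread.currentThread().getId());
   } else {
     compactionRequest.setInitiatorId(ci.initiatorId);
   }
   ```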



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -69,40 +80,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
       compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
 
-    InitiatorBase initiatorBase = new InitiatorBase();
-    initiatorBase.setConf(context.getConf());
-    initiatorBase.init(new AtomicBoolean());
-
-    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
-        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
-
-    if(desc.getPartitionSpec() != null){
-      Optional<String> partitionName =  partitionMap.keySet().stream().findFirst();
-      partitionName.ifPresent(compactionRequest::setPartitionname);
-    }
-    List<CompactionResponse> compactionResponses =
-        initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
-    for (CompactionResponse compactionResponse : compactionResponses) {
-      if (!compactionResponse.isAccepted()) {
-        String message;
-        if (compactionResponse.isSetErrormessage()) {
-          message = compactionResponse.getErrormessage();
-          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
-              "CompactionId: " + compactionResponse.getId(), message);
-        }
-        context.getConsole().printInfo(
-            "Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
-                + compactionResponse.getState());
-        continue;
+    //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request
+    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+      if (desc.getPartitionSpec() != null) {
+        Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
+        partitionName.ifPresent(compactionRequest::setPartitionname);
       }
-      context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
-      if (desc.isBlocking() && compactionResponse.isAccepted()) {
-        waitForCompactionToFinish(compactionResponse, context);
+      CompactionResponse compactionResponse = CompactorUtil.initiateCompactionForTable(compactionRequest,txnHandler);
+      parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname());
+    } else { // Check for eligible partitions and initiate compaction
+      for (Map.Entry<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMapEntry : partitionMap.entrySet()) {
+        compactionRequest.setPartitionname(partitionMapEntry.getKey());
+        CompactionResponse compactionResponse =
+            CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(),
+                compactionRequest,JavaUtils.hostname(),txnHandler, context.getConf());

Review Comment:
   nit: There are some spaces missing after the commas separating the arguments.
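   For example, the call above would then read (formatting only, behavior unchanged):
   ```
   CompactionResponse compactionResponse =
       CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(),
           compactionRequest, JavaUtils.hostname(), txnHandler, context.getConf());
   ```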



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+
+  public static CompactionResponse requestCompaction(CompactionInfo ci, String 
runAs, String hostname, String runtimeVersion, TxnStore txnHandler) throws 
MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, 
ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      
compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId(),hostname));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  static AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList 
writeIds, HiveConf conf) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, 
Ref.from(false), false);
+  }
+
+  public static CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+                                                       Map<String, String> tblProperties, long baseSize, long deltaSize, HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+              tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf,
+              HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+                .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+            tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+            HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
+              + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," + "requesting "
+            + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !CompactorUtil.isMinorCompactionSupported(conf, tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  public static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  public static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
+    return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+            .mapToLong(FileStatus::getLen).sum();
+  }
+
+  public static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
+                                                  final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs, TxnStore txnHandler, HiveConf conf)
+          throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", "
+              + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold "
+              + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+              + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize,
+            deltaSizes, acidDirectory.getObsolete());
+
+    if (CompactorUtil.runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+                (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties,
+                        baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(),
+                  exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  public static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore txnHandler, HiveConf conf)
+          throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t,
+                                                                Partition p, String runAs, boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf conf)
+          throws MetaException {
+    StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t,txnHandler, conf);
+
+      CompactorUtil.checkInterrupt(Initiator.class.getName());
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs,hostName, ci.initiatorVersion, txnHandler);
+      }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactionInfo won't be marked as failed.
+      LOG.info("Initiator pool is being shut down, task received interruption.");
+    } catch (Throwable ex) {
+      String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
+              + "failed to avoid repeated failures, " + ex;
+      LOG.error(errorMessage);
+      ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
+      txnHandler.markFailed(ci);
+    }
+    return null;
+  }
+
+  public static CompactionResponse initiateCompactionForPartition(Table table, Partition partition,
+                                                           CompactionRequest compactionRequest,String hostName, TxnStore txnHandler, HiveConf conf) throws MetaException {
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    CompactionResponse compactionResponse;
+    CompactionInfo compactionInfo =
+            new CompactionInfo(table.getDbName(), table.getTableName(), compactionRequest.getPartitionname(),
+                    compactionRequest.getType());
+    compactionInfo.initiatorId = compactionRequest.getInitiatorId();
+    compactionInfo.orderByClause = compactionRequest.getOrderByClause();
+    compactionInfo.initiatorVersion = compactionRequest.getInitiatorVersion();
+    if (compactionRequest.getNumberOfBuckets() > 0) {
+      compactionInfo.numberOfBuckets = compactionRequest.getNumberOfBuckets();
+    }
+    compactionInfo.poolName = compactionRequest.getPoolName();
+    try {
+      StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, partition);
+      String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+      LOG.info("Checking to see if we should compact partition {} of table {}.{}",
+              compactionInfo.partName, table.getDbName(), table.getTableName());
+      compactionResponse = CompactorUtil.scheduleCompactionIfRequired(compactionInfo, table, partition, runAs, false, hostName, txnHandler, conf);

Review Comment:
   The `CompactorUtil` classifier is not needed here. Removing it will make the line shorter and the code more readable.
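   i.e. the same call, just without the class qualifier:
   ```
   compactionResponse = scheduleCompactionIfRequired(compactionInfo, table, partition, runAs, false,
       hostName, txnHandler, conf);
   ```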



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+
+  public static CompactionResponse requestCompaction(CompactionInfo ci, String 
runAs, String hostname, String runtimeVersion, TxnStore txnHandler) throws 
MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, 
ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      
compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId(),hostname));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  static AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList 
writeIds, HiveConf conf) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, 
Ref.from(false), false);
+  }
+
+  public static CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir,
+                                                       Map<String, String> 
tblProperties, long baseSize, long deltaSize, HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+              tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? 
HiveConf.getFloatVar(conf,
+              HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : 
Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > 
deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > 
").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+                .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+            tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+            HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : 
Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug("Not enough deltas to initiate compaction for table=" + 
ci.tableName + "partition=" + ci.partName
+              + ". Found: " + deltas.size() + " deltas, threshold is " + 
deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" 
: "has") + " base," + "requesting "
+            + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !CompactorUtil.isMinorCompactionSupported(conf, 
tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  public static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) 
{
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  public static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) 
throws IOException {
+    return dir.getFiles(fs, 
Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+            .mapToLong(FileStatus::getLen).sum();
+  }
+
+  public static CompactionType checkForCompaction(final CompactionInfo ci, 
final ValidWriteIdList writeIds,
+                                                  final StorageDescriptor sd, 
final Map<String, String> tblProperties, final String runAs, TxnStore 
txnHandler, HiveConf conf)
+          throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + 
ci.getFullPartitionName() + ", "
+              + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + 
ci.getFullPartitionName() + " with age older than threshold "
+              + oldAbortedTimeoutProp + ": " + 
conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+              + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, 
ci.partName, conf, txnHandler, baseSize,
+            deltaSizes, acidDirectory.getObsolete());
+
+    if (CompactorUtil.runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, 
baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + 
ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, 
UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+                (PrivilegedExceptionAction<CompactionType>) () -> 
determineCompactionType(ci, acidDirectory, tblProperties,
+                        baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + 
" for " + ci.getFullPartitionName(),
+                  exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  public static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore 
txnHandler, HiveConf conf)
+          throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new 
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one 
ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), 
t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest = new 
GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return 
TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t,
+                                                                Partition p, String runAs, boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf conf)
+          throws MetaException {
+    StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(t, p);

Review Comment:
   We are already inside `CompactorUtil`, so the full classifier is not necessary.



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -69,40 +80,48 @@ public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompact
       compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
 
-    InitiatorBase initiatorBase = new InitiatorBase();
-    initiatorBase.setConf(context.getConf());
-    initiatorBase.init(new AtomicBoolean());
-
-    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
-        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
-
-    if(desc.getPartitionSpec() != null){
-      Optional<String> partitionName =  
partitionMap.keySet().stream().findFirst();
-      partitionName.ifPresent(compactionRequest::setPartitionname);
-    }
-    List<CompactionResponse> compactionResponses =
-        initiatorBase.initiateCompactionForTable(compactionRequest, 
table.getTTable(), partitionMap);
-    for (CompactionResponse compactionResponse : compactionResponses) {
-      if (!compactionResponse.isAccepted()) {
-        String message;
-        if (compactionResponse.isSetErrormessage()) {
-          message = compactionResponse.getErrormessage();
-          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, 
table.getDbName(), table.getTableName(),
-              "CompactionId: " + compactionResponse.getId(), message);
-        }
-        context.getConsole().printInfo(
-            "Compaction already enqueued with id " + 
compactionResponse.getId() + "; State is "
-                + compactionResponse.getState());
-        continue;
+    //Will directly initiate compaction if an un-partitioned table/a partition 
is specified in the request
+    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+      if (desc.getPartitionSpec() != null) {
+        Optional<String> partitionName = 
partitionMap.keySet().stream().findFirst();
+        partitionName.ifPresent(compactionRequest::setPartitionname);
       }
-      context.getConsole().printInfo("Compaction enqueued with id " + 
compactionResponse.getId());
-      if (desc.isBlocking() && compactionResponse.isAccepted()) {
-        waitForCompactionToFinish(compactionResponse, context);
+      CompactionResponse compactionResponse = 
CompactorUtil.initiateCompactionForTable(compactionRequest,txnHandler);
+      parseCompactionResponse(compactionResponse, table, 
compactionRequest.getPartitionname());
+    } else { // Check for eligible partitions and initiate compaction
+      for (Map.Entry<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMapEntry : partitionMap.entrySet()) {
+        compactionRequest.setPartitionname(partitionMapEntry.getKey());
+        CompactionResponse compactionResponse =
+            CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(),
+                compactionRequest,JavaUtils.hostname(),txnHandler, context.getConf());

Review Comment:
   There is an identical method to `JavaUtils.hostname` in `ServerUtils`, which lives in the common module. The latter is a better alternative because it avoids the extra coupling with the standalone-metastore module.
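   A sketch of the suggested swap, assuming `ServerUtils.hostname()` keeps the same contract of returning the local host name:
   ```
   CompactionResponse compactionResponse =
       CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(),
           compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf());
   ```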



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+
+  public static CompactionResponse requestCompaction(CompactionInfo ci, String 
runAs, String hostname, String runtimeVersion, TxnStore txnHandler) throws 
MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, 
ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      
compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId(),hostname));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  static AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList 
writeIds, HiveConf conf) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, 
Ref.from(false), false);
+  }
+
+  public static CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir,
+                                                       Map<String, String> 
tblProperties, long baseSize, long deltaSize, HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+              tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? 
HiveConf.getFloatVar(conf,
+              HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : 
Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > 
deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > 
").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+                .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+            tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+            HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : 
Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug("Not enough deltas to initiate compaction for table=" + 
ci.tableName + "partition=" + ci.partName
+              + ". Found: " + deltas.size() + " deltas, threshold is " + 
deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" 
: "has") + " base," + "requesting "
+            + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !CompactorUtil.isMinorCompactionSupported(conf, 
tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  public static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) 
{
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  public static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) 
throws IOException {
+    return dir.getFiles(fs, 
Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+            .mapToLong(FileStatus::getLen).sum();
+  }
+
+  public static CompactionType checkForCompaction(final CompactionInfo ci, 
final ValidWriteIdList writeIds,
+                                                  final StorageDescriptor sd, 
final Map<String, String> tblProperties, final String runAs, TxnStore 
txnHandler, HiveConf conf)
+          throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + 
ci.getFullPartitionName() + ", "
+              + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + 
ci.getFullPartitionName() + " with age older than threshold "
+              + oldAbortedTimeoutProp + ": " + 
conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+              + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, 
ci.partName, conf, txnHandler, baseSize,
+            deltaSizes, acidDirectory.getObsolete());
+
+    if (CompactorUtil.runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, 
baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + 
ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, 
UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+                (PrivilegedExceptionAction<CompactionType>) () -> 
determineCompactionType(ci, acidDirectory, tblProperties,
+                        baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + 
" for " + ci.getFullPartitionName(),
+                  exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  public static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore 
txnHandler, HiveConf conf)
+          throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new 
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one 
ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), 
t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest = new 
GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return 
TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo 
ci, Table t,
+                                                                Partition p, 
String runAs, boolean metricsEnabled, String hostName, TxnStore txnHandler, 
HiveConf conf)
+          throws MetaException {
+    StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t,txnHandler, 
conf);
+
+      CompactorUtil.checkInterrupt(Initiator.class.getName());
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, 
t.getParameters(), runAs, txnHandler, conf);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs,hostName, ci.initiatorVersion, 
txnHandler);
+      }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactionInfo won't be 
marked as failed.
+      LOG.info("Initiator pool is being shut down, task received 
interruption.");
+    } catch (Throwable ex) {
+      String errorMessage = "Caught exception while trying to determine if we 
should compact " + ci + ". Marking "
+              + "failed to avoid repeated failures, " + ex;
+      LOG.error(errorMessage);
+      ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
+      txnHandler.markFailed(ci);
+    }
+    return null;
+  }
+
+  public static CompactionResponse initiateCompactionForPartition(Table table, 
Partition partition,
+                                                           CompactionRequest 
compactionRequest,String hostName, TxnStore txnHandler, HiveConf conf) throws 
MetaException {
+    ValidTxnList validTxnList = 
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    CompactionResponse compactionResponse;
+    CompactionInfo compactionInfo =
+            new CompactionInfo(table.getDbName(), table.getTableName(), 
compactionRequest.getPartitionname(),
+                    compactionRequest.getType());
+    compactionInfo.initiatorId = compactionRequest.getInitiatorId();
+    compactionInfo.orderByClause = compactionRequest.getOrderByClause();
+    compactionInfo.initiatorVersion = compactionRequest.getInitiatorVersion();
+    if (compactionRequest.getNumberOfBuckets() > 0) {
+      compactionInfo.numberOfBuckets = compactionRequest.getNumberOfBuckets();
+    }
+    compactionInfo.poolName = compactionRequest.getPoolName();
+    try {
+      StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, 
partition);
+      String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+      LOG.info("Checking to see if we should compact partition {} of table 
{}.{}",
+              compactionInfo.partName, table.getDbName(), 
table.getTableName());
+      compactionResponse = 
CompactorUtil.scheduleCompactionIfRequired(compactionInfo, table, partition, 
runAs, false, hostName, txnHandler, conf);
+    } catch (IOException | InterruptedException | MetaException e) {
+      LOG.error("Error occurred while Checking if we should compact partition 
{} of table {}.{} Exception: {}",
+              compactionInfo.partName, table.getDbName(), 
table.getTableName(), e.getMessage());
+      throw new RuntimeException(e);
+    }
+    return compactionResponse;
+  }
+
+  public static CompactionResponse initiateCompactionForTable(CompactionRequest request, TxnStore txnHandler) throws MetaException {
+    return txnHandler.compact(request);

Review Comment:
   In the previous version of the code, `VALID_TXNS_KEY` was set before launching the compaction:
   ```
   ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
   conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
   ```
   Is it intentional that this is no longer done?
   
   Most of the other comments are nitpicks, but this one is important to clarify.
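   If the old behavior should be preserved, one possible sketch (the extra `HiveConf` parameter is hypothetical, not a prescription) would be to set the key inside `initiateCompactionForTable` before delegating to the `TxnStore`:
   ```
   public static CompactionResponse initiateCompactionForTable(CompactionRequest request, TxnStore txnHandler,
       HiveConf conf) throws MetaException {
     // Restore the previous behavior: publish the open-transaction snapshot before compacting.
     ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
     conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
     return txnHandler.compact(request);
   }
   ```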



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
