difin commented on code in PR #4855:
URL: https://github.com/apache/hive/pull/4855#discussion_r1414502326


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -337,151 +341,16 @@ protected Boolean findNextCompactionAndExecute(boolean 
collectGenericStats, bool
 
       checkInterrupt();
 
-      String fullTableName = TxnUtils.getFullTableName(table.getDbName(), 
table.getTableName());
-
-
-      // Find the partition we will be working with, if there is one.
-      Partition p;
-      try {
-        p = resolvePartition(ci);
-        if (p == null && ci.partName != null) {
-          ci.errorMessage = "Unable to find partition " + 
ci.getFullPartitionName() + ", assuming it was dropped and moving on.";
-          LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
-          msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-          return false;
-        }
-      } catch (Exception e) {
-        LOG.error("Unexpected error during resolving partition.", e);
-        ci.errorMessage = e.getMessage();
-        msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-        return false;
-      }
-
-      checkInterrupt();
-
-      // Find the appropriate storage descriptor
-      final StorageDescriptor sd =  resolveStorageDescriptor(table, p);
+      CompactionExecutor compactionExecutor;
 
-      // Check that the table or partition isn't sorted, as we don't yet 
support that.
-      if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
-        ci.errorMessage = "Attempt to compact sorted table " + 
ci.getFullTableName() + ", which is not yet supported!";
-        LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
-        msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-        return false;
+      if (MetaStoreUtils.isIcebergTable(table.getParameters())) {
+        compactionExecutor = new IcebergCompactionExecutor(this, ci, 
compactionTxn, table, collectGenericStats, collectMrStats);
       }
-
-      if (ci.runAs == null) {
-        ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+      else {
+        compactionExecutor = new ACIDCompactionExecutor(this, ci, 
compactionTxn, table, collectGenericStats, collectMrStats);
       }
 
-      checkInterrupt();
-
-      /**
-       * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) 
since
-       * then the Driver would already have an open txn, but then this txn 
would have
-       * multiple statements in it (for the query-based compactor), which is not 
supported (and since
-       * in this case some of the statements are DDL, even in the future they will not 
be allowed in a
-       * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, 
long)} */
-      compactionTxn.open(ci);
-
-      ValidTxnList validTxnList = msc.getValidTxns(compactionTxn.getTxnId());
-      //with this ValidWriteIdList is capped at whatever HWM validTxnList has
-      final ValidCompactorWriteIdList tblValidWriteIds =
-          TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
-              Collections.singletonList(fullTableName), 
validTxnList.writeToString()).get(0));
-      LOG.debug("ValidCompactWriteIdList: " + 
tblValidWriteIds.writeToString());
-      conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-
-      ci.highestWriteId = tblValidWriteIds.getHighWatermark();
-      //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we 
keep metadata about
-      //it until after any data written by it are physically removed
-      msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), 
compactionTxn.getTxnId());
-
-      checkInterrupt();
-
-      // Don't start compaction or cleaning if not necessary
-      if (isDynPartAbort(table, ci)) {
-        msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-        compactionTxn.wasSuccessful();
-        return false;
-      }
-      AcidDirectory dir = getAcidStateForWorker(ci, sd, tblValidWriteIds);
-      if (!isEnoughToCompact(ci, dir, sd)) {
-        if (needsCleaning(dir, sd)) {
-          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-        } else {
-          // do nothing
-          ci.errorMessage = "None of the compaction thresholds met, compaction 
request is refused!";
-          LOG.debug(ci.errorMessage + " Compaction info: {}", ci);
-          msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-        }
-        compactionTxn.wasSuccessful();
-        return false;
-      }
-      if (!ci.isMajorCompaction() && 
!isMinorCompactionSupported(table.getParameters(), dir)) {
-        ci.errorMessage = "Query based Minor compaction is not possible for 
full acid tables having raw format " +
-            "(non-acid) data in them.";
-        LOG.error(ci.errorMessage + " Compaction info: {}", ci);
-        try {
-          msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-        } catch (Throwable tr) {
-          LOG.error("Caught an exception while trying to mark compaction {} as 
failed: {}", ci, tr);
-        }
-        return false;
-      }
-      checkInterrupt();
-
-      try {
-        failCompactionIfSetForTest();
-
-        /*
-        First try to run compaction via HiveQL queries.
-        Compaction for MM tables happens here, or run compaction for Crud 
tables if query-based compaction is enabled.
-        todo Find a more generic approach to collecting files in the same 
logical bucket to compact within the same
-        task (currently we're using Tez split grouping).
-        */
-        CompactorPipeline compactorPipeline = 
compactorFactory.getCompactorPipeline(table, conf, ci, msc);
-        computeStats = (compactorPipeline.isMRCompaction() && collectMrStats) 
|| collectGenericStats;
-
-        LOG.info("Starting " + ci.type.toString() + " compaction for " + 
ci.getFullPartitionName() + ", id:" +
-                ci.id + " in " + compactionTxn + " with compute stats set to " 
+ computeStats);
-
-        CompactorContext compactorContext = new CompactorContext(conf, table, 
p, sd, tblValidWriteIds, ci, dir);
-        compactorPipeline.execute(compactorContext);
-
-        LOG.info("Completed " + ci.type.toString() + " compaction for " + 
ci.getFullPartitionName() + " in "
-            + compactionTxn + ", marking as compacted.");
-        msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-        compactionTxn.wasSuccessful();
-
-        AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, 
ci.partName, ci.type,
-            dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), 
conf, msc);
-      } catch (Throwable e) {
-        LOG.error("Caught exception while trying to compact " + ci +
-            ". Marking failed to avoid repeated failures", e);
-        final CompactionType ctype = ci.type;
-        markFailed(ci, e.getMessage());
-
-        computeStats = false;
-
-        if (runJobAsSelf(ci.runAs)) {
-          cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);

Review Comment:
   Done



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -499,67 +368,24 @@ protected Boolean findNextCompactionAndExecute(boolean 
collectGenericStats, bool
       }
     }
 
-    if (computeStats) {
-       statsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : 
table.getOwner(),
-              CompactorUtil.getCompactorJobQueueName(conf, ci, table), msc);
-    }
-    return true;
+    return compactionResult;
   }
 
-  /**
-   * Just AcidUtils.getAcidState, but with impersonation if needed.
-   */
-  private AcidDirectory getAcidStateForWorker(CompactionInfo ci, 
StorageDescriptor sd,
-          ValidCompactorWriteIdList tblValidWriteIds) throws IOException, 
InterruptedException {
-    if (runJobAsSelf(ci.runAs)) {
-      return AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
-              tblValidWriteIds, Ref.from(false), true);
-    }
+  protected void computeStats(CompactionInfo ci, Table table, boolean 
computeStats) {
 
-    UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, 
UserGroupInformation.getLoginUser());
-    try {
-      return ugi.doAs((PrivilegedExceptionAction<AcidDirectory>) () ->
-              AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, 
tblValidWriteIds,
-                      Ref.from(false), true));
-    } finally {
-      try {
-        FileSystem.closeAllForUGI(ugi);
-      } catch (IOException exception) {
-        LOG.error("Could not clean up file-system handles for UGI: " + ugi + " 
for " + ci.getFullPartitionName(),
-                exception);
-      }
-    }
-  }
-
-  private void cleanupResultDirs(StorageDescriptor sd, ValidWriteIdList 
writeIds, CompactionType ctype, AcidDirectory dir) {
-    // result directory for compactor to write new files
-    Path resultDir = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, 
conf,
-        ctype == CompactionType.MAJOR, false, false, dir);
-    LOG.info("Deleting result directories created by the compactor:\n");
-    try {
-      FileSystem fs = resultDir.getFileSystem(conf);
-      LOG.info(resultDir.toString());
-      fs.delete(resultDir, true);
-
-      if (ctype == CompactionType.MINOR) {
-        Path deleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(sd, 
writeIds, conf,
-            false, true, false, dir);
-
-        LOG.info(deleteDeltaDir.toString());
-        fs.delete(deleteDeltaDir, true);
-      }
-    } catch (IOException ex) {
-      LOG.error("Caught exception while cleaning result directories:", ex);
+    if (computeStats) {
+      statsUpdater.gatherStats(ci, conf, runJobAsSelf(ci.runAs) ? ci.runAs : 
table.getOwner(),
+          CompactorUtil.getCompactorJobQueueName(conf, ci, table), msc);
     }
   }
 
-  private void failCompactionIfSetForTest() {
+  protected void failCompactionIfSetForTest() {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && 
conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new 
RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
   }
 
-  private void markFailed(CompactionInfo ci, String errorMessage) {
+  protected void markFailed(CompactionInfo ci, String errorMessage) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to