deniskuzZ commented on code in PR #4855:
URL: https://github.com/apache/hive/pull/4855#discussion_r1414166953
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -337,151 +341,16 @@ protected Boolean findNextCompactionAndExecute(boolean
collectGenericStats, bool
checkInterrupt();
- String fullTableName = TxnUtils.getFullTableName(table.getDbName(),
table.getTableName());
-
-
- // Find the partition we will be working with, if there is one.
- Partition p;
- try {
- p = resolvePartition(ci);
- if (p == null && ci.partName != null) {
- ci.errorMessage = "Unable to find partition " +
ci.getFullPartitionName() + ", assuming it was dropped and moving on.";
- LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- return false;
- }
- } catch (Exception e) {
- LOG.error("Unexpected error during resolving partition.", e);
- ci.errorMessage = e.getMessage();
- msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
- return false;
- }
-
- checkInterrupt();
-
- // Find the appropriate storage descriptor
- final StorageDescriptor sd = resolveStorageDescriptor(table, p);
+ CompactionExecutor compactionExecutor;
- // Check that the table or partition isn't sorted, as we don't yet
support that.
- if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
- ci.errorMessage = "Attempt to compact sorted table " +
ci.getFullTableName() + ", which is not yet supported!";
- LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- return false;
+ if (MetaStoreUtils.isIcebergTable(table.getParameters())) {
+ compactionExecutor = new IcebergCompactionExecutor(this, ci,
compactionTxn, table, collectGenericStats, collectMrStats);
}
-
- if (ci.runAs == null) {
- ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+ else {
+ compactionExecutor = new ACIDCompactionExecutor(this, ci,
compactionTxn, table, collectGenericStats, collectMrStats);
}
- checkInterrupt();
-
- /**
- * we cannot have Worker use HiveTxnManager (which is on ThreadLocal)
since
- * then the Driver would already have the an open txn but then this txn
would have
- * multiple statements in it (for query based compactor) which is not
supported (and since
- * this case some of the statements are DDL, even in the future will not
be allowed in a
- * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList,
long)} */
- compactionTxn.open(ci);
-
- ValidTxnList validTxnList = msc.getValidTxns(compactionTxn.getTxnId());
- //with this ValidWriteIdList is capped at whatever HWM validTxnList has
- final ValidCompactorWriteIdList tblValidWriteIds =
- TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
- Collections.singletonList(fullTableName),
validTxnList.writeToString()).get(0));
- LOG.debug("ValidCompactWriteIdList: " +
tblValidWriteIds.writeToString());
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-
- ci.highestWriteId = tblValidWriteIds.getHighWatermark();
- //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we
keep metadata about
- //it until after any data written by it are physically removed
- msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci),
compactionTxn.getTxnId());
-
- checkInterrupt();
-
- // Don't start compaction or cleaning if not necessary
- if (isDynPartAbort(table, ci)) {
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- compactionTxn.wasSuccessful();
- return false;
- }
- AcidDirectory dir = getAcidStateForWorker(ci, sd, tblValidWriteIds);
- if (!isEnoughToCompact(ci, dir, sd)) {
- if (needsCleaning(dir, sd)) {
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- } else {
- // do nothing
- ci.errorMessage = "None of the compaction thresholds met, compaction
request is refused!";
- LOG.debug(ci.errorMessage + " Compaction info: {}", ci);
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- }
- compactionTxn.wasSuccessful();
- return false;
- }
- if (!ci.isMajorCompaction() &&
!isMinorCompactionSupported(table.getParameters(), dir)) {
- ci.errorMessage = "Query based Minor compaction is not possible for
full acid tables having raw format " +
- "(non-acid) data in them.";
- LOG.error(ci.errorMessage + " Compaction info: {}", ci);
- try {
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- } catch (Throwable tr) {
- LOG.error("Caught an exception while trying to mark compaction {} as
failed: {}", ci, tr);
- }
- return false;
- }
- checkInterrupt();
-
- try {
- failCompactionIfSetForTest();
-
- /*
- First try to run compaction via HiveQL queries.
- Compaction for MM tables happens here, or run compaction for Crud
tables if query-based compaction is enabled.
- todo Find a more generic approach to collecting files in the same
logical bucket to compact within the same
- task (currently we're using Tez split grouping).
- */
- CompactorPipeline compactorPipeline =
compactorFactory.getCompactorPipeline(table, conf, ci, msc);
- computeStats = (compactorPipeline.isMRCompaction() && collectMrStats)
|| collectGenericStats;
-
- LOG.info("Starting " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName() + ", id:" +
- ci.id + " in " + compactionTxn + " with compute stats set to "
+ computeStats);
-
- CompactorContext compactorContext = new CompactorContext(conf, table,
p, sd, tblValidWriteIds, ci, dir);
- compactorPipeline.execute(compactorContext);
-
- LOG.info("Completed " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName() + " in "
- + compactionTxn + ", marking as compacted.");
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- compactionTxn.wasSuccessful();
-
- AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName,
ci.partName, ci.type,
- dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(),
conf, msc);
- } catch (Throwable e) {
- LOG.error("Caught exception while trying to compact " + ci +
- ". Marking failed to avoid repeated failures", e);
- final CompactionType ctype = ci.type;
- markFailed(ci, e.getMessage());
-
- computeStats = false;
-
- if (runJobAsSelf(ci.runAs)) {
- cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);
Review Comment:
This should stay: keep the `cleanupResultDirs(...)` call in the failure path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]