[
https://issues.apache.org/jira/browse/HIVE-27019?focusedWorklogId=845133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845133
]
ASF GitHub Bot logged work on HIVE-27019:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Feb/23 13:27
Start Date: 13/Feb/23 13:27
Worklog Time Spent: 10m
Work Description: SourabhBadhya commented on code in PR #4032:
URL: https://github.com/apache/hive/pull/4032#discussion_r1104469718
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -212,323 +140,8 @@ public void run() {
}
}
- private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
- LOG.info("Starting cleaning for " + ci);
- PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
- String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" + (ci.type != null ? ci.type.toString().toLowerCase() : null);
- try {
- if (metricsEnabled) {
- perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
- }
- final String location = ci.getProperty("location");
-
- Callable<Boolean> cleanUpTask;
- Table t = null;
- Partition p = null;
-
- if (location == null) {
- t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
- if (t == null) {
- // The table was dropped before we got around to cleaning it.
- LOG.info("Unable to find table " + ci.getFullTableName() + ",
assuming it was dropped." +
- idWatermark(ci));
- txnHandler.markCleaned(ci);
- return;
- }
- if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
- // The table was marked no clean up true.
- LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as
NO_CLEANUP set to true");
- txnHandler.markCleaned(ci);
- return;
- }
- if (ci.partName != null) {
- p = resolvePartition(ci);
- if (p == null) {
- // The partition was dropped before we got around to cleaning it.
- LOG.info("Unable to find partition " + ci.getFullPartitionName() +
- ", assuming it was dropped." + idWatermark(ci));
- txnHandler.markCleaned(ci);
- return;
- }
- if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
- // The partition was marked no clean up true.
- LOG.info("Skipping partition " + ci.getFullPartitionName() + "
clean up, as NO_CLEANUP set to true");
- txnHandler.markCleaned(ci);
- return;
- }
- }
- }
- txnHandler.markCleanerStart(ci);
-
- if (t != null || ci.partName != null) {
- String path = location == null
- ? resolveStorageDescriptor(t, p).getLocation()
- : location;
- boolean dropPartition = ci.partName != null && p == null;
- cleanUpTask = () -> removeFiles(path, minOpenTxnGLB, ci, dropPartition);
- } else {
- cleanUpTask = () -> removeFiles(location, ci);
- }
-
- Ref<Boolean> removedFiles = Ref.from(false);
- if (runJobAsSelf(ci.runAs)) {
- removedFiles.value = cleanUpTask.call();
- } else {
- LOG.info("Cleaning as user " + ci.runAs + " for " +
ci.getFullPartitionName());
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
- UserGroupInformation.getLoginUser());
- try {
- ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
- removedFiles.value = cleanUpTask.call();
- return null;
- });
- } finally {
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi
+ " for " +
- ci.getFullPartitionName() + idWatermark(ci), exception);
- }
- }
- }
- if (removedFiles.value || isDynPartAbort(t, ci)) {
- txnHandler.markCleaned(ci);
- } else {
- txnHandler.clearCleanerStart(ci);
- LOG.warn("No files were removed. Leaving queue entry " + ci + " in
ready for cleaning state.");
- }
- } catch (Exception e) {
- LOG.error("Caught exception when cleaning, unable to complete cleaning
of " + ci + " " +
- StringUtils.stringifyException(e));
- ci.errorMessage = e.getMessage();
- if (metricsEnabled) {
- Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
- }
- handleCleanerAttemptFailure(ci);
- } finally {
- if (metricsEnabled) {
- perfLogger.perfLogEnd(CLASS_NAME, cleanerMetric);
- }
- }
- }
-
- private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
- long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
- int cleanAttempts = 0;
- if (ci.retryRetention > 0) {
- cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
- }
- if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
- //Mark it as failed if the max attempt threshold is reached.
- txnHandler.markFailed(ci);
- } else {
- //Calculate retry retention time and update record.
- ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
- txnHandler.setCleanerRetryRetentionTimeOnError(ci);
- }
- }
-
- private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
- throws NoSuchTxnException, MetaException {
- List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
- GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
- request.setValidTxnList(validTxnList.writeToString());
- GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
- // we could have no write IDs for a table if it was never written to but
- // since we are in the Cleaner phase of compactions, there must have
- // been some delta/base dirs
- assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
- ValidReaderWriteIdList validWriteIdList =
- TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
- /*
- * We need to filter the obsolete dirs list, to only remove directories that were made obsolete by this compaction.
- * If we have a higher retentionTime it is possible for a second compaction to run on the same partition. Cleaning up the first compaction
- * should not touch the newer obsolete directories, so as not to violate the retentionTime for those.
- */
- if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
- validWriteIdList = validWriteIdList.updateHighWatermark(ci.highestWriteId);
- }
- return validWriteIdList;
- }
-
- private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
- return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> pk.size() > 0).isPresent()
- && ci.partName == null;
- }
-
- private static String idWatermark(CompactionInfo ci) {
- return " id=" + ci.id;
- }
-
- private boolean removeFiles(String location, long minOpenTxnGLB, CompactionInfo ci, boolean dropPartition)
- throws Exception {
-
- if (dropPartition) {
- LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
- LockResponse res = null;
-
- try {
- res = txnHandler.lock(lockRequest);
- if (res.getState() == LockState.ACQUIRED) {
- //check if partition wasn't re-created
- if (resolvePartition(ci) == null) {
- return removeFiles(location, ci);
- }
- }
- } catch (NoSuchTxnException | TxnAbortedException e) {
- LOG.error(e.getMessage());
- } finally {
- if (res != null) {
- try {
- txnHandler.unlock(new UnlockRequest(res.getLockid()));
- } catch (NoSuchLockException | TxnOpenException e) {
- LOG.error(e.getMessage());
- }
- }
- }
- }
-
- ValidTxnList validTxnList =
- TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
- //save it so that getAcidState() sees it
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
- /**
- * {@code validTxnList} is capped by minOpenTxnGLB so if
- * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
- * produced by a compactor, that means every reader that could be active right now sees it
- * as well. That means if this base/delta shadows some earlier base/delta, then it will be
- * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
- *
- *
- * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
- * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
- * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
- * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
- * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
- * writeId:17. Compactor will produce base_17_c160.
- * Suppose txnid:149 writes delta_18_18
- * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
- * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
- * not visible based on 'validTxnList' capped at minOpenTxn so it will not be cleaned by
- * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
- * metadata that says that 18 is aborted.
- * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
- * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
- * it (since it has nothing but aborted data). But we can't tell which actually happened
- * in markCleaned() so make sure it doesn't delete meta above CQ_HIGHEST_WRITE_ID.
- *
- * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
- * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
- * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be
- * useful if there is all of a sudden a flood of aborted txns. (For another day).
- */
-
- // Creating 'reader' list since we are interested in the set of 'obsolete' files
- ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
- LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
- return removeFiles(location, validWriteIdList, ci);
- }
- /**
- * @return true if any files were removed
- */
- private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
- throws Exception {
- Path path = new Path(location);
- FileSystem fs = path.getFileSystem(conf);
-
- // Collect all of the files/dirs
- Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
- AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false,
- dirSnapshots);
- Table table = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
- boolean isDynPartAbort = isDynPartAbort(table, ci);
-
- List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
- if (isDynPartAbort || dir.hasUncompactedAborts()) {
- ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
- }
- List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
- if (dir.getObsolete().size() > 0) {
- AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
- txnHandler);
- }
- // Make sure there are no leftovers below the compacted watermark
- conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
- dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
- ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
- Ref.from(false), false, dirSnapshots);
-
- List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), deleted);
- if (!remained.isEmpty()) {
- LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
- " obsolete directories from " + location + ". " +
getDebugInfo(remained));
- return false;
- }
- LOG.debug(idWatermark(ci) + " All cleared below the watermark: " +
ci.highestWriteId + " from " + location);
- return true;
- }
-
- private List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
- List<Path> obsoleteDirs = dir.getObsolete();
- /**
- * add anything in 'dir' that only has data from aborted transactions - no one should be
- * trying to read anything in that dir (except getAcidState() that only reads the name of
- * this dir itself)
- * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there
- * are no active txns when cleaner runs). The key is to not delete metadata about aborted
- * txns with write IDs > {@link CompactionInfo#highestWriteId}.
- * See {@link TxnStore#markCleaned(CompactionInfo)}
- */
- obsoleteDirs.addAll(dir.getAbortedDirectories());
- if (isDynPartAbort) {
- // In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
- // Including obsolete directories for partitioned tables can result in data loss.
- obsoleteDirs = dir.getAbortedDirectories();
- }
- return obsoleteDirs;
- }
-
- private boolean removeFiles(String location, CompactionInfo ci) throws IOException, MetaException {
- String strIfPurge = ci.getProperty("ifPurge");
- boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
- Path path = new Path(location);
- return !remove(location, ci, Collections.singletonList(path), ifPurge,
- path.getFileSystem(conf)).isEmpty();
- }
-
- private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
- throws MetaException, IOException {
- List<Path> deleted = new ArrayList<>();
- if (paths.size() < 1) {
- return deleted;
- }
- LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
- " obsolete directories from " + location + ". " + getDebugInfo(paths));
- boolean needCmRecycle;
- try {
- Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
- needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
- } catch (NoSuchObjectException ex) {
- // can not drop a database which is a source of replication
- needCmRecycle = false;
- }
- for (Path dead : paths) {
- LOG.debug("Going to delete path " + dead.toString());
- if (needCmRecycle) {
- replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
- }
- if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
- deleted.add(dead);
- }
- }
- return deleted;
- }
-
- private String getDebugInfo(List<Path> paths) {
- return "[" +
paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
+ public void setHandlers(List<Handler> handlers) {
Review Comment:
Added default constructors & a constructor which takes in Handlers. Done.
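For context, a rough sketch of the constructor/setter shape being described. This is a hypothetical illustration only: the placeholder Handler interface below stands in for the handler abstraction discussed in this PR, and the actual signatures in Cleaner.java may differ.

    import java.util.ArrayList;
    import java.util.List;

    // Placeholder for the handler abstraction discussed in this PR (assumed, not the real type).
    interface Handler { }

    public class Cleaner {
      private List<Handler> handlers = new ArrayList<>();

      // Default constructor keeps the existing no-arg instantiation path working.
      public Cleaner() {
      }

      // Convenience constructor that takes in handlers directly.
      public Cleaner(List<Handler> handlers) {
        this.handlers = new ArrayList<>(handlers);
      }

      // Setter for wiring handlers up after construction (e.g. from tests or init code).
      public void setHandlers(List<Handler> handlers) {
        this.handlers = new ArrayList<>(handlers);
      }
    }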
Issue Time Tracking
-------------------
Worklog Id: (was: 845133)
Time Spent: 2h 10m (was: 2h)
> Split Cleaner into separate manageable modular entities
> -------------------------------------------------------
>
> Key: HIVE-27019
> URL: https://issues.apache.org/jira/browse/HIVE-27019
> Project: Hive
> Issue Type: Sub-task
> Reporter: Sourabh Badhya
> Assignee: Sourabh Badhya
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> As described by the parent task -
> Cleaner can be divided into separate entities like -
> *1) Handler* - This entity fetches data from the relevant metastore DB tables
> and converts it into a request entity called CleaningRequest. It would also do
> SQL operations post cleanup (postprocess). Every type of cleaning request is
> provided by a separate handler.
> *2) Filesystem remover* - This entity fetches cleaning requests from the
> various handlers and deletes the corresponding files/directories as described
> by each cleaning request.
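A minimal sketch of how that split could look is shown below. All names here (CleaningRequest, Handler, FSRemover, and their methods) are illustrative assumptions based on the description above, not the exact API introduced by the PR.

    import java.util.List;

    // A cleaning request describes what to delete; handlers build it from metastore state.
    class CleaningRequest {
      final String location;            // directory whose obsolete content should be removed
      final List<String> obsoleteDirs;  // concrete paths to delete

      CleaningRequest(String location, List<String> obsoleteDirs) {
        this.location = location;
        this.obsoleteDirs = obsoleteDirs;
      }
    }

    // One handler per type of cleaning work; it reads metastore tables to build requests
    // and runs the follow-up SQL (e.g. markCleaned/markFailed) once cleaning finishes.
    interface Handler {
      List<CleaningRequest> findReadyToClean() throws Exception;
      void afterClean(CleaningRequest request, boolean success) throws Exception;
    }

    // The filesystem remover only deletes what a request describes; it knows nothing
    // about compaction queues or transaction metadata.
    interface FSRemover {
      void clean(CleaningRequest request) throws Exception;
    }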
--
This message was sent by Atlassian Jira
(v8.20.10#820010)