deniskuzZ commented on a change in pull request #2971:
URL: https://github.com/apache/hive/pull/2971#discussion_r793757325
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -434,8 +437,18 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     return success;
   }
-  private boolean hasDataBelowWatermark(FileSystem fs, Path path, long highWatermark) throws IOException {
-    FileStatus[] children = fs.listStatus(path);
+  private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs, Path path, long highWatermark)
+      throws IOException {
+    Set<Path> acidPaths = new HashSet<>();
+    for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
+      acidPaths.add(delta.getPath());
+    }
+    if (acidDir.getBaseDirectory() != null) {
+      acidPaths.add(acidDir.getBaseDirectory());
+    }
+    FileStatus[] children = fs.listStatus(path, p -> {
+      return !acidPaths.contains(p);
+    });
     for (FileStatus child : children) {
       if (isFileBelowWatermark(child, highWatermark)) {
Review comment:
@kgyrtkirk, could you please try the following test case? (It simulates the legacy behavior where CQ_NEXT_TXN_ID=NULL; to reproduce that, comment out updateWSCommitIdAndCleanUpMetadata in CompactionTxnHandler.)
````
@Test
public void testReady() throws Exception {
  String dbName = "default";
  String tblName = "trfcp";
  String partName = "ds=today";
  Table t = newTable(dbName, tblName, true);
  Partition p = newPartition(t, "today");
  // minor compaction
  addBaseFile(t, p, 19L, 19);
  addDeltaFile(t, p, 20L, 20L, 1);
  addDeltaFile(t, p, 21L, 21L, 1);
  addDeltaFile(t, p, 22L, 22L, 1);
  burnThroughTransactions(dbName, tblName, 22);
  // block cleaner with an open txn
  long blockingTxn = openTxn();
  CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
  rqst.setPartitionname(partName);
  long ctxnid = compactInTxn(rqst);
  addDeltaFile(t, p, 20, 22, 2, ctxnid);
  startCleaner();
  // make sure cleaner didn't remove anything, and cleaning is still queued
  List<Path> paths = getDirectories(conf, t, p);
  Assert.assertEquals("Expected 5 files after minor compaction, instead these files were present " + paths,
      5, paths.size());
  ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
  Assert.assertEquals("Expected 1 compaction in queue, got: " + rsp.getCompacts(), 1, rsp.getCompactsSize());
  Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
}
````
We cannot simply filter out the current and base dirs whose cleanup is blocked by an open txn; that breaks the idea implemented in HIVE-24314. If there is a blocking open txn, the cleaner should retry until the txn is gone and cleanup can proceed.
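To illustrate the intended back-off behavior, here is a minimal, self-contained sketch of the decision the HIVE-24314 approach makes (hypothetical names, not the actual Cleaner code): cleanup for a compaction is deferred while any open txn at or below the compaction's watermark could still read the obsolete files, instead of silently filtering those dirs out of the listing.
````java
import java.util.List;

// Hypothetical sketch of the HIVE-24314 retry idea; CleanerRetrySketch and
// canClean are illustrative names, not part of the Hive codebase.
public class CleanerRetrySketch {

    /**
     * Returns true only if no open txn at or below the watermark remains.
     * While a blocking open txn exists, the cleanup request should stay
     * queued (state "ready for cleaning") and be retried later.
     */
    static boolean canClean(long highWatermark, List<Long> openTxnIds) {
        return openTxnIds.stream().allMatch(txnId -> txnId > highWatermark);
    }

    public static void main(String[] args) {
        System.out.println(canClean(23L, List.of(23L))); // blocked by open txn 23
        System.out.println(canClean(23L, List.of(24L))); // blocker gone, may clean
    }
}
````
In the test above, the open `blockingTxn` plays exactly this blocking role: the cleaner must leave all 5 files in place and keep the compaction in the CLEANING state until that txn commits or aborts.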
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]