deniskuzZ commented on a change in pull request #2971:
URL: https://github.com/apache/hive/pull/2971#discussion_r793757325
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -434,8 +437,18 @@ private boolean removeFiles(String location,
ValidWriteIdList writeIdList, Compa
return success;
}
- private boolean hasDataBelowWatermark(FileSystem fs, Path path, long
highWatermark) throws IOException {
- FileStatus[] children = fs.listStatus(path);
+ private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs,
Path path, long highWatermark)
+ throws IOException {
+ Set<Path> acidPaths = new HashSet<>();
+ for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
+ acidPaths.add(delta.getPath());
+ }
+ if (acidDir.getBaseDirectory() != null) {
+ acidPaths.add(acidDir.getBaseDirectory());
+ }
+ FileStatus[] children = fs.listStatus(path, p -> {
+ return !acidPaths.contains(p);
+ });
for (FileStatus child : children) {
if (isFileBelowWatermark(child, highWatermark)) {
Review comment:
@kgyrtkirk, could you please try the following test case? To simulate the
legacy behavior where CQ_NEXT_TXN_ID is NULL, comment out the call to
updateWSCommitIdAndCleanUpMetadata in CompactionTxnHandler before running it:
````
String dbName = "default";
String tblName = "trfcp";
String partName = "ds=today";
Table t = newTable(dbName, tblName, true);
Partition p = newPartition(t, "today");
// block cleaner with an open txn
long blockingTxn = openTxn();
// minor compaction
addBaseFile(t, p, 20L, 20);
addDeltaFile(t, p, 21L, 21L, 1);
addDeltaFile(t, p, 22L, 22L, 1);
burnThroughTransactions(dbName, tblName, 22);
CompactionRequest rqst = new CompactionRequest(dbName, tblName,
CompactionType.MINOR);
rqst.setPartitionname(partName);
compactInTxn(rqst);
addDeltaFile(t, p, 21, 22, 2);
startCleaner();
// make sure cleaner didn't remove anything, and cleaning is still queued
List<Path> paths = getDirectories(conf, t, p);
Assert.assertEquals("Expected 4 files after minor compaction, instead
these files were present " + paths,
4, paths.size());
ShowCompactResponse rsp = txnHandler.showCompact(new
ShowCompactRequest());
Assert.assertEquals("Expected 1 compaction in queue, got: " +
rsp.getCompacts(), 1, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.CLEANING_RESPONSE,
rsp.getCompacts().get(0).getState());
Assert.assertEquals(CompactionType.MINOR,
rsp.getCompacts().get(0).getType());
````
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]