yihua commented on code in PR #14031:
URL: https://github.com/apache/hudi/pull/14031#discussion_r2402702145
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java:
##########
@@ -2506,6 +2506,184 @@ private HoodieWriteStat getHoodieWriteStat(String
partitionPath, String fileId,
return writeStat;
}
+ @Test
+ public void testGetLatestMergedFileSlicesBeforeOrOnIncludingInflight()
throws IOException {
+ String partitionPath = "2023/01/01";
+ String fileId = UUID.randomUUID().toString();
+ String instantTime1 = "001";
+ String instantTime2 = "002";
+ String compactionInstant = "003";
+ String instantTime4 = "004";
+ String instantTime5 = "005";
+
+ new File(basePath + "/" + partitionPath).mkdirs();
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+
+ // Create base file at instant1
+ String baseFileName = FSUtils.makeBaseFileName(instantTime1,
TEST_WRITE_TOKEN, fileId, BASE_FILE_EXTENSION);
+ new File(basePath + "/" + partitionPath + "/" +
baseFileName).createNewFile();
+ HoodieInstant instant1 =
INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime1);
+ saveAsComplete(commitTimeline, instant1, new HoodieCommitMetadata());
+
+ // Add log files at instant2
+ HoodieInstant instant2 =
INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, instantTime2);
+ String logFileName2 = FSUtils.makeLogFileName(fileId,
HoodieLogFile.DELTA_EXTENSION, instantTime2, 1, TEST_WRITE_TOKEN);
+ new File(basePath + "/" + partitionPath + "/" +
logFileName2).createNewFile();
+ saveAsComplete(commitTimeline, instant2, new HoodieCommitMetadata());
+
+ refreshFsView();
+
+ List<FileSlice> actual =
fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, instantTime2)
+ .collect(Collectors.toList());
+ assertEquals(1, actual.size());
+ FileSlice fileSlice = actual.get(0);
+ assertTrue(fileSlice.getBaseFile().isPresent());
+ assertEquals(baseFileName, fileSlice.getBaseFile().get().getFileName());
+ assertEquals(1, fileSlice.getLogFiles().count());
+ assertEquals(logFileName2,
fileSlice.getLogFiles().collect(Collectors.toList()).get(0).getFileName());
+
+ actual =
fsView.getLatestMergedFileSlicesBeforeOrOnIncludingInflight(partitionPath,
instantTime2, instantTime2)
+ .collect(Collectors.toList());
+ assertEquals(1, actual.size());
+ fileSlice = actual.get(0);
+ assertTrue(fileSlice.getBaseFile().isPresent());
+ assertEquals(baseFileName, fileSlice.getBaseFile().get().getFileName());
+ assertEquals(1, fileSlice.getLogFiles().count());
+ assertEquals(logFileName2,
fileSlice.getLogFiles().collect(Collectors.toList()).get(0).getFileName());
+
+ // Schedule compaction at instant3
+ List<FileSlice> fileSlices =
rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
+ partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0)));
+ HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(
+ partitionFileSlicesPairs, Option.empty(), Option.empty());
+ HoodieInstant compactionRequestedInstant =
INSTANT_GENERATOR.createNewInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, compactionInstant);
+ commitTimeline.createNewInstant(compactionRequestedInstant);
+ commitTimeline.saveToCompactionRequested(compactionRequestedInstant,
compactionPlan);
+ commitTimeline.transitionRequestedToInflight(compactionRequestedInstant,
Option.empty());
+ String baseFileName2 = FSUtils.makeBaseFileName(compactionInstant,
TEST_WRITE_TOKEN, fileId, BASE_FILE_EXTENSION);
+ new File(basePath + "/" + partitionPath + "/" +
baseFileName2).createNewFile();
+
+ // Add log files at instant4
+ HoodieInstant instant4 =
INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, instantTime4);
+ String logFileName4 = FSUtils.makeLogFileName(fileId,
HoodieLogFile.DELTA_EXTENSION, instantTime4, 1, TEST_WRITE_TOKEN);
+ new File(basePath + "/" + partitionPath + "/" +
logFileName4).createNewFile();
+ saveAsComplete(commitTimeline, instant4, new HoodieCommitMetadata());
+
+ // Add log files at instant5
+ String logFileName5 = FSUtils.makeLogFileName(fileId,
HoodieLogFile.DELTA_EXTENSION, instantTime5, 1, TEST_WRITE_TOKEN);
+ new File(basePath + "/" + partitionPath + "/" +
logFileName5).createNewFile();
+
commitTimeline.createNewInstant(INSTANT_GENERATOR.createNewInstant(State.REQUESTED,
HoodieTimeline.DELTA_COMMIT_ACTION, instantTime5));
+
commitTimeline.createNewInstant(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, instantTime5));
+
+ refreshFsView();
+ actual = fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath,
instantTime5)
+ .collect(Collectors.toList());
+ assertEquals(1, actual.size());
+ fileSlice = actual.get(0);
+ assertTrue(fileSlice.getBaseFile().isPresent());
+ assertEquals(baseFileName, fileSlice.getBaseFile().get().getFileName());
+ assertEquals(2, fileSlice.getLogFiles().count());
+ assertEquals(
+ Arrays.asList(logFileName2, logFileName4),
+
fileSlice.getLogFiles().map(HoodieLogFile::getFileName).sorted().collect(Collectors.toList()));
+
+ actual =
fsView.getLatestMergedFileSlicesBeforeOrOnIncludingInflight(partitionPath,
instantTime5, instantTime5)
+ .collect(Collectors.toList());
+ assertEquals(1, actual.size());
+ fileSlice = actual.get(0);
+ assertTrue(fileSlice.getBaseFile().isPresent());
+ assertEquals(baseFileName, fileSlice.getBaseFile().get().getFileName());
+ assertEquals(3, fileSlice.getLogFiles().count());
+ assertEquals(
+ Arrays.asList(logFileName2, logFileName4, logFileName5),
+
fileSlice.getLogFiles().map(HoodieLogFile::getFileName).sorted().collect(Collectors.toList()));
+
+ actual =
fsView.getLatestMergedFileSlicesBeforeOrOnIncludingInflight(partitionPath,
instantTime5, compactionInstant)
+ .collect(Collectors.toList());
+ assertEquals(1, actual.size());
+ fileSlice = actual.get(0);
+ assertTrue(fileSlice.getBaseFile().isPresent());
+ assertEquals(baseFileName2, fileSlice.getBaseFile().get().getFileName());
+ assertEquals(2, fileSlice.getLogFiles().count());
+ assertEquals(
+ Arrays.asList(logFileName4, logFileName5),
+
fileSlice.getLogFiles().map(HoodieLogFile::getFileName).sorted().collect(Collectors.toList()));
+
+
commitTimeline.saveAsComplete(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, compactionInstant), Option.empty());
Review Comment:
Fixed, and also removed a failing test that was redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]