guanziyue commented on code in PR #4913:
URL: https://github.com/apache/hudi/pull/4913#discussion_r1205793997
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java:
##########
@@ -87,18 +88,58 @@ public void tearDown() throws Exception {
@Test
public void testMarkerBasedRollbackAppend() throws Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withMarkerFile("partA", f0, IOType.APPEND);
+ .withLogMarkerFile("partA", f0, IOType.CREATE);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
"002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"001"));
assertEquals(1, rollbackRequests.size());
}
+ @Test
+ public void testMarkerBasedRollbackAppendWithLogFileMarkers() throws
Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
+ // test append IO TYPE
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String f0 = testTable.addRequestedCommit("000")
+ .getFileIdWithLogFile("partA");
+ testTable.forCommit("001")
+ .withLogMarkerFile("partA", f0, IOType.APPEND);
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
+ List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+ "002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
"001"));
+ assertEquals(1, rollbackRequests.size());
+ HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
+ assertEquals("partA", rollbackRequest.getPartitionPath());
+ assertEquals(f0, rollbackRequest.getFileId());
+ assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+ assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+
+ // test create IO TYPE
Review Comment:
Fixed in new commit
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -752,6 +755,56 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoP
return Pair.of(records, records2);
}
+ /**
+ * Since the way markers are generated for log files changed in version six, we
regenerate the markers the way version zero does.
+ *
+ * @param table instance of {@link HoodieTable}
+ */
+ private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
+ List<HoodieInstant> instantsToBeParsed =
+ metaClient.getActiveTimeline()
+ .getCommitsTimeline()
+ .getInstantsAsStream()
+ .collect(Collectors.toList());
+ for (HoodieInstant instant : instantsToBeParsed) {
+ WriteMarkers writeMarkers =
+ WriteMarkersFactory.get(table.getConfig().getMarkersType(), table,
instant.getTimestamp());
+ Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
+ boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker ->
marker.contains(IOType.APPEND.name())
+ || marker.contains(IOType.CREATE.name()));
+ if (hasAppendMarker) {
+ // delete all markers and regenerate
+ writeMarkers.deleteMarkerDir(table.getContext(), 2);
+ for (String oldMarker : oldMarkers) {
+ String typeStr = oldMarker.substring(oldMarker.lastIndexOf(".") + 1);
+ IOType type = IOType.valueOf(typeStr);
+ String partitionFilePath = WriteMarkers.stripMarkerSuffix(oldMarker);
+ Path fullFilePath = new Path(basePath, partitionFilePath);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePath.getParent());
+ if (FSUtils.isBaseFile(fullFilePath)) {
+ writeMarkers.create(partitionPath, fullFilePath.getName(), type);
+ } else {
+ String fileId = FSUtils.getFileIdFromFilePath(fullFilePath);
+ String baseInstant =
FSUtils.getBaseCommitTimeFromLogPath(fullFilePath);
+ String writeToken = FSUtils.getWriteTokenFromLogPath(fullFilePath);
+ writeMarkers.create(partitionPath,
+ FSUtils.makeBaseFileName(baseInstant, writeToken, fileId,
table.getBaseFileFormat().getFileExtension()), type);
+ }
+ }
+ writeMarkers.allMarkerFilePaths()
+ .forEach(markerPath ->
assertFalse(markerPath.contains(HoodieLogFile.DELTA_EXTENSION)));
+ }
+ }
+ }
+
+ private void prepForDowngradeFromOneToZero() throws IOException {
Review Comment:
removed in new commit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org