guanziyue commented on code in PR #4913:
URL: https://github.com/apache/hudi/pull/4913#discussion_r1205793997


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java:
##########
@@ -87,18 +88,58 @@ public void tearDown() throws Exception {
 
   @Test
   public void testMarkerBasedRollbackAppend() throws Exception {
+    tearDown();
+    tableType = HoodieTableType.MERGE_ON_READ;
+    setUp();
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String f0 = testTable.addRequestedCommit("000")
         .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
     testTable.forCommit("001")
-        .withMarkerFile("partA", f0, IOType.APPEND);
+        .withLogMarkerFile("partA", f0, IOType.CREATE);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
     List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
         "002").getRollbackRequests(new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, 
"001"));
     assertEquals(1, rollbackRequests.size());
   }
 
+  @Test
+  public void testMarkerBasedRollbackAppendWithLogFileMarkers() throws 
Exception {
+    tearDown();
+    tableType = HoodieTableType.MERGE_ON_READ;
+    setUp();
+    // test append IO TYPE
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String f0 = testTable.addRequestedCommit("000")
+        .getFileIdWithLogFile("partA");
+    testTable.forCommit("001")
+        .withLogMarkerFile("partA", f0, IOType.APPEND);
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
+    List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+        "002").getRollbackRequests(new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, 
"001"));
+    assertEquals(1, rollbackRequests.size());
+    HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
+    assertEquals("partA", rollbackRequest.getPartitionPath());
+    assertEquals(f0, rollbackRequest.getFileId());
+    assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+    assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+
+    // test create IO TYPE

Review Comment:
   Fixed in new commit



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -752,6 +755,56 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> 
twoUpsertCommitDataWithTwoP
     return Pair.of(records, records2);
   }
 
+  /**
+   * Since how markers are generated for log file changed in Version Six, we 
regenerate markers in the way version zero do.
+   *
+   * @param table instance of {@link HoodieTable}
+   */
+  private void prepForUpgradeFromZeroToOne(HoodieTable table) throws 
IOException {
+    List<HoodieInstant> instantsToBeParsed  =
+        metaClient.getActiveTimeline()
+        .getCommitsTimeline()
+        .getInstantsAsStream()
+        .collect(Collectors.toList());
+    for (HoodieInstant instant : instantsToBeParsed) {
+      WriteMarkers writeMarkers =
+          WriteMarkersFactory.get(table.getConfig().getMarkersType(), table, 
instant.getTimestamp());
+      Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
+      boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker -> 
marker.contains(IOType.APPEND.name())
+          || marker.contains(IOType.CREATE.name()));
+      if (hasAppendMarker) {
+        // delete all markers and regenerate
+        writeMarkers.deleteMarkerDir(table.getContext(), 2);
+        for (String oldMarker : oldMarkers) {
+          String typeStr = oldMarker.substring(oldMarker.lastIndexOf(".") + 1);
+          IOType type = IOType.valueOf(typeStr);
+          String partitionFilePath = WriteMarkers.stripMarkerSuffix(oldMarker);
+          Path fullFilePath = new Path(basePath, partitionFilePath);
+          String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
+          if (FSUtils.isBaseFile(fullFilePath)) {
+            writeMarkers.create(partitionPath, fullFilePath.getName(), type);
+          } else {
+            String fileId = FSUtils.getFileIdFromFilePath(fullFilePath);
+            String baseInstant = 
FSUtils.getBaseCommitTimeFromLogPath(fullFilePath);
+            String writeToken = FSUtils.getWriteTokenFromLogPath(fullFilePath);
+            writeMarkers.create(partitionPath,
+                FSUtils.makeBaseFileName(baseInstant, writeToken, fileId, 
table.getBaseFileFormat().getFileExtension()), type);
+          }
+        }
+        writeMarkers.allMarkerFilePaths()
+            .forEach(markerPath -> 
assertFalse(markerPath.contains(HoodieLogFile.DELTA_EXTENSION)));
+      }
+    }
+  }
+
+  private void prepForDowngradeFromOneToZero() throws IOException {

Review Comment:
   removed in new commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to