nbalajee commented on code in PR #18279:
URL: https://github.com/apache/hudi/pull/18279#discussion_r3048090602


##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackHelper.java:
##########
@@ -498,5 +450,369 @@ private void 
validateStateAfterRollback(List<SerializableHoodieRollbackRequest>
       }
     });
   }
+
+  @Test
+  void testPreComputeLogVersionsListsOncePerPartition() throws Exception {
+    when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    String baseInstant = "001";
+
+    // fileId-001 in partition1 with three log file versions
+    createLogFilesToRollback(partition1, "fileId-001", baseInstant, 
IntStream.rangeClosed(1, 3), 10L);
+    // fileId-002 in partition1 with one log file version
+    createLogFilesToRollback(partition1, "fileId-002", baseInstant, 
IntStream.of(1), 10L);
+    // fileId-003 in partition2 with two log file versions
+    createLogFilesToRollback(partition2, "fileId-003", baseInstant, 
IntStream.rangeClosed(1, 2), 10L);
+
+    Map<String, Long> dummyLogBlocks = new HashMap<>();
+    dummyLogBlocks.put("dummyLogPath", 100L);
+
+    List<SerializableHoodieRollbackRequest> requests = new ArrayList<>();
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            
.setPartitionPath(partition1).setFileId("fileId-001").setLatestBaseInstant(baseInstant)
+            
.setFilesToBeDeleted(Collections.emptyList()).setLogBlocksToBeDeleted(dummyLogBlocks).build()));
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            
.setPartitionPath(partition1).setFileId("fileId-002").setLatestBaseInstant(baseInstant)
+            
.setFilesToBeDeleted(Collections.emptyList()).setLogBlocksToBeDeleted(dummyLogBlocks).build()));
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            
.setPartitionPath(partition2).setFileId("fileId-003").setLatestBaseInstant(baseInstant)
+            
.setFilesToBeDeleted(Collections.emptyList()).setLogBlocksToBeDeleted(dummyLogBlocks).build()));
+
+    HoodieStorage spiedStorage = Mockito.spy(storage);
+    when(metaClient.getStorage()).thenReturn(spiedStorage);
+
+    RollbackHelper helper = new RollbackHelper(table, config);
+    Map<String, Pair<Integer, String>> result = 
helper.preComputeLogVersions(requests);
+
+    // Two listing calls: one per partition (not one per log file)
+    Mockito.verify(spiedStorage, Mockito.times(2))
+        .listDirectEntries(Mockito.any(StoragePath.class), Mockito.any());
+
+    assertEquals(3, result.size());
+    assertEquals(3, (int) 
result.get(RollbackHelper.logVersionLookupKey(partition1, "fileId-001", 
baseInstant)).getLeft());
+    assertEquals(1, (int) 
result.get(RollbackHelper.logVersionLookupKey(partition1, "fileId-002", 
baseInstant)).getLeft());
+    assertEquals(2, (int) 
result.get(RollbackHelper.logVersionLookupKey(partition2, "fileId-003", 
baseInstant)).getLeft());
+  }
+
+  @Test
+  void testPreComputeLogVersionsEmptyWhenNoLogBlockRequests() throws Exception 
{
+    when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+
+    List<SerializableHoodieRollbackRequest> requests = new ArrayList<>();
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            
.setPartitionPath(partition1).setFileId("fileId-001").setLatestBaseInstant("001")
+            
.setFilesToBeDeleted(Collections.singletonList("/some/file/to/delete"))
+            .setLogBlocksToBeDeleted(Collections.emptyMap()).build()));
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            
.setPartitionPath(partition2).setFileId("fileId-002").setLatestBaseInstant("001")
+            .setFilesToBeDeleted(Collections.singletonList("/another/file"))
+            .setLogBlocksToBeDeleted(Collections.emptyMap()).build()));
+
+    HoodieStorage spiedStorage = Mockito.spy(storage);
+    when(metaClient.getStorage()).thenReturn(spiedStorage);
+
+    RollbackHelper helper = new RollbackHelper(table, config);
+    Map<String, Pair<Integer, String>> result = 
helper.preComputeLogVersions(requests);
+
+    assertTrue(result.isEmpty());
+    Mockito.verify(spiedStorage, Mockito.times(0))
+        .listDirectEntries(Mockito.any(StoragePath.class), Mockito.any());
+  }
+
+  @Test
+  void testMaybeDeleteAndCollectStatsDoDeleteFalseForLogBlocks() throws 
IOException {
+    when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    String rollbackInstantTime = "003";
+    String instantToRollback = "002";
+    RollbackHelperV1 rollbackHelper = new RollbackHelperV1(table, config);
+
+    List<SerializableHoodieRollbackRequest> rollbackRequests = new 
ArrayList<>();
+    String baseInstantTimeOfLogFiles = "001";
+    String partition = "partition1";
+    String logFileId = UUID.randomUUID().toString();
+    Map<String, Long> logFilesToRollback = addRollbackRequestForLogFiles(
+        rollbackRequests, HoodieTableVersion.SIX, partition, logFileId,
+        baseInstantTimeOfLogFiles, IntStream.range(1, 5));
+
+    setupMocksAndValidateInitialState(rollbackInstantTime, rollbackRequests);
+    List<Pair<String, HoodieRollbackStat>> rollbackStats = 
rollbackHelper.maybeDeleteAndCollectStats(
+        new HoodieLocalEngineContext(storage.getConf()),
+        rollbackInstantTime,
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
+            HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback),
+        rollbackRequests, false, 5);
+
+    StoragePath partitionStoragePath = new StoragePath(basePath, partition);
+    for (String logFileName : logFilesToRollback.keySet()) {
+      assertTrue(storage.exists(new StoragePath(partitionStoragePath, 
logFileName)));
+    }
+
+    StoragePath rollbackLogPath = new StoragePath(partitionStoragePath,
+        FileCreateUtils.logFileName(baseInstantTimeOfLogFiles, logFileId, 4));
+    List<Pair<String, HoodieRollbackStat>> expected = 
Collections.singletonList(
+        Pair.of(partition,
+            HoodieRollbackStat.newBuilder()
+                .withPartitionPath(partition)
+                .withRollbackBlockAppendResults(Collections.singletonMap(
+                    storage.getPathInfo(rollbackLogPath), 1L))
+                .withLogFilesFromFailedCommit(logFilesToRollback)
+                .build()));
+    assertRollbackStatsEquals(expected, rollbackStats);
+  }
+
+  @Test
+  void testPreComputeLogVersionsSkipsInvalidLogFiles() throws Exception {
+    when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    String partition = "partition1";
+    String baseInstant = "001";
+
+    createLogFilesToRollback(partition, "fileId-001", baseInstant,
+        IntStream.rangeClosed(1, 2), 10L);
+    StoragePath partitionPath = new StoragePath(basePath, partition);
+    storage.create(new StoragePath(partitionPath, 
"invalid-file.log.backup")).close();
+
+    Map<String, Long> dummyLogBlocks = new HashMap<>();
+    dummyLogBlocks.put("dummyLogPath", 100L);
+
+    List<SerializableHoodieRollbackRequest> requests = 
Collections.singletonList(
+        new SerializableHoodieRollbackRequest(
+            HoodieRollbackRequest.newBuilder()
+                .setPartitionPath(partition).setFileId("fileId-001")
+                .setLatestBaseInstant(baseInstant)
+                .setFilesToBeDeleted(Collections.emptyList())
+                .setLogBlocksToBeDeleted(dummyLogBlocks).build()));
+
+    RollbackHelper helper = new RollbackHelper(table, config);
+    Map<String, Pair<Integer, String>> result = 
helper.preComputeLogVersions(requests);
+
+    assertEquals(1, result.size());
+    assertEquals(2,
+        (int) result.get(RollbackHelper.logVersionLookupKey(partition, 
"fileId-001", baseInstant)).getLeft());
+  }
+
+  @Test
+  void testPreComputeLogVersionsFallsBackOnIOException() throws Exception {
+    when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.SIX);
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    String baseInstant = "001";
+
+    createLogFilesToRollback(partition1, "fileId-001", baseInstant,
+        IntStream.rangeClosed(1, 3), 10L);
+    createLogFilesToRollback(partition2, "fileId-002", baseInstant,
+        IntStream.of(1), 10L);
+
+    Map<String, Long> dummyLogBlocks = new HashMap<>();
+    dummyLogBlocks.put("dummyLogPath", 100L);
+
+    List<SerializableHoodieRollbackRequest> requests = new ArrayList<>();
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            .setPartitionPath(partition1).setFileId("fileId-001")
+            .setLatestBaseInstant(baseInstant)
+            .setFilesToBeDeleted(Collections.emptyList())
+            .setLogBlocksToBeDeleted(dummyLogBlocks).build()));
+    requests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            .setPartitionPath(partition2).setFileId("fileId-002")
+            .setLatestBaseInstant(baseInstant)
+            .setFilesToBeDeleted(Collections.emptyList())
+            .setLogBlocksToBeDeleted(dummyLogBlocks).build()));
+
+    HoodieStorage spiedStorage = Mockito.spy(storage);
+    when(metaClient.getStorage()).thenReturn(spiedStorage);
+    StoragePath partition2AbsPath = FSUtils.constructAbsolutePath(basePath, 
partition2);
+    Mockito.doThrow(new IOException("Simulated listing failure"))
+        .when(spiedStorage)
+        .listDirectEntries(Mockito.eq(partition2AbsPath), Mockito.any());
+
+    RollbackHelper helper = new RollbackHelper(table, config);
+    Map<String, Pair<Integer, String>> result = 
helper.preComputeLogVersions(requests);
+
+    assertEquals(1, result.size());
+    assertEquals(3,
+        (int) result.get(RollbackHelper.logVersionLookupKey(partition1, 
"fileId-001", baseInstant)).getLeft());
+    assertFalse(result.containsKey(
+        RollbackHelper.logVersionLookupKey(partition2, "fileId-002", 
baseInstant)));
+  }
+
+  @Test
+  void testV1MaybeDeleteAndCollectStatsWithMultipleRequestsPerFileGroup() 
throws IOException {
+    HoodieTableVersion tableVersion = HoodieTableVersion.SIX;
+    when(tableConfig.getTableVersion()).thenReturn(tableVersion);
+    String rollbackInstantTime = "003";
+    String instantToRollback = "002";
+    RollbackHelperV1 rollbackHelper = new RollbackHelperV1(table, config);
+
+    List<SerializableHoodieRollbackRequest> rollbackRequests = new 
ArrayList<>();
+    String baseInstantTimeOfLogFiles = "001";
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    String baseFileId1 = UUID.randomUUID().toString();
+    String baseFileId2 = UUID.randomUUID().toString();
+    String baseFileId3 = UUID.randomUUID().toString();
+    String logFileId1 = UUID.randomUUID().toString();
+    String logFileId2 = UUID.randomUUID().toString();
+    StoragePath baseFilePath1 = 
addRollbackRequestForBaseFile(rollbackRequests, partition1, baseFileId1, 
instantToRollback);
+    StoragePath baseFilePath2 = 
addRollbackRequestForBaseFile(rollbackRequests, partition2, baseFileId2, 
instantToRollback);
+    StoragePath baseFilePath3 = 
addRollbackRequestForBaseFile(rollbackRequests, partition2, baseFileId3, 
instantToRollback);
+    Map<String, Long> logFilesToRollback1 = addRollbackRequestForLogFiles(
+        rollbackRequests, tableVersion, partition2, logFileId1, 
baseInstantTimeOfLogFiles, IntStream.of(1));
+    Map<String, Long> logFilesToRollback2 = IntStream.range(1, 
ROLLBACK_LOG_VERSION).boxed()
+        .flatMap(version -> addRollbackRequestForLogFiles(
+            rollbackRequests, tableVersion, partition2, logFileId2, 
baseInstantTimeOfLogFiles, IntStream.of(version))
+            .entrySet().stream())
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    rollbackRequests.add(new SerializableHoodieRollbackRequest(
+        HoodieRollbackRequest.newBuilder()
+            .setPartitionPath(partition2)
+            .setFileId(baseFileId3)
+            .setLatestBaseInstant(instantToRollback)
+            .setFilesToBeDeleted(Collections.emptyList())
+            .setLogBlocksToBeDeleted(Collections.emptyMap()).build()));
+
+    setupMocksAndValidateInitialState(rollbackInstantTime, rollbackRequests);
+    List<Pair<String, HoodieRollbackStat>> rollbackStats = 
rollbackHelper.maybeDeleteAndCollectStats(
+        new HoodieLocalEngineContext(storage.getConf()),
+        rollbackInstantTime,
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback),
+        rollbackRequests, true, 5);
+    validateStateAfterRollback(rollbackRequests);
+
+    StoragePath rollbackLogPath1 = new StoragePath(new StoragePath(basePath, 
partition2),
+        FileCreateUtils.logFileName(baseInstantTimeOfLogFiles, logFileId1, 2));
+    StoragePath rollbackLogPath2 = new StoragePath(new StoragePath(basePath, 
partition2),
+        FileCreateUtils.logFileName(baseInstantTimeOfLogFiles, logFileId2, 
ROLLBACK_LOG_VERSION));
+
+    List<Pair<String, HoodieRollbackStat>> expected = new ArrayList<>();
+    expected.add(Pair.of(partition1,
+        HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partition1)
+            .withDeletedFileResult(baseFilePath1.toString(), true)
+            .build()));
+    expected.add(Pair.of(partition2,
+        HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partition2)
+            .withDeletedFileResult(baseFilePath2.toString(), true)
+            .build()));
+    expected.add(Pair.of(partition2,
+        HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partition2)
+            .withDeletedFileResult(baseFilePath3.toString(), true)
+            .build()));
+    expected.add(Pair.of(partition2,
+        HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partition2)
+            .withRollbackBlockAppendResults(Collections.singletonMap(
+                new StoragePathInfo(rollbackLogPath1, 0L, false, (short) 0, 0, 
0), 1L))
+            .withLogFilesFromFailedCommit(logFilesToRollback1)
+            .build()));
+    expected.add(Pair.of(partition2,
+        HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partition2)
+            .withRollbackBlockAppendResults(Collections.singletonMap(
+                new StoragePathInfo(rollbackLogPath2, 0L, false, (short) 0, 0, 
0), 1L))
+            .withLogFilesFromFailedCommit(logFilesToRollback2)
+            .build()));
+    expected.add(Pair.of(partition2,
+        HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partition2)
+            .build()));
+    assertRollbackStatsEquals(expected, rollbackStats);
+  }
+
+  @Test
+  void testV1MaybeDeleteAndCollectStatsWithSingleRequestPerFileGroup() throws 
IOException {

Review Comment:
   Refactored: extracted the common functionality and reused it in both the V6 
and V8 tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to