yihua commented on code in PR #18279:
URL: https://github.com/apache/hudi/pull/18279#discussion_r3048902200


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -168,6 +255,23 @@ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, Str
     return addLogFilesFromPreviousFailedRollbacksToStat(context, mergedRollbackStatByPartitionPath, logPaths);
   }
 
+  /**
+   * Collect all file info that needs to be rolled back, using the V1-specific
+   * 6-param {@code maybeDeleteAndCollectStats} so V6 log-block requests are handled correctly.
+   */
+  @Override
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                       List<HoodieRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats: " + config.getTableName());
+    List<SerializableHoodieRollbackRequest> serializableRequests = rollbackRequests.stream()

Review Comment:
   🤖 This passes `EMPTY_STRING` as the `instantTime` argument of
`maybeDeleteAndCollectStats`, and that value is used to construct `WriteMarkers`
(line ~207). When `doDelete=false` but log block requests exist, the log writer
is still built (creating a log file), so the `LogFileCreationCallback` fires and
creates a marker under a directory named after the empty instant time. The old
parent-class `collectRollbackStats` called the 5-param overload, which had no
marker logic at all. Could you pass `instantToRollback.requestedTime()` here
instead, or skip marker creation when `doDelete=false`?
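
To illustrate the concern, here is a minimal standalone sketch, not Hudi code. It assumes markers live under `<basePath>/.hoodie/.temp/<instantTime>`; the class `MarkerGuardSketch` and both of its methods are hypothetical names introduced only for this example. It shows how an empty instant time collapses the marker directory onto the shared marker root, and what a guard that skips marker creation might look like.

```java
import java.util.Optional;

class MarkerGuardSketch {
  // Assumed marker root relative to the table base path (an assumption for this sketch).
  static final String MARKER_ROOT = ".hoodie/.temp";

  // Builds the marker directory for an instant; an empty instant yields the bare root plus "/".
  static String markerDir(String basePath, String instantTime) {
    return basePath + "/" + MARKER_ROOT + "/" + instantTime;
  }

  // Hypothetical guard: no marker directory when only collecting stats or when the instant is empty.
  static Optional<String> markerDirFor(String basePath, String instantTime, boolean doDelete) {
    if (!doDelete || instantTime.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(markerDir(basePath, instantTime));
  }
}
```

Either option from the comment would avoid the stray directory: passing the real instant time gives the marker a well-defined home, while a guard like the one sketched avoids creating it in the stats-only path.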



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -80,6 +81,92 @@ public RollbackHelperV1(HoodieTable table, HoodieWriteConfig config) {
     super(table, config);
   }
 
+  /**
+   * Builds the lookup key for pre-computed log versions.
+   */
+  static String logVersionLookupKey(String partitionPath, String fileId, String commitTime) {
+    return partitionPath + "|" + fileId + "|" + commitTime;
+  }
+
+  /**
+   * Pre-compute the latest log version for each (partition, fileId, deltaCommitTime) tuple
+   * by listing each unique partition directory once. This replaces N per-request listing
+   * calls (one per rollback request) with P per-partition listings (where P is much less than N).
+   *
+   * <p>For file groups with no existing log files in a successfully listed partition, a sentinel
+   * of (LOGFILE_BASE_VERSION, UNKNOWN_WRITE_TOKEN) is inserted so the caller avoids a redundant
+   * per-request listing. If a partition listing fails (IOException), no sentinels are inserted
+   * and the caller falls back to per-request listing naturally.
+   */
+  Map<String, Pair<Integer, String>> preComputeLogVersions(
+      List<SerializableHoodieRollbackRequest> rollbackRequests) {
+    List<SerializableHoodieRollbackRequest> logBlockRequests = rollbackRequests.stream()
+        .filter(req -> !req.getLogBlocksToBeDeleted().isEmpty())
+        .collect(Collectors.toList());
+
+    if (logBlockRequests.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, Set<String>> expectedKeysByPartition = new HashMap<>();
+    for (SerializableHoodieRollbackRequest req : logBlockRequests) {
+      String key = logVersionLookupKey(req.getPartitionPath(), req.getFileId(), req.getLatestBaseInstant());
+      expectedKeysByPartition.computeIfAbsent(req.getPartitionPath(), k -> new HashSet<>()).add(key);
+    }
+
+    log.info("Pre-computing log versions for {} partition(s) to avoid per-request listStatus calls",
+        expectedKeysByPartition.size());
+
+    Map<String, Pair<Integer, String>> logVersionMap = new HashMap<>();
+
+    for (Map.Entry<String, Set<String>> entry : expectedKeysByPartition.entrySet()) {
+      String relPartPath = entry.getKey();
+      Set<String> expectedKeys = entry.getValue();
+      StoragePath absolutePartPath = FSUtils.constructAbsolutePath(metaClient.getBasePath(), relPartPath);
+      try {
+        List<StoragePathInfo> statuses = metaClient.getStorage().listDirectEntries(absolutePartPath,
+            path -> path.getName().contains(HoodieLogFile.DELTA_EXTENSION));
+
+        for (StoragePathInfo status : statuses) {
+          try {

Review Comment:
   🤖 The listing filter uses `contains(HoodieLogFile.DELTA_EXTENSION)`, which
matches any file with `.log` anywhere in its name (e.g., `.log.crc` checksum
files). The `InvalidHoodiePathException` catch handles most of these, but each
false match adds unnecessary warn-level log noise. Have you considered a
stricter filter that checks the extension pattern (e.g., requiring `.log.`
after the file ID) to reduce false matches?
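
As a rough illustration of what a stricter filter could look like, the standalone sketch below compares the substring check with a regex. The naming shape `.<fileId>_<instant>.log.<version>[_<writeToken>]` and the regex itself are assumptions made for this example, not Hudi's actual pattern; `LogFileNameFilterSketch`, `looseMatch`, and `strictMatch` are hypothetical names.

```java
import java.util.regex.Pattern;

class LogFileNameFilterSketch {
  // Assumed shape: .<fileId>_<instant>.log.<version>, optionally followed by _<a>-<b>-<c>.
  private static final Pattern LOG_FILE_NAME =
      Pattern.compile("^\\..+_.+\\.log\\.\\d+(_\\d+-\\d+-\\d+)?$");

  // Mirrors the substring check from the diff: true for any name containing ".log".
  static boolean looseMatch(String fileName) {
    return fileName.contains(".log");
  }

  // Accepts only names that fully match the assumed log-file shape.
  static boolean strictMatch(String fileName) {
    return LOG_FILE_NAME.matcher(fileName).matches();
  }
}
```

With this shape, a checksum-style name such as `..f1_001.log.1_1-0-1.crc` passes the substring check but fails the regex, so it would never reach the parsing step that logs the warning.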



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -146,14 +151,25 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
                   )
               .withStorage(metaClient.getStorage())
               .withTableVersion(tableVersion)
-              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION);
+
+          String logVersionKey = logVersionLookupKey(rollbackRequest.getPartitionPath(), fileId, rollbackRequest.getLatestBaseInstant());
+          Pair<Integer, String> preComputedVersion = logVersionMap.get(logVersionKey);
+          if (preComputedVersion != null) {
+            writerBuilder.withLogVersion(preComputedVersion.getLeft())
+                .withLogWriteToken(preComputedVersion.getRight());
+          }

Review Comment:
   🤖 The sentinel `(LOGFILE_BASE_VERSION, UNKNOWN_WRITE_TOKEN)` already handles 
this case — it's inserted via `putIfAbsent` at line 360 for any expected key 
not found by the listing. So a missing log file correctly gets 
`UNKNOWN_WRITE_TOKEN`, matching the original `build()` fallback behavior when 
`FSUtils.getLatestLogVersion()` returns empty.
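
The sentinel behavior described above can be sketched in isolation with a plain `HashMap`. This is a simplified stand-in, not the PR's code: `SentinelFallbackSketch`, `fillSentinels`, and the constant values are hypothetical placeholders, and `Map.Entry` stands in for Hudi's `Pair`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class SentinelFallbackSketch {
  // Placeholder values; the real constants live in Hudi and may differ.
  static final int BASE_VERSION = 1;
  static final String UNKNOWN_TOKEN = "unknown-token";

  // Copies the listed versions, then adds a sentinel for every expected key the listing missed.
  static Map<String, Map.Entry<Integer, String>> fillSentinels(
      Map<String, Map.Entry<Integer, String>> listed, Set<String> expectedKeys) {
    Map<String, Map.Entry<Integer, String>> result = new HashMap<>(listed);
    for (String key : expectedKeys) {
      // putIfAbsent leaves versions found by the listing untouched.
      result.putIfAbsent(key, new SimpleImmutableEntry<>(BASE_VERSION, UNKNOWN_TOKEN));
    }
    return result;
  }
}
```

A key that was never expected (for example, from a partition whose listing failed) stays absent, so a lookup returns `null` and the caller falls back to the builder's own per-request listing.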


