yihua commented on code in PR #18279:
URL: https://github.com/apache/hudi/pull/18279#discussion_r3048841555
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -168,6 +255,23 @@ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, Str
     return addLogFilesFromPreviousFailedRollbacksToStat(context, mergedRollbackStatByPartitionPath, logPaths);
   }
+  /**
+   * Collect all file info that needs to be rolled back, using the V1-specific
+   * 6-param {@code maybeDeleteAndCollectStats} so V6 log-block requests are handled correctly.
+   */
+  @Override
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+      List<HoodieRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats: " + config.getTableName());
+    List<SerializableHoodieRollbackRequest> serializableRequests = rollbackRequests.stream()
+        .map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
+    return context.reduceByKey(
+        maybeDeleteAndCollectStats(context, EMPTY_STRING, instantToRollback,
+            serializableRequests, false, parallelism),
+        RollbackUtils::mergeRollbackStat, parallelism);
+  }
+
   /**
Review Comment:
🤖 Passing `EMPTY_STRING` as `instantTime` here flows into
`WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)` at line
207 of `maybeDeleteAndCollectStats`. Even with `doDelete=false`, the writer's
`build()` triggers the `FileCreationCallback`, which creates a marker under
`.hoodie/.temp/<instantTime>/...`. With an empty string, the markers end up in a
malformed directory. Before this PR, the base class's `collectRollbackStats`
called the 5-param `maybeDeleteAndCollectStats`, which had no `WriteMarkers` at
all, so this is a behavioral regression. Could you either skip the callback
when `doDelete=false`, or pass a meaningful instant time here?
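A minimal sketch of the first option, assuming the fix is to register the marker callback only when `doDelete` is true. `WriterBuilder`, `markerPath` and `buildWriter` are hypothetical stand-ins (not Hudi's writer builder or `WriteMarkers` API), and the marker path is simplified to `.hoodie/.temp/<instantTime>/<fileName>`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MarkerGuardSketch {

  /** Hypothetical stand-in for the log-writer builder: build() fires the callback if one is set. */
  static final class WriterBuilder {
    private final String logFileName;
    private Consumer<String> fileCreationCallback; // null means no marker is created

    WriterBuilder(String logFileName) {
      this.logFileName = logFileName;
    }

    WriterBuilder withFileCreationCallback(Consumer<String> callback) {
      this.fileCreationCallback = callback;
      return this;
    }

    String build() {
      if (fileCreationCallback != null) {
        fileCreationCallback.accept(logFileName);
      }
      return logFileName;
    }
  }

  /** Hypothetical, simplified marker location; rejects an empty instant time instead of writing a bad path. */
  static String markerPath(String instantTime, String fileName) {
    if (instantTime == null || instantTime.isEmpty()) {
      throw new IllegalArgumentException("instantTime must be non-empty to place a marker");
    }
    return ".hoodie/.temp/" + instantTime + "/" + fileName;
  }

  /** Wires the marker callback only when the rollback actually writes (doDelete == true). */
  static String buildWriter(String logFileName, String instantTime, boolean doDelete, List<String> markers) {
    WriterBuilder builder = new WriterBuilder(logFileName);
    if (doDelete) {
      builder.withFileCreationCallback(name -> markers.add(markerPath(instantTime, name)));
    }
    return builder.build();
  }

  public static void main(String[] args) {
    List<String> markers = new ArrayList<>();
    // Stats-only pass (doDelete=false): no callback, so the empty instant time is never used.
    buildWriter(".f1_001.log.2_1-0-1", "", false, markers);
    System.out.println(markers.size()); // 0
    // Real rollback (doDelete=true): the marker lands under the given instant's directory.
    buildWriter(".f1_001.log.2_1-0-1", "002", true, markers);
    System.out.println(markers); // [.hoodie/.temp/002/.f1_001.log.2_1-0-1]
  }
}
```

Rejecting an empty instant in `markerPath` is an extra, hypothetical safety net: any caller that still wires the callback with `EMPTY_STRING` would fail fast instead of writing a stray marker.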
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -80,6 +81,92 @@ public RollbackHelperV1(HoodieTable table, HoodieWriteConfig config) {
     super(table, config);
   }
+  /**
+   * Builds the lookup key for pre-computed log versions.
+   */
+  static String logVersionLookupKey(String partitionPath, String fileId, String commitTime) {
+    return partitionPath + "|" + fileId + "|" + commitTime;
+  }
+
+  /**
+   * Pre-compute the latest log version for each (partition, fileId, deltaCommitTime) tuple
+   * by listing each unique partition directory once. This replaces N per-request listing
+   * calls (one per rollback request) with P per-partition listings (where P is much less than N).
+   *
+   * <p>For file groups with no existing log files in a successfully listed partition, a sentinel
+   * of (LOGFILE_BASE_VERSION, UNKNOWN_WRITE_TOKEN) is inserted so the caller avoids a redundant
+   * per-request listing. If a partition listing fails (IOException), no sentinels are inserted
+   * and the caller falls back to per-request listing naturally.
+   */
+  Map<String, Pair<Integer, String>> preComputeLogVersions(
+      List<SerializableHoodieRollbackRequest> rollbackRequests) {
+    List<SerializableHoodieRollbackRequest> logBlockRequests = rollbackRequests.stream()
+        .filter(req -> !req.getLogBlocksToBeDeleted().isEmpty())
+        .collect(Collectors.toList());
+
+    if (logBlockRequests.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, Set<String>> expectedKeysByPartition = new HashMap<>();
+    for (SerializableHoodieRollbackRequest req : logBlockRequests) {
+      String key = logVersionLookupKey(req.getPartitionPath(), req.getFileId(), req.getLatestBaseInstant());
+      expectedKeysByPartition.computeIfAbsent(req.getPartitionPath(), k -> new HashSet<>()).add(key);
+    }
+
+    log.info("Pre-computing log versions for {} partition(s) to avoid per-request listStatus calls",
+        expectedKeysByPartition.size());
+
+    Map<String, Pair<Integer, String>> logVersionMap = new HashMap<>();
+
+    for (Map.Entry<String, Set<String>> entry : expectedKeysByPartition.entrySet()) {
+      String relPartPath = entry.getKey();
+      Set<String> expectedKeys = entry.getValue();
+      StoragePath absolutePartPath = FSUtils.constructAbsolutePath(metaClient.getBasePath(), relPartPath);
+      try {
+        List<StoragePathInfo> statuses = metaClient.getStorage().listDirectEntries(absolutePartPath,
+            path -> path.getName().contains(HoodieLogFile.DELTA_EXTENSION));
+
+        for (StoragePathInfo status : statuses) {
+          try {
+            HoodieLogFile logFile = new HoodieLogFile(status);
+            String key = logVersionLookupKey(relPartPath, logFile.getFileId(), logFile.getDeltaCommitTime());
+            Pair<Integer, String> existing = logVersionMap.get(key);
+            if (existing == null || logFile.getLogVersion() > existing.getLeft()) {
+              logVersionMap.put(key, Pair.of(logFile.getLogVersion(), logFile.getLogWriteToken()));
Review Comment:
🤖 The `listDirectEntries` filter uses
`contains(HoodieLogFile.DELTA_EXTENSION)`, which matches `.log` as a substring.
This would match files like `somefile.log.backup` or `data.log.tmp`. Those are
handled by the `InvalidHoodiePathException` catch below, but we still parse
more files than necessary. Would it be worth tightening the filter
(e.g., matching the expected log file pattern) to reduce the noise?
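One way to tighten it, sketched with a plain regex. The layout `.<fileId>_<deltaCommitTime>.log.<version>[_<writeToken>]` is an assumption here (it does not account for any extra suffixes a log name may carry), and if Hudi already exposes a log-file name pattern, reusing that would be preferable to a hand-written one:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class LogFileFilterSketch {
  // Assumed log file layout: .<fileId>_<deltaCommitTime>.log.<version>[_<writeToken>]
  // Illustrative regex only; matches() anchors it to the whole file name.
  static final Pattern LOG_NAME =
      Pattern.compile("\\.(.+)_([^_.]+)\\.log\\.(\\d+)(_\\d+-\\d+-\\d+)?");

  // Current behavior: substring match on ".log" (the value of DELTA_EXTENSION per the comment above).
  static final Predicate<String> LOOSE = name -> name.contains(".log");

  // Tighter behavior: the whole name must match the assumed log layout.
  static final Predicate<String> STRICT = name -> LOG_NAME.matcher(name).matches();

  static List<String> filter(List<String> names, Predicate<String> predicate) {
    return names.stream().filter(predicate).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> names = List.of(
        ".f1-0_001.log.1_0-1-1",    // log file
        "somefile.log.backup",      // false positive for LOOSE
        "data.log.tmp",             // false positive for LOOSE
        "f1-0_0-1-1_001.parquet");  // base file, matched by neither
    System.out.println(filter(names, LOOSE));  // [.f1-0_001.log.1_0-1-1, somefile.log.backup, data.log.tmp]
    System.out.println(filter(names, STRICT)); // [.f1-0_001.log.1_0-1-1]
  }
}
```

The trade-off: a strict filter has to stay in sync with the real naming scheme, whereas the current `contains` check plus the `InvalidHoodiePathException` catch is more forgiving of format changes.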
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -146,14 +151,25 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
             )
             .withStorage(metaClient.getStorage())
             .withTableVersion(tableVersion)
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION);
+
+        String logVersionKey = logVersionLookupKey(rollbackRequest.getPartitionPath(), fileId, rollbackRequest.getLatestBaseInstant());
+        Pair<Integer, String> preComputedVersion = logVersionMap.get(logVersionKey);
+        if (preComputedVersion != null) {
+          writerBuilder.withLogVersion(preComputedVersion.getLeft())
+              .withLogWriteToken(preComputedVersion.getRight());
+        }
Review Comment:
🤖 I traced the builder's `build()` path: when `preComputedVersion` is null
(no entry in the map), the builder reaches `build()` with `logVersion == null`,
which triggers `FSUtils.getLatestLogVersion()`: the original listing-based
discovery that sets both `logVersion` and `logWriteToken` (defaulting to
`UNKNOWN_WRITE_TOKEN` when no log file exists). So the
fallback behavior is preserved without setting the token here.
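For reference, the precompute-then-fallback flow described in the Javadoc and traced above can be modeled in isolation. This is a simplified sketch: `LogName`, `PartitionLister`, the `String[]` request tuples, `resolve`, and the `BASE_VERSION`/`UNKNOWN_TOKEN` placeholders are hypothetical and stand in for `HoodieLogFile`, the storage listing, `SerializableHoodieRollbackRequest`, the builder's null check, and Hudi's real constants:

```java
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

final class LogVersionLookupSketch {
  /** Hypothetical parsed log file name. */
  static final class LogName {
    final String fileId;
    final String deltaCommitTime;
    final int version;
    final String writeToken;

    LogName(String fileId, String deltaCommitTime, int version, String writeToken) {
      this.fileId = fileId;
      this.deltaCommitTime = deltaCommitTime;
      this.version = version;
      this.writeToken = writeToken;
    }
  }

  /** Hypothetical one-call-per-partition listing that may fail. */
  interface PartitionLister {
    List<LogName> list(String partition) throws IOException;
  }

  // Placeholders for LOGFILE_BASE_VERSION / UNKNOWN_WRITE_TOKEN; may not match Hudi's constants.
  static final int BASE_VERSION = 1;
  static final String UNKNOWN_TOKEN = "unknown";

  static String key(String partition, String fileId, String commitTime) {
    return partition + "|" + fileId + "|" + commitTime;
  }

  /** Each request is {partition, fileId, baseInstant}; every distinct partition is listed once. */
  static Map<String, Map.Entry<Integer, String>> preCompute(List<String[]> requests, PartitionLister lister) {
    Map<String, Set<String>> expectedByPartition = new HashMap<>();
    for (String[] r : requests) {
      expectedByPartition.computeIfAbsent(r[0], p -> new HashSet<>()).add(key(r[0], r[1], r[2]));
    }
    Map<String, Map.Entry<Integer, String>> versions = new HashMap<>();
    for (Map.Entry<String, Set<String>> e : expectedByPartition.entrySet()) {
      String partition = e.getKey();
      List<LogName> listed;
      try {
        listed = lister.list(partition);
      } catch (IOException ex) {
        continue; // no sentinels: callers fall back to per-request listing
      }
      for (LogName f : listed) {
        String k = key(partition, f.fileId, f.deltaCommitTime);
        Map.Entry<Integer, String> cur = versions.get(k);
        if (cur == null || f.version > cur.getKey()) {
          versions.put(k, new SimpleImmutableEntry<>(f.version, f.writeToken)); // keep the highest version
        }
      }
      for (String k : e.getValue()) {
        // Sentinel for file groups with no log file in a successfully listed partition.
        versions.putIfAbsent(k, new SimpleImmutableEntry<>(BASE_VERSION, UNKNOWN_TOKEN));
      }
    }
    return versions;
  }

  /** Hit: use the precomputed pair. Miss: fall back to the slow per-request listing. */
  static Map.Entry<Integer, String> resolve(Map<String, Map.Entry<Integer, String>> pre, String k,
      Supplier<Map.Entry<Integer, String>> perRequestListing) {
    Map.Entry<Integer, String> hit = pre.get(k);
    return hit != null ? hit : perRequestListing.get();
  }

  public static void main(String[] args) {
    List<String[]> requests = List.of(
        new String[] {"p1", "f1", "001"},
        new String[] {"p1", "f2", "001"},
        new String[] {"p2", "f3", "002"});
    PartitionLister lister = partition -> {
      if (partition.equals("p2")) {
        throw new IOException("simulated listing failure");
      }
      return List.of(new LogName("f1", "001", 1, "1-0-1"), new LogName("f1", "001", 3, "2-0-1"));
    };
    Map<String, Map.Entry<Integer, String>> pre = preCompute(requests, lister);
    System.out.println(pre.get(key("p1", "f1", "001"))); // 3=2-0-1
    System.out.println(pre.get(key("p1", "f2", "001"))); // 1=unknown (sentinel)
    System.out.println(pre.containsKey(key("p2", "f3", "002"))); // false: caller lists per request
  }
}
```

The sentinel and the failed-listing case are deliberately asymmetric: a sentinel is only safe when the listing succeeded, because only then is "no log file found" actually known.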