nsivabalan commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1316529900
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -162,24 +199,126 @@ List<Pair<String, HoodieRollbackStat>>
maybeDeleteAndCollectStats(HoodieEngineCo
1L
);
+ // With listing-based rollback, sometimes we only get the fileID of
interest (so that we can add a rollback command block) w/o the actual file name.
+ // So, we want to ignore such invalid files from this list before we
add them to the rollback stats.
+ String partitionFullPath = metaClient.getBasePathV2().toString();
+ Map<String, Long> validLogBlocksToDelete = new HashMap<>();
+
rollbackRequest.getLogBlocksToBeDeleted().entrySet().stream().forEach((kv) -> {
+ String logFileFullPath = kv.getKey();
+ String logFileName = logFileFullPath.replace(partitionFullPath, "");
+ if (!StringUtils.isNullOrEmpty(logFileName)) {
+ validLogBlocksToDelete.put(kv.getKey(), kv.getValue());
+ }
+ });
+
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
+ .withLogFilesFromFailedCommit(validLogBlocksToDelete)
+ .build()))
.stream();
} else {
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+ .build()))
.stream();
}
}, numPartitions);
}
+ private HoodieLogFileWriteCallback getRollbackLogMarkerCallback(final
WriteMarkers writeMarkers, String partitionPath, String fileId) {
+ return new HoodieLogFileWriteCallback() {
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ // there may be an existing marker file if the fs supports append, so always
return true;
+ createAppendMarker(logFileToAppend);
+ return true;
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return createAppendMarker(logFileToCreate);
+ }
+
+ private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+ return writeMarkers.createIfNotExists(partitionPath,
logFileToAppend.getFileName(), IOType.APPEND,
+ config, fileId, metaClient.getActiveTimeline()).isPresent();
+ }
+ };
+ }
+
+ /**
+ * If there are log files created by previous rollback attempts, we want to
add them to rollback stats so that MDT is able to track them.
+ * @param context
+ * @param originalRollbackStats
+ * @param logPaths
+ * @return
+ */
+ private List<HoodieRollbackStat>
addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context,
+
List<HoodieRollbackStat> originalRollbackStats,
+
Set<String> logPaths) {
+ if (logPaths.isEmpty()) {
+ // if rollback is not failed and re-attempted, we should not find any
additional log files here.
+ return originalRollbackStats;
+ }
+
+ final String basePathStr = config.getBasePath();
+ List<String> logFiles = new ArrayList<>(logPaths);
+ // populate partitionPath -> List<log file name>
+ HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData =
context.parallelize(logFiles)
+ // lets map each log file to partition path and log file name
+ .mapToPair((SerializablePairFunction<String, String, String>) t -> {
+ Path logFilePath = new Path(basePathStr, t);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePathStr), logFilePath.getParent());
+ return Pair.of(partitionPath, logFilePath.getName());
+ })
+ // lets group by partition path and collect it as log file list per
partition path
+ .groupByKey().mapToPair((SerializablePairFunction<Pair<String,
Iterable<String>>, String, List<String>>) t -> {
+ List<String> allFiles = new ArrayList<>();
+ t.getRight().forEach(entry -> allFiles.add(entry));
+ return Pair.of(t.getKey(), allFiles);
+ });
+
+ // populate partitionPath -> HoodieRollbackStat
+ HoodiePairData<String, HoodieRollbackStat>
partitionPathToRollbackStatsHoodieData =
+ context.parallelize(originalRollbackStats)
+ .mapToPair((SerializablePairFunction<HoodieRollbackStat, String,
HoodieRollbackStat>) t -> Pair.of(t.getPartitionPath(), t));
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(table.getHadoopConf());
Review Comment:
Yeah, I did give it a thought, but Spark might try to serialize the entire class
if we make it a class variable. So it has to be a local variable so that Spark
just sends the serializableConfiguration to the executors.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]