yihua commented on a change in pull request #3664:
URL: https://github.com/apache/hudi/pull/3664#discussion_r709498852
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -68,107 +67,107 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient
metaClient, HoodieWriteC
/**
* Performs all rollback actions that we have collected in parallel.
*/
- public List<HoodieRollbackStat> performRollback(HoodieEngineContext context,
HoodieInstant instantToRollback, List<ListingBasedRollbackRequest>
rollbackRequests) {
- Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs =
maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
- Map<String, List<Pair<String, HoodieRollbackStat>>> collect =
partitionPathRollbackStatsPairs.entrySet()
- .stream()
- .map(x -> Pair.of(x.getKey(),
x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
- return collect.values().stream()
- .map(pairs ->
pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context,
HoodieInstant instantToRollback,
+
List<ListingBasedRollbackRequest> rollbackRequests) {
+ int parallelism = Math.max(Math.min(rollbackRequests.size(),
config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback
actions");
+ return context.mapToPairAndReduceByKey(rollbackRequests,
+ rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest,
instantToRollback, true),
+ RollbackUtils::mergeRollbackStat,
+ parallelism);
}
/**
* Collect all file info that needs to be rollbacked.
*/
- public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext
context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest>
rollbackRequests) {
- Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs =
maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
- return new ArrayList<>(partitionPathRollbackStatsPairs.values());
+ public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext
context, HoodieInstant instantToRollback,
+
List<ListingBasedRollbackRequest> rollbackRequests) {
+ int parallelism = Math.max(Math.min(rollbackRequests.size(),
config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback
stats for upgrade/downgrade");
+ return context.mapToPairAndReduceByKey(rollbackRequests,
+ rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest,
instantToRollback, false),
+ RollbackUtils::mergeRollbackStat,
+ parallelism);
}
/**
* May be delete interested files and collect stats or collect stats only.
*
- * @param context instance of {@link HoodieEngineContext} to use.
* @param instantToRollback {@link HoodieInstant} of interest for which
deletion or collect stats is requested.
- * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to
be operated on.
- * @param doDelete {@code true} if deletion has to be done. {@code
false} if only stats are to be collected w/o performing any deletes.
+ * @param doDelete {@code true} if deletion has to be done.
+ * {@code false} if only stats are to be collected
w/o performing any deletes.
* @return stats collected with or w/o actual deletions.
*/
- Map<String, HoodieRollbackStat>
maybeDeleteAndCollectStats(HoodieEngineContext context,
- HoodieInstant
instantToRollback,
-
List<ListingBasedRollbackRequest> rollbackRequests,
- boolean doDelete)
{
- return context.mapToPair(rollbackRequests, rollbackRequest -> {
- switch (rollbackRequest.getType()) {
- case DELETE_DATA_FILES_ONLY: {
- final Map<FileStatus, Boolean> filesToDeletedStatus =
deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
- rollbackRequest.getPartitionPath(), doDelete);
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case DELETE_DATA_AND_LOG_FILES: {
- final Map<FileStatus, Boolean> filesToDeletedStatus =
deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(),
rollbackRequest.getPartitionPath(), doDelete);
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case APPEND_ROLLBACK_BLOCK: {
- String fileId = rollbackRequest.getFileId().get();
- String latestBaseInstant =
rollbackRequest.getLatestBaseInstant().get();
-
- // collect all log files that is supposed to be deleted with this
rollback
- Map<FileStatus, Long> writtenLogFileSizeMap =
FSUtils.getAllLogFiles(metaClient.getFs(),
- FSUtils.getPartitionPath(config.getBasePath(),
rollbackRequest.getPartitionPath()),
- fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(),
latestBaseInstant)
- .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value ->
value.getFileStatus().getLen()));
-
- HoodieLogFormat.Writer writer = null;
+ private Pair<String, HoodieRollbackStat>
maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
+
HoodieInstant instantToRollback,
+ boolean
doDelete) throws IOException {
+ switch (rollbackRequest.getType()) {
Review comment:
I only changed the return type of `maybeDeleteAndCollectStats` so that the
method can be reused across different engines.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]