nsivabalan commented on a change in pull request #3664:
URL: https://github.com/apache/hudi/pull/3664#discussion_r709432675



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
##########
@@ -70,4 +71,14 @@
       }
     };
   }
+
+  public static <V> BinaryOperator<V> 
throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {

Review comment:
       nit: can we name this throwableReduceWrapper ("ble" instead of "ing")?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -68,107 +67,107 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient 
metaClient, HoodieWriteC
   /**
    * Performs all rollback actions that we have collected in parallel.
    */
-  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, 
HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> 
rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = 
maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
-    Map<String, List<Pair<String, HoodieRollbackStat>>> collect = 
partitionPathRollbackStatsPairs.entrySet()
-        .stream()
-        .map(x -> Pair.of(x.getKey(), 
x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
-    return collect.values().stream()
-        .map(pairs -> 
pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
-        .filter(Objects::nonNull)
-        .collect(Collectors.toList());
+  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, 
HoodieInstant instantToRollback,
+                                                  
List<ListingBasedRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), 
config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback 
actions");
+    return context.mapToPairAndReduceByKey(rollbackRequests,
+        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, 
instantToRollback, true),
+        RollbackUtils::mergeRollbackStat,
+        parallelism);
   }
 
   /**
    * Collect all file info that needs to be rollbacked.
    */
-  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext 
context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> 
rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = 
maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
-    return new ArrayList<>(partitionPathRollbackStatsPairs.values());
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext 
context, HoodieInstant instantToRollback,
+                                                       
List<ListingBasedRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), 
config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback 
stats for upgrade/downgrade");
+    return context.mapToPairAndReduceByKey(rollbackRequests,
+        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, 
instantToRollback, false),
+        RollbackUtils::mergeRollbackStat,
+        parallelism);
   }
 
   /**
    * May be delete interested files and collect stats or collect stats only.
    *
-   * @param context           instance of {@link HoodieEngineContext} to use.
    * @param instantToRollback {@link HoodieInstant} of interest for which 
deletion or collect stats is requested.
-   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to 
be operated on.
-   * @param doDelete          {@code true} if deletion has to be done. {@code 
false} if only stats are to be collected w/o performing any deletes.
+   * @param doDelete          {@code true} if deletion has to be done.
+   *                          {@code false} if only stats are to be collected 
w/o performing any deletes.
    * @return stats collected with or w/o actual deletions.
    */
-  Map<String, HoodieRollbackStat> 
maybeDeleteAndCollectStats(HoodieEngineContext context,
-                                                             HoodieInstant 
instantToRollback,
-                                                             
List<ListingBasedRollbackRequest> rollbackRequests,
-                                                             boolean doDelete) 
{
-    return context.mapToPair(rollbackRequests, rollbackRequest -> {
-      switch (rollbackRequest.getType()) {
-        case DELETE_DATA_FILES_ONLY: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = 
deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-              rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case DELETE_DATA_AND_LOG_FILES: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = 
deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), 
rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case APPEND_ROLLBACK_BLOCK: {
-          String fileId = rollbackRequest.getFileId().get();
-          String latestBaseInstant = 
rollbackRequest.getLatestBaseInstant().get();
-
-          // collect all log files that is supposed to be deleted with this 
rollback
-          Map<FileStatus, Long> writtenLogFileSizeMap = 
FSUtils.getAllLogFiles(metaClient.getFs(),
-              FSUtils.getPartitionPath(config.getBasePath(), 
rollbackRequest.getPartitionPath()),
-              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), 
latestBaseInstant)
-              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> 
value.getFileStatus().getLen()));
-
-          HoodieLogFormat.Writer writer = null;
+  private Pair<String, HoodieRollbackStat> 
maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
+                                                                      
HoodieInstant instantToRollback,
+                                                                      boolean 
doDelete) throws IOException {
+    switch (rollbackRequest.getType()) {

Review comment:
       I assume you have not changed any logic within this method; please confirm.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to