nsivabalan commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1316529900


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -162,24 +199,126 @@ List<Pair<String, HoodieRollbackStat>> 
maybeDeleteAndCollectStats(HoodieEngineCo
             1L
         );
 
+        // With listing based rollback, sometimes we only get the fileID of 
interest(so that we can add rollback command block) w/o the actual file name.
+        // So, we want to ignore such invalid files from this list before we 
add it to the rollback stats.
+        String partitionFullPath = metaClient.getBasePathV2().toString();
+        Map<String, Long> validLogBlocksToDelete = new HashMap<>();
+        
rollbackRequest.getLogBlocksToBeDeleted().entrySet().stream().forEach((kv) -> {
+          String logFileFullPath = kv.getKey();
+          String logFileName = logFileFullPath.replace(partitionFullPath, "");
+          if (!StringUtils.isNullOrEmpty(logFileName)) {
+            validLogBlocksToDelete.put(kv.getKey(), kv.getValue());
+          }
+        });
+
         return Collections.singletonList(
-            Pair.of(rollbackRequest.getPartitionPath(),
-                HoodieRollbackStat.newBuilder()
-                    .withPartitionPath(rollbackRequest.getPartitionPath())
-                    .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-                    .build()))
+                Pair.of(rollbackRequest.getPartitionPath(),
+                    HoodieRollbackStat.newBuilder()
+                        .withPartitionPath(rollbackRequest.getPartitionPath())
+                        
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
+                        .withLogFilesFromFailedCommit(validLogBlocksToDelete)
+                        .build()))
             .stream();
       } else {
         return Collections.singletonList(
-            Pair.of(rollbackRequest.getPartitionPath(),
-                HoodieRollbackStat.newBuilder()
-                    .withPartitionPath(rollbackRequest.getPartitionPath())
-                    .build()))
+                Pair.of(rollbackRequest.getPartitionPath(),
+                    HoodieRollbackStat.newBuilder()
+                        .withPartitionPath(rollbackRequest.getPartitionPath())
+                        .build()))
             .stream();
       }
     }, numPartitions);
   }
 
+  private HoodieLogFileWriteCallback getRollbackLogMarkerCallback(final 
WriteMarkers writeMarkers, String partitionPath, String fileId) {
+    return new HoodieLogFileWriteCallback() {
+      @Override
+      public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+        // there may be existed marker file if fs support append. So always 
return true;
+        createAppendMarker(logFileToAppend);
+        return true;
+      }
+
+      @Override
+      public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+        return createAppendMarker(logFileToCreate);
+      }
+
+      private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+        return writeMarkers.createIfNotExists(partitionPath, 
logFileToAppend.getFileName(), IOType.APPEND,
+            config, fileId, metaClient.getActiveTimeline()).isPresent();
+      }
+    };
+  }
+
+  /**
+   * If there are log files created by previous rollback attempts, we want to 
add them to rollback stats so that MDT is able to track them.
+   * @param context
+   * @param originalRollbackStats
+   * @param logPaths
+   * @return
+   */
+  private List<HoodieRollbackStat> 
addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context,
+                                                                               
 List<HoodieRollbackStat> originalRollbackStats,
+                                                                               
 Set<String> logPaths) {
+    if (logPaths.isEmpty()) {
+      // if rollback is not failed and re-attempted, we should not find any 
additional log files here.
+      return originalRollbackStats;
+    }
+
+    final String basePathStr = config.getBasePath();
+    List<String> logFiles = new ArrayList<>(logPaths);
+    // populate partitionPath -> List<log file name>
+    HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData = 
context.parallelize(logFiles)
+        // lets map each log file to partition path and log file name
+        .mapToPair((SerializablePairFunction<String, String, String>) t -> {
+          Path logFilePath = new Path(basePathStr, t);
+          String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePathStr), logFilePath.getParent());
+          return Pair.of(partitionPath, logFilePath.getName());
+        })
+        // lets group by partition path and collect it as log file list per 
partition path
+        .groupByKey().mapToPair((SerializablePairFunction<Pair<String, 
Iterable<String>>, String, List<String>>) t -> {
+          List<String> allFiles = new ArrayList<>();
+          t.getRight().forEach(entry -> allFiles.add(entry));
+          return Pair.of(t.getKey(), allFiles);
+        });
+
+    // populate partitionPath -> HoodieRollbackStat
+    HoodiePairData<String, HoodieRollbackStat> 
partitionPathToRollbackStatsHoodieData =
+        context.parallelize(originalRollbackStats)
+            .mapToPair((SerializablePairFunction<HoodieRollbackStat, String, 
HoodieRollbackStat>) t -> Pair.of(t.getPartitionPath(), t));
+
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(table.getHadoopConf());

Review Comment:
   Yeah, I did give it a thought, but Spark might try to serialize the entire class 
if we make it a class variable. So it has to be a local variable, so that Spark 
only sends `serializableConfiguration` to the executors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to