[ https://issues.apache.org/jira/browse/HUDI-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411684#comment-17411684 ]

ASF GitHub Bot commented on HUDI-2351:
--------------------------------------

yihua commented on a change in pull request #3529:
URL: https://github.com/apache/hudi/pull/3529#discussion_r703579575



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
##########
@@ -117,26 +117,26 @@ private void convertToDirectMarkers(final String commitInstantTime,
     } else {
       // In case of partial failures during downgrade, there is a chance that marker type file was deleted,
       // but timeline server based marker files are left.  So deletes them if any
-      deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
+      deleteTimelineBasedMarkerFiles(
+          context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
     }
   }
 
-  private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException {
+  private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir,
+                                              FileSystem fileSystem, int parallelism) throws IOException {
     // Deletes timeline based marker files if any.
-    Path dirPath = new Path(markerDir);
-    FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
     Predicate<FileStatus> prefixFilter = fileStatus ->
         fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
-    List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-        .filter(prefixFilter)
-        .map(fileStatus -> fileStatus.getPath().toString())
-        .collect(Collectors.toList());
-    markerDirSubPaths.forEach(fileToDelete -> {
-      try {
-        fileSystem.delete(new Path(fileToDelete), false);
-      } catch (IOException e) {
-        Log.warn("Deleting Timeline based marker files failed ", e);
-      }
-    });
+    FSUtils.parallelizeSubPathProcess(context, fileSystem, new Path(markerDir), parallelism,
+        prefixFilter, pairOfSubPathAndConf -> {

Review comment:
       Good catch. Fixed.






> Fix `Task not serializable` due to new APIs in FSUtils for marker mechanism
> ---------------------------------------------------------------------------
>
>                 Key: HUDI-2351
>                 URL: https://issues.apache.org/jira/browse/HUDI-2351
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: Ethan Guo
>            Priority: Major
>              Labels: pull-request-available
>
> * Fix `Task not serializable` due to new APIs in FSUtils for recursive, level-by-level listing (`java.io.NotSerializableException: org.apache.hudi.common.fs.FSUtils$$Lambda$4224/1845791682`)
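
For readers unfamiliar with the error in the description: Java serialization throws `NotSerializableException` for a lambda whose target functional interface does not extend `Serializable`, and the exception message names the synthetic lambda class, as in the trace above. The sketch below reproduces that behavior in plain Java, with no Hudi or Spark dependency. It is an illustration only, not the change made in the pull request; the class name, the `SerializablePredicate` interface, and the `"MARKERS"` prefix are hypothetical names chosen for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

public class LambdaSerializationSketch {

  /** Hypothetical helper: a Predicate whose lambdas are also Serializable. */
  public interface SerializablePredicate<T> extends Predicate<T>, Serializable {
  }

  /** Returns true if Java serialization accepts the object, false if it is rejected. */
  public static boolean isJavaSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      // Thrown for a lambda whose target type is not Serializable.
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Lambda typed as a plain java.util.function.Predicate: not serializable. */
  public static Predicate<String> plainPrefixFilter(String prefix) {
    return name -> name.startsWith(prefix);
  }

  /** Same lambda body, but the target type extends Serializable. */
  public static SerializablePredicate<String> serializablePrefixFilter(String prefix) {
    return name -> name.startsWith(prefix);
  }

  public static void main(String[] args) {
    // "MARKERS" is a placeholder prefix used only for this illustration.
    System.out.println(isJavaSerializable(plainPrefixFilter("MARKERS")));        // false
    System.out.println(isJavaSerializable(serializablePrefixFilter("MARKERS"))); // true
  }
}
```

The two factory methods have identical lambda bodies; only the declared target type differs. This is why a lambda that is shipped to executors generally needs a serializable functional interface as its type, and why everything it captures must be serializable too.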



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
