the-other-tim-brown commented on code in PR #18159:
URL: https://github.com/apache/hudi/pull/18159#discussion_r2835502404


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -614,4 +660,299 @@ private boolean isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, String partitionPath) {
     return !hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath).findAny().isPresent();
   }
+
+  private <R> List<CleanFileInfo> getBlobFilesToRemove(HoodieTable table, HoodieSchema schema, Option<HoodieSchema> requestedSchema,
+                                                       Option<List<FileSlice>> retainedFileSlices, Option<List<FileSlice>> removedFileSlices) {
+    if (requestedSchema.isEmpty()) {
+      // no blob columns, no blob files to clean
+      return Collections.emptyList();
+    }
+    // Skip if there are no removed file slices
+    if (removedFileSlices.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // Validate there is at least one completed commit
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    Option<String> latestCommitTimeOpt = metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+    if (!latestCommitTimeOpt.isPresent()) {
+      // no commits or blob files to clean
+      return Collections.emptyList();
+    }
+    Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId = retainedFileSlices.orElseThrow(() -> new HoodieException("Retained file slices must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+    Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId = removedFileSlices.orElseThrow(() -> new HoodieException("Removed file slices must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+
+    return getFileSliceComparisonGroups(retainedFileSlicesByFileGroupId, removedFileSlicesByFileGroupId).stream().flatMap(group -> {
+      HoodieReaderContext<R> readerContext = ((ReaderContextFactory<R>) table.getContext().getReaderContextFactory(metaClient)).getContext();
+      RecordContext<R> recordContext = readerContext.getRecordContext();
+      // Iterate through the removed file slices with skip merging to find all the blob files that are referenced by the removed file slices.
+      TypedProperties properties = TypedProperties.copy(config.getProps());
+      properties.put(HoodieReaderConfig.MERGE_TYPE.key(), REALTIME_SKIP_MERGE);
+
+      Set<String> managedBlobFilePaths = group.getRemovedFileSlices().stream()
+          .flatMap(fileSlice -> {
+            HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, fileSlice, readerContext, metaClient, latestCommitTimeOpt, requestedSchema.get(), properties);
+            Set<String> managedBlobFilePathsInSlice = new HashSet<>();
+            try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+              while (recordItr.hasNext()) {
+                R record = recordItr.next();
+                managedBlobFilePathsInSlice.addAll(getManagedBlobPaths(schema, record, recordContext));
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading records from file slice: " + fileSlice, e);
+            }
+            return managedBlobFilePathsInSlice.stream();
+          })
+          .collect(Collectors.toSet());
+      if (managedBlobFilePaths.isEmpty()) {
+        // no blob files referenced by the removed file slices, skip
+        return Stream.empty();
+      }
+      // Then iterate through the retained file slices with skip merging to find all the blob files that are still referenced by the retained file slices.
+      group.getRetainedFileSlices().forEach(fileSlice -> {
+        HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, fileSlice, readerContext, metaClient, latestCommitTimeOpt, requestedSchema.get(), properties);
+        try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+          while (recordItr.hasNext()) {
+            R record = recordItr.next();
+            getManagedBlobPaths(schema, record, recordContext).forEach(managedBlobFilePaths::remove);
+            if (managedBlobFilePaths.isEmpty()) {
+              // all blob files referenced by the removed file slices are still referenced by the retained file slices, skip
+              break;
+            }
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Error reading records from file slice: " + fileSlice, e);
+        }
+      });
+
+      // The remaining blob file paths in managedBlobFilePaths are the ones that can be removed.
+      return managedBlobFilePaths.stream().map(path -> new CleanFileInfo(path, false));
+    }).collect(Collectors.toList());
+  }
+
+  private <R> HoodieFileGroupReader<R> getHoodieFileGroupReader(HoodieSchema schema, FileSlice fileSlice, HoodieReaderContext<R> readerContext, HoodieTableMetaClient metaClient,
+                                                                Option<String> latestCommitTimeOpt, HoodieSchema requestedSchema, TypedProperties props) {
+    return HoodieFileGroupReader.<R>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(latestCommitTimeOpt.get())
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(requestedSchema)
+        .withProps(props)
+        .build();
+  }
+
+  // TODO: How does this work for tables with global index?

Review Comment:
   Right now the clean plan is computed per partition. If references can move between partitions, we can no longer do the computation per partition, since we would need to check for references to the blob in other partitions as well.
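   
   To make the concern concrete, here is a minimal sketch in plain Java collections (the helper names, the string stand-ins for blob paths and partitions, and the `main` example are illustrative only, not part of this PR). With a global index, an update can route the referencing record to a different partition, so a blob that looks unreferenced within the planned partition may still be referenced by a retained file slice elsewhere:
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   public class CrossPartitionBlobReferenceSketch {
   
     // Per-partition planning: blob paths referenced by the removed slices, minus those
     // still referenced by retained slices of the SAME partition, are treated as removable.
     static Set<String> removableBlobsPerPartition(Set<String> referencedByRemovedSlices,
                                                   Set<String> referencedByRetainedSlices) {
       Set<String> candidates = new HashSet<>(referencedByRemovedSlices);
       candidates.removeAll(referencedByRetainedSlices);
       return candidates;
     }
   
     // With a global index, the candidates would also have to survive the retained-slice
     // check of EVERY partition, not just the one currently being planned.
     static Set<String> removableBlobsGlobally(Set<String> candidates,
                                               Map<String, Set<String>> retainedRefsByPartition) {
       Set<String> stillRemovable = new HashSet<>(candidates);
       for (Set<String> referenced : retainedRefsByPartition.values()) {
         stillRemovable.removeAll(referenced);
         if (stillRemovable.isEmpty()) {
           break; // every candidate is still referenced somewhere
         }
       }
       return stillRemovable;
     }
   
     public static void main(String[] args) {
       // Blob "b1" is dropped in partition p1, but an update routed through a global
       // index moved its referencing record to partition p2, where it is retained.
       Set<String> removedRefsP1 = new HashSet<>(Arrays.asList("b1", "b2"));
       Set<String> retainedRefsP1 = new HashSet<>(Arrays.asList("b2"));
       Set<String> perPartition = removableBlobsPerPartition(removedRefsP1, retainedRefsP1);
   
       Map<String, Set<String>> retainedRefsByPartition = new HashMap<>();
       retainedRefsByPartition.put("p1", retainedRefsP1);
       retainedRefsByPartition.put("p2", new HashSet<>(Arrays.asList("b1")));
       Set<String> global = removableBlobsGlobally(perPartition, retainedRefsByPartition);
   
       System.out.println(perPartition); // [b1] -> would be deleted by per-partition planning
       System.out.println(global);       // []   -> correctly kept when all partitions are checked
     }
   }
   ```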


