the-other-tim-brown commented on code in PR #18159:
URL: https://github.com/apache/hudi/pull/18159#discussion_r2843878532


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -614,4 +660,299 @@ private boolean 
isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, 
String partitionPath) {
     return 
!hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain,
 partitionPath).findAny().isPresent();
   }
+
+  private <R> List<CleanFileInfo> getBlobFilesToRemove(HoodieTable table, 
HoodieSchema schema, Option<HoodieSchema> requestedSchema,
+                                                       Option<List<FileSlice>> 
retainedFileSlices, Option<List<FileSlice>> removedFileSlices) {
+    if (requestedSchema.isEmpty()) {
+      // no blob columns, no blob files to clean
+      return Collections.emptyList();
+    }
+    // Skip if there are no removed file slices
+    if (removedFileSlices.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // Validate there is at least one completed commit
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    Option<String> latestCommitTimeOpt = 
metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+    if (!latestCommitTimeOpt.isPresent()) {
+      // no commits or blob files to clean
+      return Collections.emptyList();
+    }
+    Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId = 
retainedFileSlices.orElseThrow(() -> new HoodieException("Retained file slices 
must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+    Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId = 
removedFileSlices.orElseThrow(() -> new HoodieException("Removed file slices 
must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+
+    return getFileSliceComparisonGroups(retainedFileSlicesByFileGroupId, 
removedFileSlicesByFileGroupId).stream().flatMap(group -> {
+      HoodieReaderContext<R> readerContext = ((ReaderContextFactory<R>) 
table.getContext().getReaderContextFactory(metaClient)).getContext();
+      RecordContext<R> recordContext = readerContext.getRecordContext();
+      // Iterate through the removed file slices with skip merging to find all 
the blob files that are referenced by the removed file slices.
+      TypedProperties properties = TypedProperties.copy(config.getProps());
+      properties.put(HoodieReaderConfig.MERGE_TYPE.key(), REALTIME_SKIP_MERGE);
+
+      Set<String> managedBlobFilePaths = group.getRemovedFileSlices().stream()
+          .flatMap(fileSlice -> {
+            HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, 
fileSlice, readerContext, metaClient, latestCommitTimeOpt, 
requestedSchema.get(), properties);
+            Set<String> managedBlobFilePathsInSlice = new HashSet<>();
+            try (ClosableIterator<R> recordItr = reader.getClosableIterator()) 
{
+              while (recordItr.hasNext()) {
+                R record = recordItr.next();
+                managedBlobFilePathsInSlice.addAll(getManagedBlobPaths(schema, 
record, recordContext));
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading records from file 
slice: " + fileSlice, e);
+            }
+            return managedBlobFilePathsInSlice.stream();
+          })
+          .collect(Collectors.toSet());
+      if (managedBlobFilePaths.isEmpty()) {
+        // no blob files referenced by the removed file slices, skip
+        return Stream.empty();
+      }
+      // Then iterate through the retained file slices with skip merging to 
find all the blob files that are still referenced by the retained file slices.
+      group.getRetainedFileSlices().forEach(fileSlice -> {
+        HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, 
fileSlice, readerContext, metaClient, latestCommitTimeOpt, 
requestedSchema.get(), properties);
+        try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+          while (recordItr.hasNext()) {
+            R record = recordItr.next();
+            getManagedBlobPaths(schema, record, 
recordContext).forEach(managedBlobFilePaths::remove);
+            if (managedBlobFilePaths.isEmpty()) {
+              // all blob files referenced by the removed file slices are 
still referenced by the retained file slices, skip
+              break;
+            }
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Error reading records from file slice: 
" + fileSlice, e);
+        }
+      });
+
+      // The remaining blob file paths in managedBlobFilePaths are the ones 
that can be removed.
+      return managedBlobFilePaths.stream().map(path -> new CleanFileInfo(path, 
false));
+    }).collect(Collectors.toList());
+  }
+
+  private <R> HoodieFileGroupReader<R> getHoodieFileGroupReader(HoodieSchema 
schema, FileSlice fileSlice, HoodieReaderContext<R> readerContext, 
HoodieTableMetaClient metaClient,
+                                                                Option<String> 
latestCommitTimeOpt, HoodieSchema requestedSchema, TypedProperties props) {
+    return HoodieFileGroupReader.<R>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(latestCommitTimeOpt.get())
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(requestedSchema)
+        .withProps(props)
+        .build();
+  }
+
+  // TODO: How does this work for tables with global index?
+  /**
+   * Cleaning blob files requires that we inspect the contents of the retained 
and removed file slices to find the blob file references that can be removed.
+   * To optimize the process, we limit the comparisons required with the 
following grouping logic:
+   * 1) If a removed file slice has a corresponding retained file slice in the 
same file group, we will compare the removed file slice with the retained file 
slice(s) in the same file group.
+   * 2) If a removed file slice does not have a corresponding retained file 
slice in the same file group, this implies that there is a replace or 
clustering commit. This requires comparing the
+   *    removed file slice with all retained file slices that were created 
after the removed file slice's latest instant time, as these are the only 
retained file slices
+   *    that could be part of a replace or clustering commit that removes 
these files from the active view of the table. All slices that fall into this 
case are put into the same comparison group to
+   *    avoid reading the same file multiple times.
+   * @param retainedFileSlicesByFileGroupId the map of retained file slices 
grouped by HoodieFileGroupId
+   * @param removedFileSlicesByFileGroupId the map of removed file slices 
grouped by HoodieFileGroupId
+   * @return the list of FileSliceComparisonGroup which contains the groups of 
file slices to compare for finding dereferenced blobs
+   */
+  @VisibleForTesting
+  static List<FileSliceComparisonGroup> 
getFileSliceComparisonGroups(Map<HoodieFileGroupId, List<FileSlice>> 
retainedFileSlicesByFileGroupId,
+                                                                     
Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId) {
+    List<FileSliceComparisonGroup> groupings = new ArrayList<>();
+    List<List<FileSlice>> removedFileSlicesWithoutRetainedSlices = new 
ArrayList<>();
+    removedFileSlicesByFileGroupId.keySet().forEach(fileGroupId -> {
+      List<FileSlice> removedSlices = 
removedFileSlicesByFileGroupId.get(fileGroupId);
+      List<FileSlice> retainedSlices = 
retainedFileSlicesByFileGroupId.get(fileGroupId);
+      if (retainedSlices == null) {
+        // This is due to replace commit or clustering so we must handle this 
case separately
+        removedFileSlicesWithoutRetainedSlices.add(removedSlices);
+      } else {
+        groupings.add(FileSliceComparisonGroup.builder().removedFileSlices(new 
HashSet<>(removedSlices)).retainedFileSlices(new 
HashSet<>(retainedSlices)).build());
+      }
+    });
+    if (!removedFileSlicesWithoutRetainedSlices.isEmpty()) {
+      // File slices that do not have a retained commit are due to replace 
commit or clustering, so we will compare them against all retained file slices 
created after the oldest removed file slice.
+      String instant = removedFileSlicesWithoutRetainedSlices.stream()
+          // for each file group we get the latest instant time
+          .map(fileSlices -> 
fileSlices.stream().map(FileSlice::getLatestInstantTime).reduce(InstantComparison::maxInstant)
+              .orElseThrow(() -> new HoodieException("File slices should have 
at least one file")))
+          .reduce(InstantComparison::minInstant).orElseThrow(() -> new 
HoodieException("There should be at least one file slice to remove"));
+      // find the file groups created after this instant and get the retained 
file slices for those file groups
+      Set<FileSlice> retainedFileSlicesRequiringComparison = 
retainedFileSlicesByFileGroupId.entrySet().stream()
+          .filter(entry -> entry.getValue().stream().allMatch(fileSlice -> 
compareTimestamps(fileSlice.getBaseInstantTime(), GREATER_THAN, instant)))

Review Comment:
   The base instant time is used here because this filter targets file groups 
that may have been created by the replace commit that removed the other file group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to