the-other-tim-brown commented on code in PR #18159:
URL: https://github.com/apache/hudi/pull/18159#discussion_r2843885519


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -614,4 +660,299 @@ private boolean 
isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, 
String partitionPath) {
     return 
!hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain,
 partitionPath).findAny().isPresent();
   }
+
+  private <R> List<CleanFileInfo> getBlobFilesToRemove(HoodieTable table, 
HoodieSchema schema, Option<HoodieSchema> requestedSchema,
+                                                       Option<List<FileSlice>> 
retainedFileSlices, Option<List<FileSlice>> removedFileSlices) {
+    if (requestedSchema.isEmpty()) {
+      // no blob columns, no blob files to clean
+      return Collections.emptyList();
+    }
+    // Skip if there are no removed file slices
+    if (removedFileSlices.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // Validate there is at least one completed commit
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    Option<String> latestCommitTimeOpt = 
metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+    if (!latestCommitTimeOpt.isPresent()) {
+      // no commits or blob files to clean
+      return Collections.emptyList();
+    }
+    Map<HoodieFileGroupId, List<FileSlice>> retainedFileSlicesByFileGroupId = 
retainedFileSlices.orElseThrow(() -> new HoodieException("Retained file slices 
must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+    Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId = 
removedFileSlices.orElseThrow(() -> new HoodieException("Removed file slices 
must be set"))
+        .stream().collect(Collectors.groupingBy(FileSlice::getFileGroupId));
+
+    return getFileSliceComparisonGroups(retainedFileSlicesByFileGroupId, 
removedFileSlicesByFileGroupId).stream().flatMap(group -> {
+      HoodieReaderContext<R> readerContext = ((ReaderContextFactory<R>) 
table.getContext().getReaderContextFactory(metaClient)).getContext();
+      RecordContext<R> recordContext = readerContext.getRecordContext();
+      // Iterate through the removed file slices with skip merging to find all 
the blob files that are referenced by the removed file slices.
+      TypedProperties properties = TypedProperties.copy(config.getProps());
+      properties.put(HoodieReaderConfig.MERGE_TYPE.key(), REALTIME_SKIP_MERGE);
+
+      Set<String> managedBlobFilePaths = group.getRemovedFileSlices().stream()
+          .flatMap(fileSlice -> {
+            HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, 
fileSlice, readerContext, metaClient, latestCommitTimeOpt, 
requestedSchema.get(), properties);
+            Set<String> managedBlobFilePathsInSlice = new HashSet<>();
+            try (ClosableIterator<R> recordItr = reader.getClosableIterator()) 
{
+              while (recordItr.hasNext()) {
+                R record = recordItr.next();
+                managedBlobFilePathsInSlice.addAll(getManagedBlobPaths(schema, 
record, recordContext));
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading records from file 
slice: " + fileSlice, e);
+            }
+            return managedBlobFilePathsInSlice.stream();
+          })
+          .collect(Collectors.toSet());
+      if (managedBlobFilePaths.isEmpty()) {
+        // no blob files referenced by the removed file slices, skip
+        return Stream.empty();
+      }
+      // Then iterate through the retained file slices with skip merging to 
find all the blob files that are still referenced by the retained file slices.
+      group.getRetainedFileSlices().forEach(fileSlice -> {
+        HoodieFileGroupReader<R> reader = getHoodieFileGroupReader(schema, 
fileSlice, readerContext, metaClient, latestCommitTimeOpt, 
requestedSchema.get(), properties);
+        try (ClosableIterator<R> recordItr = reader.getClosableIterator()) {
+          while (recordItr.hasNext()) {
+            R record = recordItr.next();
+            getManagedBlobPaths(schema, record, 
recordContext).forEach(managedBlobFilePaths::remove);
+            if (managedBlobFilePaths.isEmpty()) {
+              // all blob files referenced by the removed file slices are 
still referenced by the retained file slices, skip
+              break;
+            }
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Error reading records from file slice: 
" + fileSlice, e);
+        }
+      });
+
+      // The remaining blob file paths in managedBlobFilePaths are the ones 
that can be removed.
+      return managedBlobFilePaths.stream().map(path -> new CleanFileInfo(path, 
false));
+    }).collect(Collectors.toList());
+  }
+
+  private <R> HoodieFileGroupReader<R> getHoodieFileGroupReader(HoodieSchema 
schema, FileSlice fileSlice, HoodieReaderContext<R> readerContext, 
HoodieTableMetaClient metaClient,
+                                                                Option<String> 
latestCommitTimeOpt, HoodieSchema requestedSchema, TypedProperties props) {
+    return HoodieFileGroupReader.<R>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(latestCommitTimeOpt.get())
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(requestedSchema)
+        .withProps(props)
+        .build();
+  }
+
+  // TODO: How does this work for tables with global index?
+  /**
+   * Cleaning blob files requires that we inspect the contents of the retained 
and removed file slices to find the blob file references that can be removed.
+   * To optimize the process, we limit the comparisons required with the 
following grouping logic:
+   * 1) If a removed file slice has a corresponding retained file slice in the 
same file group, we will compare the removed file slice with the retained file 
slice(s) in the same file group.
+   * 2) If a removed file slice does not have a corresponding retained file 
slice in the same file group, this implies that there is a replace or 
clustering commit. This requires comparing the
+   *    removed file slice with all retained file slices that were created 
after the removed file slice's latest instant time, as these are the only 
retained file slices
+   *    that could be part of a replace or clustering commit that removes 
these files from the active view of the table. All slices that fall into this 
case are put into the same comparison group to
+   *    avoid reading the same file multiple times.
+   * @param retainedFileSlicesByFileGroupId the map of retained file slices 
grouped by HoodieFileGroupId
+   * @param removedFileSlicesByFileGroupId the map of removed file slices 
grouped by HoodieFileGroupId
+   * @return the list of FileSliceComparisonGroup which contains the groups of 
file slices to compare for finding dereferenced blobs
+   */
+  @VisibleForTesting
+  static List<FileSliceComparisonGroup> 
getFileSliceComparisonGroups(Map<HoodieFileGroupId, List<FileSlice>> 
retainedFileSlicesByFileGroupId,
+                                                                     
Map<HoodieFileGroupId, List<FileSlice>> removedFileSlicesByFileGroupId) {
+    List<FileSliceComparisonGroup> groupings = new ArrayList<>();
+    List<List<FileSlice>> removedFileSlicesWithoutRetainedSlices = new 
ArrayList<>();
+    removedFileSlicesByFileGroupId.keySet().forEach(fileGroupId -> {
+      List<FileSlice> removedSlices = 
removedFileSlicesByFileGroupId.get(fileGroupId);
+      List<FileSlice> retainedSlices = 
retainedFileSlicesByFileGroupId.get(fileGroupId);
+      if (retainedSlices == null) {
+        // This is due to replace commit or clustering so we must handle this 
case separately
+        removedFileSlicesWithoutRetainedSlices.add(removedSlices);
+      } else {
+        groupings.add(FileSliceComparisonGroup.builder().removedFileSlices(new 
HashSet<>(removedSlices)).retainedFileSlices(new 
HashSet<>(retainedSlices)).build());
+      }
+    });
+    if (!removedFileSlicesWithoutRetainedSlices.isEmpty()) {
+      // File slices that do not have a retained commit are due to replace 
commit or clustering, so we will compare them against all retained file slices 
created after the oldest removed file slice.
+      String instant = removedFileSlicesWithoutRetainedSlices.stream()
+          // for each file group we get the latest instant time
+          .map(fileSlices -> 
fileSlices.stream().map(FileSlice::getLatestInstantTime).reduce(InstantComparison::maxInstant)
+              .orElseThrow(() -> new HoodieException("File slices should have 
at least one file")))
+          .reduce(InstantComparison::minInstant).orElseThrow(() -> new 
HoodieException("There should be at least one file slice to remove"));
+      // find the file groups created after this instant and get the retained 
file slices for those file groups
+      Set<FileSlice> retainedFileSlicesRequiringComparison = 
retainedFileSlicesByFileGroupId.entrySet().stream()
+          .filter(entry -> entry.getValue().stream().allMatch(fileSlice -> 
compareTimestamps(fileSlice.getBaseInstantTime(), GREATER_THAN, instant)))
+          .flatMap(entry -> entry.getValue().stream())
+          .collect(Collectors.toSet());
+      Set<FileSlice> flattenedRemovedFileSlicesWithoutRetainedSlices = 
removedFileSlicesWithoutRetainedSlices.stream().flatMap(Collection::stream).collect(Collectors.toSet());
+      
groupings.add(FileSliceComparisonGroup.builder().removedFileSlices(flattenedRemovedFileSlicesWithoutRetainedSlices).retainedFileSlices(retainedFileSlicesRequiringComparison).build());
+    }
+    return groupings;
+  }
+
  /**
   * A pair of file-slice sets to compare when looking for dereferenced blobs: blob paths
   * referenced by the removed slices are removable unless also referenced by the retained slices.
   */
  @Value
  @Builder
  static class FileSliceComparisonGroup {
    // File slices that remain visible after cleaning.
    Set<FileSlice> retainedFileSlices;
    // File slices being removed by the clean action.
    Set<FileSlice> removedFileSlices;
  }
+
+  /**
+   * Finds all the blob columns and returns a schema limited to these columns.
+   * This is used to read only the blob columns when finding the blob files to 
clean.
+   * @param schema the table's schema
+   * @return an option of the reduced schema containing only the blob columns 
or empty option if no blobs are found
+   */
+  private Option<HoodieSchema> getReducedBlobSchema(HoodieSchema schema) {
+    List<HoodieSchemaField> blobFields = new ArrayList<>();
+
+    // Traverse schema to find blob columns
+    for (HoodieSchemaField field : schema.getFields()) {
+      filterSchemaForBlobFields(field, blobFields);
+    }
+
+    // Create projection schema with only blob fields
+    return blobFields.isEmpty()
+        ? Option.empty()
+        : Option.of(HoodieSchema.createRecord(
+            schema.getName(),
+            schema.getNamespace().orElse(null),
+            schema.getDoc().orElse(null),
+            blobFields));
+  }
+
  /**
   * Inspects a single schema field and, if it (or a nested field) is a blob, adds a
   * blob-only projection of it to {@code blobFields}. Nested records are pruned to their
   * blob-bearing fields; nullability of record fields is preserved.
   * <p>
   * NOTE(review): for ARRAY and MAP fields the whole field is included unpruned whenever the
   * element/value type contains a blob — non-blob sibling fields of records nested inside
   * arrays/maps are NOT pruned. Reviewer flagged this as a known limitation to fix.
   *
   * @param field the field to inspect
   * @param blobFields accumulator receiving the blob-only projected fields
   */
  private void filterSchemaForBlobFields(HoodieSchemaField field,
                                         List<HoodieSchemaField> blobFields) {
    HoodieSchema fieldSchema = field.schema();
    // Unwrap a nullable union to classify the underlying type.
    HoodieSchema nonNullSchema = fieldSchema.getNonNullType();

    switch (nonNullSchema.getType()) {
      case BLOB:
        // Direct blob column: keep it as-is.
        blobFields.add(HoodieSchemaUtils.createNewSchemaField(field));
        break;
      case RECORD:
        // Recursively traverse nested record fields
        List<HoodieSchemaField> nestedBlobFields = new ArrayList<>();
        for (HoodieSchemaField nestedField : nonNullSchema.getFields()) {
          filterSchemaForBlobFields(nestedField, nestedBlobFields);
        }

        // If any nested field contains blob, include this record field
        if (!nestedBlobFields.isEmpty()) {
          // Rebuild the record type with only the blob-bearing fields.
          HoodieSchema nestedRecordSchema = HoodieSchema.createRecord(
              nonNullSchema.getName(),
              nonNullSchema.getNamespace().orElse(null),
              nonNullSchema.getDoc().orElse(null),
              nestedBlobFields);

          // Re-wrap in a nullable union if the original field was nullable.
          HoodieSchema finalSchema = fieldSchema.isNullable()
              ? HoodieSchema.createNullable(nestedRecordSchema)
              : nestedRecordSchema;

          blobFields.add(HoodieSchemaUtils.createNewSchemaField(
              field.name(), finalSchema, field.doc().orElse(null), field.defaultVal().orElse(null)));
        }
        break;
      case ARRAY:
        // Check if array element type contains blob
        // TODO(review): prune non-blob fields inside the element type instead of keeping the whole field
        HoodieSchema elementType = nonNullSchema.getElementType();
        if (elementType.containsBlobType()) {
          blobFields.add(HoodieSchemaUtils.createNewSchemaField(field));
        }
        break;
      case MAP:
        // Check if map value type contains blob
        // TODO(review): prune non-blob fields inside the value type instead of keeping the whole field
        HoodieSchema valueType = nonNullSchema.getValueType();
        if (valueType.containsBlobType()) {
          blobFields.add(HoodieSchemaUtils.createNewSchemaField(field));
        }
        break;
      default:
        // No blob type, do nothing
        break;
    }
  }
+
+  /**
+   * Finds the blob file paths referenced by a record. The schema is used to 
find all blob columns in the record.
+   * Any blob column that has an external reference with isManaged=true will 
be included in the result.
+   * @param schema the record schema
+   * @param record the record to inspect
+   * @param recordContext the record context to use for retrieving values from 
the record
+   * @return a list of managed blob file paths referenced at this path
+   * @param <R> the record type
+   */
+  @VisibleForTesting
+  <R> List<String> getManagedBlobPaths(HoodieSchema schema, R record, 
RecordContext<R> recordContext) {
+    List<String> managedPaths = new ArrayList<>();
+
+    for (int i = 0; i < schema.getFields().size(); i++) {
+      HoodieSchemaField field = schema.getFields().get(i);
+      HoodieSchema fieldSchema = field.schema().getNonNullType();
+      Object value = recordContext.getValue(record, schema, field.name());
+      managedPaths.addAll(getManagedBlobPathsForField(value, fieldSchema, 
recordContext));
+    }
+    return managedPaths;
+  }
+
+  private <R> List<String> getManagedBlobPathsForField(Object value, 
HoodieSchema fieldSchema, RecordContext<R> recordContext) {
+    if (value == null) {
+      return Collections.emptyList();
+    }
+    List<String> managedPaths = new ArrayList<>();
+    switch (fieldSchema.getType()) {
+      case BLOB:
+        // Process blob field
+        extractManagedPathFromBlob((R) value, fieldSchema, 
recordContext).ifPresent(managedPaths::add);
+        break;
+      case RECORD:
+        managedPaths.addAll(getManagedBlobPaths(fieldSchema, (R) value, 
recordContext));
+        break;
+      case ARRAY:
+        if (value instanceof Iterable) {
+          for (Object element : (Iterable<?>) value) {
+            if (element != null) {
+              managedPaths.addAll(getManagedBlobPathsForField(element, 
fieldSchema.getElementType().getNonNullType(), recordContext));
+            }
+          }
+        }
+        break;
+      case MAP:
+        if (value instanceof Map) {
+          for (Object entry : ((Map<?, ?>) value).values()) {
+            if (entry != null) {
+              managedPaths.addAll(getManagedBlobPathsForField(entry, 
fieldSchema.getValueType().getNonNullType(), recordContext));
+            }
+          }
+        }
+        break;
+      default:
+        // No blob type, skip

Review Comment:
   Right now the schema pruning does not prune the non-blob fields of records nested 
inside maps/arrays — only top-level and directly nested records are pruned. I'll need to fix that first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to