xiangfu0 commented on code in PR #18549:
URL: https://github.com/apache/pinot/pull/18549#discussion_r3279116035


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java:
##########
@@ -341,41 +359,146 @@ private void moveSegmentsToDeletedDir(String segmentId, 
Long deletedSegmentsRete
 
 
   /**
-   * Retrieves the URI for segment deletion by checking two possible segment 
file variants in deep store.
-   * Looks for the segment file in two formats:
+   * Retrieves the URIs for segment deletion by checking the possible segment 
file variants in deep store.
+   * Looks for segment files in these formats:
    * - Without extension (conventional naming)
    * - With .tar.gz extension (used by minions in 
BaseMultipleSegmentsConversionExecutor)
+   * - Upload-attempt files generated by parallel push protection
    *
    * @param rawTableName name of the table containing the segment
    * @param segmentId name of the segment
-   * @return URI of the existing segment file if found in either format, null 
if segment doesn't exist in either format
-   *         or if there are filesystem access errors
+   * @return URIs of existing segment files, empty if the segment doesn't 
exist or if there are filesystem access errors
    */
-  @Nullable
-  private URI getFileToDeleteURI(String rawTableName, String segmentId) {
+  private List<URI> getFilesToDeleteURIs(String rawTableName, String 
segmentId, PinotFS pinotFS,
+      @Nullable List<URI> uploadAttemptFileURIs) {
+    List<URI> filesToDelete = new ArrayList<>();
     try {
       URI plainFileUri = URIUtils.getUri(_dataDir, rawTableName, 
URIUtils.encode(segmentId));
-      PinotFS pinotFS = PinotFSFactory.create(plainFileUri.getScheme());
 
       // Check for plain segment file first
       if (pinotFS.exists(plainFileUri)) {
-        return plainFileUri;
+        filesToDelete.add(plainFileUri);
       }
 
       URI tarGzFileUri = URIUtils.getUri(_dataDir, rawTableName,
           URIUtils.encode(segmentId + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
 
       // Check for .tar.gz segment file
       if (pinotFS.exists(tarGzFileUri)) {
-        return tarGzFileUri;
+        filesToDelete.add(tarGzFileUri);
+      }
+      if (uploadAttemptFileURIs != null) {
+        filesToDelete.addAll(uploadAttemptFileURIs);
+      }
+      if (filesToDelete.isEmpty()) {
+        LOGGER.error("No file found for segment: {} in deep store", segmentId);
       }
-      LOGGER.error("No file found for segment: {} in deep store", segmentId);
-      return null;
     } catch (Exception e) {
       LOGGER.error("Caught exception while trying to find file for segment: {} 
in deep store", segmentId);
+    }
+    return filesToDelete;
+  }
+
+  @Nullable
+  private Set<String> getKnownSegmentNames(String tableNameWithType, 
Set<String> segmentsToDelete) {
+    Set<String> knownSegmentNames = new HashSet<>(segmentsToDelete);
+    try {
+      List<String> activeSegmentNames = 
ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+      if (activeSegmentNames == null) {
+        LOGGER.warn("Failed to read active segment names for table: {}, 
skipping upload-attempt cleanup",
+            tableNameWithType);
+        return null;
+      }
+      knownSegmentNames.addAll(activeSegmentNames);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to read active segment names for table: {}, skipping 
upload-attempt cleanup",
+          tableNameWithType, e);
       return null;
     }
+    return knownSegmentNames;
   }
+
+  private Map<String, List<URI>> getUploadAttemptFileURIs(String rawTableName, 
Set<String> segmentsToDelete,
+      Set<String> knownSegmentNames, PinotFS pinotFS) {
+    Map<String, List<URI>> uploadAttemptFileURIsMap = new HashMap<>();
+    URI tableURI = URIUtils.getUri(_dataDir, rawTableName);
+    try {
+      if (!pinotFS.exists(tableURI) || !pinotFS.isDirectory(tableURI)) {
+        return uploadAttemptFileURIsMap;
+      }
+      String[] filePaths = pinotFS.listFiles(tableURI, false);
+      if (filePaths == null) {
+        return uploadAttemptFileURIsMap;
+      }
+      for (String filePath : filePaths) {
+        String fileName = URIUtils.getLastPart(filePath);
+        String segmentId = getUploadAttemptSegmentId(fileName, 
segmentsToDelete);
+        if (segmentId == null || isKnownSegmentFile(fileName, segmentId, 
knownSegmentNames, segmentsToDelete)) {
+          continue;
+        }
+        uploadAttemptFileURIsMap.computeIfAbsent(segmentId, key -> new 
ArrayList<>())
+            .add(URIUtils.getUri(_dataDir, rawTableName, fileName));
+      }
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while trying to find upload-attempt files 
for table: {} in deep store",
+          rawTableName, e);
+    }
+    return uploadAttemptFileURIsMap;
+  }
+
+  @Nullable
+  private String getUploadAttemptSegmentId(String fileName, Set<String> 
segmentsToDelete) {
+    String uploadAttemptMarker = 
CommonConstants.Segment.SEGMENT_UPLOAD_ATTEMPT_FILE_MARKER;
+    String decodedFileName = safeDecode(fileName);
+    String matchingSegmentName = null;
+    for (String segmentName : segmentsToDelete) {
+      if (isUploadAttemptFileForSegment(fileName, decodedFileName, 
segmentName, uploadAttemptMarker)
+          && (matchingSegmentName == null || segmentName.length() > 
matchingSegmentName.length())) {
+        matchingSegmentName = segmentName;
+      }

Review Comment:
   Fixed in the latest push. The upload-attempt segment ID is now derived 
directly from the marker-delimited file name, and matching uses set-membership 
checks instead of scanning every segment for every listed file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to