This is an automated email from the ASF dual-hosted git repository.

agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a13a96d  Avoid materializing list of segment files when finding a partition file during shuffle (#11903)
a13a96d is described below

commit a13a96d5e090e3fc73df3419a19907112efa4fda
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Thu Nov 11 10:51:52 2021 -0700

    Avoid materializing list of segment files when finding a partition file during shuffle (#11903)
    
    * Avoid materializing list of segment files (it can cause OOM/memory pressure) as well as looping over the files.
    
    * Validate subTaskId
---
 .../worker/shuffle/LocalIntermediaryDataManager.java        | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index 4c56b00..045e8df 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -373,24 +373,19 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
   public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
   {
     IdUtils.validateId("supervisorTaskId", supervisorTaskId);
+    IdUtils.validateId("subTaskId", subTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File partitionDir = new File(location.getPath(), getPartitionDirPath(supervisorTaskId, interval, bucketId));
       if (partitionDir.exists()) {
         supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
-        final File[] segmentFiles = partitionDir.listFiles();
-        if (segmentFiles == null) {
-          return Optional.empty();
+        final File segmentFile = new File(partitionDir, subTaskId);
+        if (segmentFile.exists()) {
+          return Optional.of(Files.asByteSource(segmentFile));
         } else {
-          for (File segmentFile : segmentFiles) {
-            if (segmentFile.getName().equals(subTaskId)) {
-              return Optional.of(Files.asByteSource(segmentFile));
-            }
-          }
           return Optional.empty();
         }
       }
     }
-
     return Optional.empty();
   }
 

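For readers skimming the diff, the change can be sketched outside of Druid as a standalone comparison of the two lookup strategies. This is a minimal illustration, not the project's code: the class `PartitionFileLookup`, its method names, and the helper `requireSimpleName` are hypothetical, and the helper is only an assumed stand-in for `IdUtils.validateId`, whose exact rules are not shown in this commit. The sketch returns `Optional<File>` from `java.util` rather than the `Optional<ByteSource>` used in the patch.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Optional;

public class PartitionFileLookup
{
  // Old approach: materialize every entry of the directory into an array,
  // then scan the array for a matching name.
  public static Optional<File> findByListing(File partitionDir, String subTaskId)
  {
    final File[] segmentFiles = partitionDir.listFiles();
    if (segmentFiles == null) {
      return Optional.empty();
    }
    for (File segmentFile : segmentFiles) {
      if (segmentFile.getName().equals(subTaskId)) {
        return Optional.of(segmentFile);
      }
    }
    return Optional.empty();
  }

  // New approach: build the expected path directly and check whether it exists.
  // No directory listing is allocated, however many files the directory holds.
  public static Optional<File> findByDirectLookup(File partitionDir, String subTaskId)
  {
    requireSimpleName(subTaskId);
    final File segmentFile = new File(partitionDir, subTaskId);
    return segmentFile.exists() ? Optional.of(segmentFile) : Optional.empty();
  }

  // Hypothetical stand-in for IdUtils.validateId (assumed rules, not Druid's).
  // Once the id becomes part of a path, it should not be able to name a file
  // outside the partition directory.
  public static void requireSimpleName(String id)
  {
    if (id == null || id.isEmpty() || id.startsWith(".") || id.contains("/") || id.contains("\\")) {
      throw new IllegalArgumentException("Invalid id: " + id);
    }
  }

  public static void main(String[] args) throws IOException
  {
    final File partitionDir = Files.createTempDirectory("partition").toFile();
    Files.createFile(new File(partitionDir, "subTask_0").toPath());

    // Both strategies should agree on a file that exists and on one that does not.
    System.out.println(findByListing(partitionDir, "subTask_0").equals(findByDirectLookup(partitionDir, "subTask_0")));
    System.out.println(findByDirectLookup(partitionDir, "missing").isPresent());
  }
}
```

The direct lookup is likely also why the commit adds validation of `subTaskId`: in the old loop the id was only compared against existing file names, whereas now it is used to construct a path.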