This is an automated email from the ASF dual-hosted git repository.
agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a13a96d Avoid materializing list of segment files when finding a
partition file during shuffle (#11903)
a13a96d is described below
commit a13a96d5e090e3fc73df3419a19907112efa4fda
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Thu Nov 11 10:51:52 2021 -0700
Avoid materializing list of segment files when finding a partition file
during shuffle (#11903)
* Avoid materializing the list of segment files (which can cause OOM errors or
memory pressure) and avoid looping over those files.
* Validate subTaskId
---
.../worker/shuffle/LocalIntermediaryDataManager.java | 13 ++++---------
1 file changed, 4 insertions(+), 9 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index 4c56b00..045e8df 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -373,24 +373,19 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
public Optional<ByteSource> findPartitionFile(String supervisorTaskId,
String subTaskId, Interval interval, int bucketId)
{
IdUtils.validateId("supervisorTaskId", supervisorTaskId);
+ IdUtils.validateId("subTaskId", subTaskId);
for (StorageLocation location : shuffleDataLocations) {
final File partitionDir = new File(location.getPath(),
getPartitionDirPath(supervisorTaskId, interval, bucketId));
if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
- final File[] segmentFiles = partitionDir.listFiles();
- if (segmentFiles == null) {
- return Optional.empty();
+ final File segmentFile = new File(partitionDir, subTaskId);
+ if (segmentFile.exists()) {
+ return Optional.of(Files.asByteSource(segmentFile));
} else {
- for (File segmentFile : segmentFiles) {
- if (segmentFile.getName().equals(subTaskId)) {
- return Optional.of(Files.asByteSource(segmentFile));
- }
- }
return Optional.empty();
}
}
}
-
return Optional.empty();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]