This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 654875924 [CELEBORN-1414] PartitionFilesSorter resolve DiskFileInfo
without sorting lock
654875924 is described below
commit 6548759243813b878f2fccccf8cc3fd822019a59
Author: SteNicholas <[email protected]>
AuthorDate: Wed May 15 17:16:30 2024 +0800
[CELEBORN-1414] PartitionFilesSorter resolve DiskFileInfo without sorting
lock
### What changes were proposed in this pull request?
`PartitionFilesSorter` now calls `resolve` to resolve `DiskFileInfo` outside
the `sorting` lock, which reduces the lock scope and improves the performance
of `getSortedFileInfo`.
### Why are the changes needed?
`PartitionFilesSorter#resolve` is thread safe. However,
`PartitionFilesSorter` currently invokes `resolve` while holding the `sorting`
lock, even though resolving `DiskFileInfo` does not need that lock.
`PartitionFilesSorter` can therefore resolve `DiskFileInfo` without the
`sorting` lock to improve the performance of `getSortedFileInfo`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2498 from SteNicholas/CELEBORN-1414.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../worker/storage/PartitionFilesSorter.java | 47 ++++++++++------------
1 file changed, 22 insertions(+), 25 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 8eaa279f6..0025e6482 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -206,18 +206,11 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
+ boolean fileSorting = true;
synchronized (sorting) {
if (sorted.contains(fileId)) {
- return resolve(
- shuffleKey,
- fileId,
- userIdentifier,
- sortedFilePath,
- indexFilePath,
- startMapIndex,
- endMapIndex);
- }
- if (!sorting.contains(fileId)) {
+ fileSorting = false;
+ } else if (!sorting.contains(fileId)) {
try {
FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
sorting.add(fileId);
@@ -235,25 +228,29 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
}
}
- long sortStartTime = System.currentTimeMillis();
- while (!sorted.contains(fileId)) {
- if (sorting.contains(fileId)) {
- try {
- Thread.sleep(50);
- if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
- logger.error("Sorting file {} timeout after {}ms", fileId,
sortTimeout);
+ if (fileSorting) {
+ long sortStartTime = System.currentTimeMillis();
+ while (!sorted.contains(fileId)) {
+ if (sorting.contains(fileId)) {
+ try {
+ Thread.sleep(50);
+ if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
+ logger.error("Sorting file {} timeout after {}ms", fileId,
sortTimeout);
+ throw new IOException(
+ "Sort file " + fileInfo.getFilePath() + " timeout after " +
sortTimeout);
+ }
+ } catch (InterruptedException e) {
+ logger.error(
+ "Sorter scheduler thread is interrupted means worker is
shutting down.", e);
throw new IOException(
- "Sort file " + fileInfo.getFilePath() + " timeout after " +
sortTimeout);
+ "Sorter scheduler thread is interrupted means worker is
shutting down.", e);
}
- } catch (InterruptedException e) {
- logger.error("Sorter scheduler thread is interrupted means worker is
shutting down.", e);
+ } else {
+ logger.debug(
+ "Sorting shuffle file for {} {} failed.", shuffleKey,
fileInfo.getFilePath());
throw new IOException(
- "Sorter scheduler thread is interrupted means worker is shutting
down.", e);
+ "Sorting shuffle file for " + shuffleKey + " " +
fileInfo.getFilePath() + " failed.");
}
- } else {
- logger.debug("Sorting shuffle file for {} {} failed.", shuffleKey,
fileInfo.getFilePath());
- throw new IOException(
- "Sorting shuffle file for " + shuffleKey + " " +
fileInfo.getFilePath() + " failed.");
}
}