This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 346aa4cc0 [CELEBORN-914][FOLLOWUP] Restore select flush worker index logic
346aa4cc0 is described below
commit 346aa4cc03164ca24736d523bd93ffffa4205dba
Author: mingji <[email protected]>
AuthorDate: Fri Jun 28 16:57:57 2024 +0800
[CELEBORN-914][FOLLOWUP] Restore select flush worker index logic
### What changes were proposed in this pull request?
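Restore the logic that selects the flush worker index: `PartitionDataWriter` once again sets `flushWorkerIndex` from `Flusher.getWorkerIndex` after it is assigned a flusher.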
### Why are the changes needed?
For better performance.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #2591 from FMX/b914-2.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 303a9ff00307f80d199618fb4da9513b70bb7447)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/service/deploy/worker/storage/PartitionDataWriter.java | 2 ++
.../org/apache/celeborn/service/deploy/worker/storage/Flusher.scala | 5 +++++
2 files changed, 7 insertions(+)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 589832dbe..f40f8ee66 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -143,6 +143,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
} else if (createFileResult._2() != null) {
this.diskFileInfo = createFileResult._3();
this.flusher = createFileResult._2();
+ this.flushWorkerIndex = this.flusher.getWorkerIndex();
File workingDir = createFileResult._4();
this.isMemoryShuffleFile.set(false);
initFileChannelsForDiskFile();
@@ -352,6 +353,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
if (createFileResult._4() != null) {
this.diskFileInfo = createFileResult._3();
this.flusher = createFileResult._2();
+ this.flushWorkerIndex = this.flusher.getWorkerIndex();
isMemoryShuffleFile.set(false);
initFileChannelsForDiskFile();
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 2ac06b829..e5d93fa32 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -99,6 +99,11 @@ abstract private[worker] class Flusher(
ThreadPoolSource.registerSource(s"$this", workers)
}
+ def getWorkerIndex: Int = synchronized {
+ nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
+ nextWorkerIndex
+ }
+
def takeBuffer(): CompositeByteBuf = {
var buffer = bufferQueue.poll()
if (buffer == null) {