This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 303a9ff00 [CELEBORN-914][FOLLOWUP] Restore select flush worker index 
logic
303a9ff00 is described below

commit 303a9ff00307f80d199618fb4da9513b70bb7447
Author: mingji <[email protected]>
AuthorDate: Fri Jun 28 16:57:57 2024 +0800

    [CELEBORN-914][FOLLOWUP] Restore select flush worker index logic
    
    ### What changes were proposed in this pull request?
    Restore select flush worker index logic.
    
    ### Why are the changes needed?
    For better performance.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA.
    
    Closes #2591 from FMX/b914-2.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/service/deploy/worker/storage/PartitionDataWriter.java  | 2 ++
 .../org/apache/celeborn/service/deploy/worker/storage/Flusher.scala  | 5 +++++
 2 files changed, 7 insertions(+)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 589832dbe..f40f8ee66 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -143,6 +143,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     } else if (createFileResult._2() != null) {
       this.diskFileInfo = createFileResult._3();
       this.flusher = createFileResult._2();
+      this.flushWorkerIndex = this.flusher.getWorkerIndex();
       File workingDir = createFileResult._4();
       this.isMemoryShuffleFile.set(false);
       initFileChannelsForDiskFile();
@@ -352,6 +353,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     if (createFileResult._4() != null) {
       this.diskFileInfo = createFileResult._3();
       this.flusher = createFileResult._2();
+      this.flushWorkerIndex = this.flusher.getWorkerIndex();
 
       isMemoryShuffleFile.set(false);
       initFileChannelsForDiskFile();
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 2ac06b829..e5d93fa32 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -99,6 +99,11 @@ abstract private[worker] class Flusher(
     ThreadPoolSource.registerSource(s"$this", workers)
   }
 
+  def getWorkerIndex: Int = synchronized {
+    nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
+    nextWorkerIndex
+  }
+
   def takeBuffer(): CompositeByteBuf = {
     var buffer = bufferQueue.poll()
     if (buffer == null) {

Reply via email to