This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 97991a34 [CELEBORN-126] Fileinfo adds member bufferSize (#1068)
97991a34 is described below

commit 97991a34043cff6e5c64218d27c5c0820479b0ea
Author: zhongqiangczq <[email protected]>
AuthorDate: Tue Dec 13 16:36:26 2022 +0800

    [CELEBORN-126] Fileinfo adds member bufferSize (#1068)
---
 common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java   | 5 +++++
 .../service/deploy/worker/storage/MapPartitionFileWriter.java        | 3 ++-
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala  | 3 ++-
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 6a815e3e..35ddebfa 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -37,6 +37,7 @@ public class FileInfo {
   private final List<Long> chunkOffsets;
   private final UserIdentifier userIdentifier;
   private final PartitionType partitionType;
+  private int bufferSize;
 
   public FileInfo(String filePath, List<Long> chunkOffsets, UserIdentifier userIdentifier) {
     this(filePath, chunkOffsets, userIdentifier, PartitionType.REDUCE);
@@ -164,4 +165,8 @@ public class FileInfo {
         + partitionType
         + '}';
   }
+
+  public void setBufferSize(int bufferSize) {
+    this.bufferSize = bufferSize;
+  }
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
index 22a2410e..7d70f4be 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -84,8 +84,9 @@ public final class MapPartitionFileWriter extends FileWriter {
     return 0;
   }
 
-  public void pushDataHandShake(int numReducePartitions) {
+  public void pushDataHandShake(int numReducePartitions, int bufferSize) {
     this.numReducePartitions = numReducePartitions;
+    fileInfo.setBufferSize(bufferSize);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 020e2eec..b59eafd2 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -709,7 +709,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       messageType match {
         case Type.PUSH_DATA_HAND_SHAKE => {
           fileWriter.asInstanceOf[MapPartitionFileWriter].pushDataHandShake(
-            message.asInstanceOf[PushDataHandShake].numPartitions)
+            message.asInstanceOf[PushDataHandShake].numPartitions,
+            message.asInstanceOf[PushDataHandShake].bufferSize)
         }
         case Type.REGION_START => {
           fileWriter.asInstanceOf[MapPartitionFileWriter].regionStart(

Reply via email to