This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 97991a34 [CELEBORN-126] Fileinfo adds member bufferSize (#1068)
97991a34 is described below
commit 97991a34043cff6e5c64218d27c5c0820479b0ea
Author: zhongqiangczq <[email protected]>
AuthorDate: Tue Dec 13 16:36:26 2022 +0800
[CELEBORN-126] Fileinfo adds member bufferSize (#1068)
---
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java | 5 +++++
.../service/deploy/worker/storage/MapPartitionFileWriter.java | 3 ++-
.../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 3 ++-
3 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 6a815e3e..35ddebfa 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -37,6 +37,7 @@ public class FileInfo {
private final List<Long> chunkOffsets;
private final UserIdentifier userIdentifier;
private final PartitionType partitionType;
+ private int bufferSize;
public FileInfo(String filePath, List<Long> chunkOffsets, UserIdentifier
userIdentifier) {
this(filePath, chunkOffsets, userIdentifier, PartitionType.REDUCE);
@@ -164,4 +165,8 @@ public class FileInfo {
+ partitionType
+ '}';
}
+
+ public void setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
index 22a2410e..7d70f4be 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -84,8 +84,9 @@ public final class MapPartitionFileWriter extends FileWriter {
return 0;
}
- public void pushDataHandShake(int numReducePartitions) {
+ public void pushDataHandShake(int numReducePartitions, int bufferSize) {
this.numReducePartitions = numReducePartitions;
+ fileInfo.setBufferSize(bufferSize);
}
public void regionStart(int currentDataRegionIndex, boolean
isBroadcastRegion) {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 020e2eec..b59eafd2 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -709,7 +709,8 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
messageType match {
case Type.PUSH_DATA_HAND_SHAKE => {
fileWriter.asInstanceOf[MapPartitionFileWriter].pushDataHandShake(
- message.asInstanceOf[PushDataHandShake].numPartitions)
+ message.asInstanceOf[PushDataHandShake].numPartitions,
+ message.asInstanceOf[PushDataHandShake].bufferSize)
}
case Type.REGION_START => {
fileWriter.asInstanceOf[MapPartitionFileWriter].regionStart(