This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d8809793f [CELEBORN-1490][CIP-6] Impl worker write process for Flink
Hybrid Shuffle
d8809793f is described below
commit d8809793f3c4842999bd2befa1207fbf4eb774ad
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Sep 25 10:27:55 2024 +0800
[CELEBORN-1490][CIP-6] Impl worker write process for Flink Hybrid Shuffle
### What changes were proposed in this pull request?
Impl worker write process for Flink Hybrid Shuffle.
### Why are the changes needed?
We support the tiered producer writing data from Flink to the worker. In this
PR, we enable the worker to write this kind of data to storage.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
no need.
Closes #2741 from reswqa/cip6-6-pr.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 489 ++++++++++++++++++++-
docs/monitoring.md | 3 +
.../worker/storage/MapPartitionDataWriter.java | 47 +-
.../deploy/worker/storage/PartitionDataWriter.java | 3 +
.../segment/SegmentMapPartitionFileWriter.java | 169 +++++++
.../service/deploy/worker/Controller.scala | 6 +-
.../service/deploy/worker/PushDataHandler.scala | 29 +-
.../service/deploy/worker/WorkerSource.scala | 6 +
.../service/deploy/worker/storage/FlushTask.scala | 2 +
.../deploy/worker/storage/StorageManager.scala | 16 +-
10 files changed, 745 insertions(+), 25 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index d2c462c5b..9b07db6c6 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -7555,7 +7555,7 @@
"h": 9,
"w": 12,
"x": 0,
- "y": 280
+ "y": 10
},
"id": 139,
"options": {
@@ -7647,7 +7647,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 280
+ "y": 10
},
"id": 141,
"options": {
@@ -7739,7 +7739,7 @@
"h": 9,
"w": 12,
"x": 0,
- "y": 289
+ "y": 19
},
"id": 142,
"options": {
@@ -7831,7 +7831,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 289
+ "y": 19
},
"id": 143,
"options": {
@@ -7923,7 +7923,7 @@
"h": 9,
"w": 12,
"x": 0,
- "y": 298
+ "y": 28
},
"id": 144,
"options": {
@@ -8015,7 +8015,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 298
+ "y": 28
},
"id": 145,
"options": {
@@ -8107,7 +8107,7 @@
"h": 9,
"w": 12,
"x": 0,
- "y": 307
+ "y": 37
},
"id": 146,
"options": {
@@ -8199,7 +8199,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 307
+ "y": 37
},
"id": 147,
"options": {
@@ -8291,7 +8291,7 @@
"h": 9,
"w": 12,
"x": 0,
- "y": 316
+ "y": 46
},
"id": 148,
"options": {
@@ -8383,7 +8383,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 316
+ "y": 46
},
"id": 149,
"options": {
@@ -8475,7 +8475,7 @@
"h": 9,
"w": 12,
"x": 0,
- "y": 325
+ "y": 55
},
"id": 150,
"options": {
@@ -8567,7 +8567,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 325
+ "y": 55
},
"id": 151,
"options": {
@@ -8658,7 +8658,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 334
+ "y": 64
},
"id": 153,
"options": {
@@ -8749,7 +8749,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 334
+ "y": 64
},
"id": 154,
"options": {
@@ -8840,7 +8840,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 342
+ "y": 72
},
"id": 155,
"options": {
@@ -8870,6 +8870,465 @@
],
"title": "metrics_RegionStartFailCount_Count",
"type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 72
+ },
+ "id": 200,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_SegmentStartFailCount_Count",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_SegmentStartFailCount_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 0,
+ "y": 80
+ },
+ "id": 198,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_ReplicaSegmentStartTime_Mean",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ReplicaSegmentStartTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 12,
+ "y": 80
+ },
+ "id": 199,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_ReplicaSegmentStartTime_Max",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ReplicaSegmentStartTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 0,
+ "y": 89
+ },
+ "id": 196,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_PrimarySegmentStartTime_Mean",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_PrimarySegmentStartTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 12,
+ "y": 89
+ },
+ "id": 197,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_PrimarySegmentStartTime_Max",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_PrimarySegmentStartTime_Max",
+ "type": "timeseries"
}
],
"title": "MapPartitionRelatives",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 8a1629454..5568b8f76 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -179,6 +179,8 @@ These metrics are exposed by Celeborn worker.
| FetchChunkFailCount | The count of fetching
chunk failed in current worker.
|
| PrimaryPushDataTime | The time for a worker to
handle a pushData RPC sent from a celeborn client.
|
| ReplicaPushDataTime | The time for a worker to
handle a pushData RPC sent from a celeborn worker by replicating.
|
+ | PrimarySegmentStartTime | The time for a worker to
handle a segmentStart RPC sent from a celeborn client.
|
+ | ReplicaSegmentStartTime | The time for a worker to
handle a segmentStart RPC sent from a celeborn worker by replicating.
|
| WriteDataHardSplitCount | The count of writing
PushData or PushMergedData to HARD_SPLIT partition in current worker.
|
| WriteDataSuccessCount | The count of writing
PushData or PushMergedData succeed in current worker.
|
| WriteDataFailCount | The count of writing
PushData or PushMergedData failed in current worker.
|
@@ -191,6 +193,7 @@ These metrics are exposed by Celeborn worker.
| PushDataHandshakeFailCount | The count of
PushDataHandshake failed in current worker.
|
| RegionStartFailCount | The count of RegionStart
failed in current worker.
|
| RegionFinishFailCount | The count of RegionFinish
failed in current worker.
|
+ | SegmentStartFailCount | The count of SegmentStart
failed in current worker.
|
| PrimaryPushDataHandshakeTime | PrimaryPushDataHandshake
means handle PushData of primary partition location.
|
| ReplicaPushDataHandshakeTime | ReplicaPushDataHandshake
means handle PushData of replica partition location.
|
| PrimaryRegionStartTime | PrimaryRegionStart means
handle RegionStart of primary partition location.
|
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
index cb671b05f..0a4cb1379 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
@@ -40,7 +40,7 @@ import org.apache.celeborn.common.util.Utils;
/*
* map partition file writer, it will create index for each partition
*/
-public final class MapPartitionDataWriter extends PartitionDataWriter {
+public class MapPartitionDataWriter extends PartitionDataWriter {
private static final Logger logger =
LoggerFactory.getLogger(MapPartitionDataWriter.class);
private int numSubpartitions;
@@ -115,7 +115,7 @@ public final class MapPartitionDataWriter extends
PartitionDataWriter {
long length = data.readableBytes();
totalBytes += length;
numSubpartitionBytes[partitionId] += length;
- super.write(data);
+ writeDataToFile(data);
isRegionFinished = false;
}
@@ -186,6 +186,10 @@ public final class MapPartitionDataWriter extends
PartitionDataWriter {
}
public void regionFinish() throws IOException {
+ // TODO: When region is finished, flush the data to be ready for the
reading, in scenarios that
+ // the upstream task writes and the downstream task reads simultaneously,
such as flink hybrid
+ // shuffle
+
logger.debug("FileWriter:{} regionFinish", diskFileInfo.getFilePath());
if (regionStartingOffset == totalBytes) {
return;
@@ -232,6 +236,10 @@ public final class MapPartitionDataWriter extends
PartitionDataWriter {
isRegionFinished = true;
}
+ protected void writeDataToFile(ByteBuf data) throws IOException {
+ super.write(data);
+ }
+
private synchronized void destroyIndex() {
try {
if (indexChannel != null) {
@@ -246,7 +254,10 @@ public final class MapPartitionDataWriter extends
PartitionDataWriter {
}
@SuppressWarnings("ByteBufferBackingArray")
- private void flushIndex() throws IOException {
+ protected void flushIndex() throws IOException {
+ // TODO: force flush the index file channel in scenarios which the
upstream task writes and
+ // downstream task reads simultaneously, such as flink hybrid shuffle
+
if (indexBuffer != null) {
logger.debug("flushIndex start:{}", diskFileInfo.getIndexPath());
long startTime = System.currentTimeMillis();
@@ -275,7 +286,11 @@ public final class MapPartitionDataWriter extends
PartitionDataWriter {
}
}
- private ByteBuffer allocateIndexBuffer(int numSubpartitions) {
+ protected MapFileMeta getFileMeta() {
+ return (MapFileMeta) diskFileInfo.getFileMeta();
+ }
+
+ protected ByteBuffer allocateIndexBuffer(int numSubpartitions) {
// the returned buffer size is no smaller than 4096 bytes to improve disk
IO performance
int minBufferSize = 4096;
@@ -313,4 +328,28 @@ public final class MapPartitionDataWriter extends
PartitionDataWriter {
}
return false;
}
+
+ public int getCurrentSubpartition() {
+ return currentSubpartition;
+ }
+
+ public long[] getNumSubpartitionBytes() {
+ return numSubpartitionBytes;
+ }
+
+ public long getTotalBytes() {
+ return totalBytes;
+ }
+
+ public void setCurrentSubpartition(int currentSubpartition) {
+ this.currentSubpartition = currentSubpartition;
+ }
+
+ public void setTotalBytes(long totalBytes) {
+ this.totalBytes = totalBytes;
+ }
+
+ public void setRegionFinished(boolean regionFinished) {
+ isRegionFinished = regionFinished;
+ }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 5172193f6..5bb506270 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -214,6 +214,9 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
@VisibleForTesting
public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
+ // TODO: force flush buffer in scenarios where the upstream task writes
and the downstream task
+ // reads simultaneously, such as flink hybrid shuffle.
+
// flushBuffer == null here means this writer is already closed
if (flushBuffer != null) {
int numBytes = flushBuffer.readableBytes();
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java
new file mode 100644
index 000000000..634afc676
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.segment;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.meta.MapFileMeta;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor;
+import
org.apache.celeborn.service.deploy.worker.storage.MapPartitionDataWriter;
+import
org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriterContext;
+import org.apache.celeborn.service.deploy.worker.storage.StorageManager;
+
+/**
+ * Write the shuffle file in map partition format with segment granularity
visibility. This means
+ * that the shuffle file should be written intermediate, allowing it to be
read in segments rather
+ * than waiting for the entire shuffle file to be completed.
+ */
+public class SegmentMapPartitionFileWriter extends MapPartitionDataWriter {
+
+ public static final Logger logger =
LoggerFactory.getLogger(SegmentMapPartitionFileWriter.class);
+
+ /**
+ * subPartitionId -> started (boolean). There are 3 cases: 1. If the
subPartition key not exist,
+ * it indicates that the subPartition has not sent the {@link
+ * org.apache.celeborn.common.protocol.PbSegmentStart}. Therefore, shuffle
data should not be
+ * written in this case. 2. If the subPartition key exists and the started
value is true, it means
+ * that the subPartition has initiated the segment. In this situation, the
next buffer for this
+ * subPartition will be the first buffer of the current segment. The
information about the first
+ * buffer will be recorded in {@code
MapFileMeta#subPartitionSegmentIndexes}. 3. If the
+ * subPartition key exists and the started value is false, it means that the
subPartition has
+ * initiated the segment, but the next buffer for this subPartition is not
the first buffer of the
+ * current segment.
+ */
+ private final Map<Integer, Boolean> subPartitionHasStartSegment;
+
+ // current buffer index per subPartition
+ private int[] subPartitionBufferIndex;
+
+ public SegmentMapPartitionFileWriter(
+ StorageManager storageManager,
+ AbstractSource workerSource,
+ CelebornConf conf,
+ DeviceMonitor deviceMonitor,
+ PartitionDataWriterContext writerContext)
+ throws IOException {
+ super(storageManager, workerSource, conf, deviceMonitor, writerContext);
+ this.subPartitionHasStartSegment = new HashMap<>();
+ }
+
+ @Override
+ public void pushDataHandShake(int numSubpartitions, int bufferSize) {
+ super.pushDataHandShake(numSubpartitions, bufferSize);
+ subPartitionBufferIndex = new int[numSubpartitions];
+ Arrays.fill(subPartitionBufferIndex, 0);
+ getFileMeta().setIsWriterClosed(false);
+ getFileMeta().setSegmentGranularityVisible(true);
+ }
+
+ @Override
+ public void write(ByteBuf data) throws IOException {
+ data.markReaderIndex();
+ int subPartitionId = data.readInt();
+ int attemptId = data.readInt();
+ int batchId = data.readInt();
+ int size = data.readInt();
+
+ if (!subPartitionHasStartSegment.containsKey(subPartitionId)) {
+ throw new IllegalStateException(
+ String.format(
+ "This partition may not start a segment: subPartitionId:%s
attemptId:%s batchId:%s size:%s",
+ subPartitionId, attemptId, batchId, size));
+ }
+ int currentSubpartition = getCurrentSubpartition();
+ // the subPartitionId must be ordered in a region
+ if (subPartitionId < currentSubpartition) {
+ throw new IOException(
+ String.format(
+ "Must writing data in reduce partition index order, but now
supPartitionId is %s and the previous supPartitionId is %s, attemptId is %s,
batchId is %s, size is %s",
+ subPartitionId, currentSubpartition, attemptId, batchId, size));
+ }
+
+ data.resetReaderIndex();
+ logger.debug(
+ "mappartition filename:{} write partition:{} currentSubPartition:{}
attemptId:{} batchId:{} size:{}",
+ diskFileInfo.getFilePath(),
+ subPartitionId,
+ currentSubpartition,
+ attemptId,
+ batchId,
+ size);
+
+ if (subPartitionId > currentSubpartition) {
+ setCurrentSubpartition(subPartitionId);
+ }
+ long length = data.readableBytes();
+ setTotalBytes(getTotalBytes() + length);
+ getNumSubpartitionBytes()[subPartitionId] += length;
+ if (flushBuffer == null) {
+ takeBuffer();
+ }
+ writeDataToFile(data);
+ setRegionFinished(false);
+
+ MapFileMeta mapFileMeta = getFileMeta();
+ // Only when the sub partition has stated the segment, the buffer
index(this is the first buffer
+ // of this segment) will be added.
+ if (subPartitionHasStartSegment.get(subPartitionId)) {
+ mapFileMeta.addSegmentIdAndFirstBufferIndex(
+ subPartitionId,
+ subPartitionBufferIndex[subPartitionId],
+ mapFileMeta.getPartitionWritingSegmentId(subPartitionId));
+ logger.debug(
+ "Add a segment id, partitionId:{}, bufferIndex:{}, segmentId: {},
filename:{}, attemptId:{}.",
+ subPartitionId,
+ subPartitionBufferIndex[subPartitionId],
+ mapFileMeta.getPartitionWritingSegmentId(subPartitionId),
+ diskFileInfo.getFilePath(),
+ attemptId);
+ // After the first buffer index of the segment is added, the following
buffers in the segment
+ // should not be added anymore, so the subPartitionHasStartSegment is
updated to false.
+ subPartitionHasStartSegment.put(subPartitionId, false);
+ }
+ subPartitionBufferIndex[subPartitionId]++;
+ }
+
+ @Override
+ public synchronized long close() throws IOException {
+ subPartitionHasStartSegment.clear();
+ long fileLength = super.close();
+ logger.debug("Close {} for file {}", this, getFile());
+ getFileMeta().setIsWriterClosed(true);
+ return fileLength;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SegmentMapPartitionFileWriter{filePath=%s}",
diskFileInfo.getFilePath());
+ }
+
+ public void segmentStart(int partitionId, int segmentId) {
+ getFileMeta().addPartitionSegmentId(partitionId, segmentId);
+ subPartitionHasStartSegment.put(partitionId, true);
+ }
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 0a141ede8..0b0c53c1f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -190,7 +190,8 @@ private[deploy] class Controller(
partitionType,
rangeReadFilter,
userIdentifier,
- partitionSplitEnabled)
+ partitionSplitEnabled,
+ isSegmentGranularityVisible)
primaryLocs.add(new WorkingPartition(location, writer))
} else {
primaryLocs.add(location)
@@ -230,7 +231,8 @@ private[deploy] class Controller(
partitionType,
rangeReadFilter,
userIdentifier,
- partitionSplitEnabled)
+ partitionSplitEnabled,
+ isSegmentGranularityVisible)
replicaLocs.add(new WorkingPartition(location, writer))
} else {
replicaLocs.add(location)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index c98acaf10..9fe99c4fd 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -38,13 +38,14 @@ import
org.apache.celeborn.common.network.client.{RpcResponseCallback, Transport
import org.apache.celeborn.common.network.protocol.{Message, PushData,
PushDataHandShake, PushMergedData, RegionFinish, RegionStart, RequestMessage,
RpcFailure, RpcRequest, RpcResponse, TransportMessage}
import org.apache.celeborn.common.network.protocol.Message.Type
import org.apache.celeborn.common.network.server.BaseMessageHandler
-import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish,
PbRegionStart}
+import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish,
PbRegionStart, PbSegmentStart}
import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher,
LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher,
StorageManager}
+import
org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler with Logging {
@@ -916,6 +917,16 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
rf.getShuffleKey,
rf.getPartitionUniqueId,
false)
+ case ss: PbSegmentStart =>
+ (
+ msg,
+ null,
+ false,
+ Type.SEGMENT_START,
+ ss.getMode,
+ ss.getShuffleKey,
+ ss.getPartitionUniqueId,
+ false)
}
} catch {
case _: Exception =>
@@ -980,6 +991,8 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
(WorkerSource.PRIMARY_REGION_START_TIME,
WorkerSource.REPLICA_REGION_START_TIME)
case Type.REGION_FINISH =>
(WorkerSource.PRIMARY_REGION_FINISH_TIME,
WorkerSource.REPLICA_REGION_FINISH_TIME)
+ case Type.SEGMENT_START =>
+ (WorkerSource.PRIMARY_SEGMENT_START_TIME,
WorkerSource.REPLICA_SEGMENT_START_TIME)
case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
@@ -1064,6 +1077,14 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
isBroadcast)
case Type.REGION_FINISH =>
fileWriter.asInstanceOf[MapPartitionDataWriter].regionFinish()
+ case Type.SEGMENT_START =>
+ val (subPartitionId, segmentId) =
+ (
+ pbMsg.asInstanceOf[PbSegmentStart].getSubPartitionId,
+ pbMsg.asInstanceOf[PbSegmentStart].getSegmentId)
+ fileWriter.asInstanceOf[SegmentMapPartitionFileWriter].segmentStart(
+ subPartitionId,
+ segmentId)
case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
// for primary , send data to replica
@@ -1123,6 +1144,9 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
case Type.REGION_FINISH =>
workerSource.incCounter(WorkerSource.REGION_FINISH_FAIL_COUNT)
callback.onFailure(new
CelebornIOException(StatusCode.REGION_FINISH_FAIL_REPLICA, e))
+ case Type.SEGMENT_START =>
+ workerSource.incCounter(WorkerSource.SEGMENT_START_FAIL_COUNT)
+ callback.onFailure(new
CelebornIOException(StatusCode.SEGMENT_START_FAIL_REPLICA, e))
case _ =>
workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_COUNT)
if (e.isInstanceOf[CelebornIOException]) {
@@ -1177,6 +1201,9 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
case Type.REGION_FINISH => (
StatusCode.REGION_FINISH_FAIL_PRIMARY,
StatusCode.REGION_FINISH_FAIL_REPLICA)
+ case Type.SEGMENT_START => (
+ StatusCode.SEGMENT_START_FAIL_PRIMARY,
+ StatusCode.SEGMENT_START_FAIL_REPLICA)
case _ => throw new IllegalArgumentException(s"Not support
$messageType yet")
}
callback.onFailure(new CelebornIOException(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 5358fab02..b2777c564 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -56,6 +56,7 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
addCounter(REGION_START_FAIL_COUNT)
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)
+ addCounter(SEGMENT_START_FAIL_COUNT)
addCounter(SLOTS_ALLOCATED)
@@ -72,6 +73,8 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
addTimer(REPLICA_REGION_START_TIME)
addTimer(PRIMARY_REGION_FINISH_TIME)
addTimer(REPLICA_REGION_FINISH_TIME)
+ addTimer(PRIMARY_SEGMENT_START_TIME)
+ addTimer(REPLICA_SEGMENT_START_TIME)
addTimer(FETCH_CHUNK_TIME)
addTimer(OPEN_STREAM_TIME)
@@ -151,12 +154,15 @@ object WorkerSource {
val PUSH_DATA_HANDSHAKE_FAIL_COUNT = "PushDataHandshakeFailCount"
val REGION_START_FAIL_COUNT = "RegionStartFailCount"
val REGION_FINISH_FAIL_COUNT = "RegionFinishFailCount"
+ val SEGMENT_START_FAIL_COUNT = "SegmentStartFailCount"
val PRIMARY_PUSH_DATA_HANDSHAKE_TIME = "PrimaryPushDataHandshakeTime"
val REPLICA_PUSH_DATA_HANDSHAKE_TIME = "ReplicaPushDataHandshakeTime"
val PRIMARY_REGION_START_TIME = "PrimaryRegionStartTime"
val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
+ val PRIMARY_SEGMENT_START_TIME = "PrimarySegmentStartTime"
+ val REPLICA_SEGMENT_START_TIME = "ReplicaSegmentStartTime"
// pause push data
val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index df0d63be3..0c4fa4105 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -44,6 +44,8 @@ private[worker] class LocalFlushTask(
fileChannel.write(buffer)
}
}
+
+ // TODO: force flush file channel in scenarios where the upstream task
writes and the downstream task reads simultaneously, such as flink hybrid
shuffle.
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index c7c8f8adf..00e0a2887 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -49,6 +49,7 @@ import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager
import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend,
DBProvider}
import
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
+import
org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
final private[worker] class StorageManager(conf: CelebornConf, workerSource:
AbstractSource)
extends ShuffleRecoverHelper with DeviceObserver with Logging with
MemoryPressureListener {
@@ -407,7 +408,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
partitionType,
rangeReadFilter,
userIdentifier,
- true)
+ true,
+ isSegmentGranularityVisible = false)
}
@throws[IOException]
@@ -420,7 +422,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
partitionType: PartitionType,
rangeReadFilter: Boolean,
userIdentifier: UserIdentifier,
- partitionSplitEnabled: Boolean): PartitionDataWriter = {
+ partitionSplitEnabled: Boolean,
+ isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
throw new IOException("No available working dirs!")
}
@@ -438,7 +441,14 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val writer =
try {
partitionType match {
- case PartitionType.MAP => new MapPartitionDataWriter(
+ case PartitionType.MAP =>
+ if (isSegmentGranularityVisible) new SegmentMapPartitionFileWriter(
+ this,
+ workerSource,
+ conf,
+ deviceMonitor,
+ partitionDataWriterContext)
+ else new MapPartitionDataWriter(
this,
workerSource,
conf,