This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d8809793f [CELEBORN-1490][CIP-6] Impl worker write process for Flink 
Hybrid Shuffle
d8809793f is described below

commit d8809793f3c4842999bd2befa1207fbf4eb774ad
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Sep 25 10:27:55 2024 +0800

    [CELEBORN-1490][CIP-6] Impl worker write process for Flink Hybrid Shuffle
    
    ### What changes were proposed in this pull request?
    
    Impl worker write process for Flink Hybrid Shuffle.
    
    ### Why are the changes needed?
    
    We support the tiered producer writing data from Flink to the worker. In this
PR, we enable the worker to write this kind of data to storage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    No tests needed.
    
    Closes #2741 from reswqa/cip6-6-pr.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 489 ++++++++++++++++++++-
 docs/monitoring.md                                 |   3 +
 .../worker/storage/MapPartitionDataWriter.java     |  47 +-
 .../deploy/worker/storage/PartitionDataWriter.java |   3 +
 .../segment/SegmentMapPartitionFileWriter.java     | 169 +++++++
 .../service/deploy/worker/Controller.scala         |   6 +-
 .../service/deploy/worker/PushDataHandler.scala    |  29 +-
 .../service/deploy/worker/WorkerSource.scala       |   6 +
 .../service/deploy/worker/storage/FlushTask.scala  |   2 +
 .../deploy/worker/storage/StorageManager.scala     |  16 +-
 10 files changed, 745 insertions(+), 25 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index d2c462c5b..9b07db6c6 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -7555,7 +7555,7 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 280
+            "y": 10
           },
           "id": 139,
           "options": {
@@ -7647,7 +7647,7 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 280
+            "y": 10
           },
           "id": 141,
           "options": {
@@ -7739,7 +7739,7 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 289
+            "y": 19
           },
           "id": 142,
           "options": {
@@ -7831,7 +7831,7 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 289
+            "y": 19
           },
           "id": 143,
           "options": {
@@ -7923,7 +7923,7 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 298
+            "y": 28
           },
           "id": 144,
           "options": {
@@ -8015,7 +8015,7 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 298
+            "y": 28
           },
           "id": 145,
           "options": {
@@ -8107,7 +8107,7 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 307
+            "y": 37
           },
           "id": 146,
           "options": {
@@ -8199,7 +8199,7 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 307
+            "y": 37
           },
           "id": 147,
           "options": {
@@ -8291,7 +8291,7 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 316
+            "y": 46
           },
           "id": 148,
           "options": {
@@ -8383,7 +8383,7 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 316
+            "y": 46
           },
           "id": 149,
           "options": {
@@ -8475,7 +8475,7 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 325
+            "y": 55
           },
           "id": 150,
           "options": {
@@ -8567,7 +8567,7 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 325
+            "y": 55
           },
           "id": 151,
           "options": {
@@ -8658,7 +8658,7 @@
             "h": 8,
             "w": 12,
             "x": 0,
-            "y": 334
+            "y": 64
           },
           "id": 153,
           "options": {
@@ -8749,7 +8749,7 @@
             "h": 8,
             "w": 12,
             "x": 12,
-            "y": 334
+            "y": 64
           },
           "id": 154,
           "options": {
@@ -8840,7 +8840,7 @@
             "h": 8,
             "w": 12,
             "x": 0,
-            "y": 342
+            "y": 72
           },
           "id": 155,
           "options": {
@@ -8870,6 +8870,465 @@
           ],
           "title": "metrics_RegionStartFailCount_Count",
           "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 72
+          },
+          "id": 200,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_SegmentStartFailCount_Count",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_SegmentStartFailCount_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 9,
+            "w": 12,
+            "x": 0,
+            "y": 80
+          },
+          "id": 198,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_ReplicaSegmentStartTime_Mean",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ReplicaSegmentStartTime_Mean",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 9,
+            "w": 12,
+            "x": 12,
+            "y": 80
+          },
+          "id": 199,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_ReplicaSegmentStartTime_Max",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ReplicaSegmentStartTime_Max",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 9,
+            "w": 12,
+            "x": 0,
+            "y": 89
+          },
+          "id": 196,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_PrimarySegmentStartTime_Mean",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_PrimarySegmentStartTime_Mean",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 9,
+            "w": 12,
+            "x": 12,
+            "y": 89
+          },
+          "id": 197,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_PrimarySegmentStartTime_Max",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_PrimarySegmentStartTime_Max",
+          "type": "timeseries"
         }
       ],
       "title": "MapPartitionRelatives",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 8a1629454..5568b8f76 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -179,6 +179,8 @@ These metrics are exposed by Celeborn worker.
     | FetchChunkFailCount                         | The count of fetching 
chunk failed in current worker.                                                 
          |
     | PrimaryPushDataTime                         | The time for a worker to 
handle a pushData RPC sent from a celeborn client.                              
       |
     | ReplicaPushDataTime                         | The time for a worker to 
handle a pushData RPC sent from a celeborn worker by replicating.               
       |
+    | PrimarySegmentStartTime                     | The time for a worker to 
handle a segmentStart RPC sent from a celeborn client.                          
       |
+    | ReplicaSegmentStartTime                     | The time for a worker to 
handle a segmentStart RPC sent from a celeborn worker by replicating.           
       |
     | WriteDataHardSplitCount                     | The count of writing 
PushData or PushMergedData to HARD_SPLIT partition in current worker.           
           |
     | WriteDataSuccessCount                       | The count of writing 
PushData or PushMergedData succeed in current worker.                           
           |
     | WriteDataFailCount                          | The count of writing 
PushData or PushMergedData failed in current worker.                            
           |
@@ -191,6 +193,7 @@ These metrics are exposed by Celeborn worker.
     | PushDataHandshakeFailCount                  | The count of 
PushDataHandshake failed in current worker.                                     
                   |
     | RegionStartFailCount                        | The count of RegionStart 
failed in current worker.                                                       
       |
     | RegionFinishFailCount                       | The count of RegionFinish 
failed in current worker.                                                       
      |
+    | SegmentStartFailCount                       | The count of SegmentStart 
failed in current worker.                                                       
      |
     | PrimaryPushDataHandshakeTime                | PrimaryPushDataHandshake 
means handle PushData of primary partition location.                            
       |
     | ReplicaPushDataHandshakeTime                | ReplicaPushDataHandshake 
means handle PushData of replica partition location.                            
       |
     | PrimaryRegionStartTime                      | PrimaryRegionStart means 
handle RegionStart of primary partition location.                               
       |
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
index cb671b05f..0a4cb1379 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
@@ -40,7 +40,7 @@ import org.apache.celeborn.common.util.Utils;
 /*
  * map partition file writer, it will create index for each partition
  */
-public final class MapPartitionDataWriter extends PartitionDataWriter {
+public class MapPartitionDataWriter extends PartitionDataWriter {
   private static final Logger logger = 
LoggerFactory.getLogger(MapPartitionDataWriter.class);
 
   private int numSubpartitions;
@@ -115,7 +115,7 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
     long length = data.readableBytes();
     totalBytes += length;
     numSubpartitionBytes[partitionId] += length;
-    super.write(data);
+    writeDataToFile(data);
     isRegionFinished = false;
   }
 
@@ -186,6 +186,10 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
   }
 
   public void regionFinish() throws IOException {
+    // TODO: When region is finished, flush the data to be ready for the 
reading, in scenarios that
+    // the upstream task writes and the downstream task reads simultaneously, 
such as flink hybrid
+    // shuffle
+
     logger.debug("FileWriter:{} regionFinish", diskFileInfo.getFilePath());
     if (regionStartingOffset == totalBytes) {
       return;
@@ -232,6 +236,10 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
     isRegionFinished = true;
   }
 
+  protected void writeDataToFile(ByteBuf data) throws IOException {
+    super.write(data);
+  }
+
   private synchronized void destroyIndex() {
     try {
       if (indexChannel != null) {
@@ -246,7 +254,10 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
   }
 
   @SuppressWarnings("ByteBufferBackingArray")
-  private void flushIndex() throws IOException {
+  protected void flushIndex() throws IOException {
+    // TODO: force flush the index file channel in scenarios which the 
upstream task writes and
+    // downstream task reads simultaneously, such as flink hybrid shuffle
+
     if (indexBuffer != null) {
       logger.debug("flushIndex start:{}", diskFileInfo.getIndexPath());
       long startTime = System.currentTimeMillis();
@@ -275,7 +286,11 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
     }
   }
 
-  private ByteBuffer allocateIndexBuffer(int numSubpartitions) {
+  protected MapFileMeta getFileMeta() {
+    return (MapFileMeta) diskFileInfo.getFileMeta();
+  }
+
+  protected ByteBuffer allocateIndexBuffer(int numSubpartitions) {
 
     // the returned buffer size is no smaller than 4096 bytes to improve disk 
IO performance
     int minBufferSize = 4096;
@@ -313,4 +328,28 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
     }
     return false;
   }
+
+  public int getCurrentSubpartition() {
+    return currentSubpartition;
+  }
+
+  public long[] getNumSubpartitionBytes() {
+    return numSubpartitionBytes;
+  }
+
+  public long getTotalBytes() {
+    return totalBytes;
+  }
+
+  public void setCurrentSubpartition(int currentSubpartition) {
+    this.currentSubpartition = currentSubpartition;
+  }
+
+  public void setTotalBytes(long totalBytes) {
+    this.totalBytes = totalBytes;
+  }
+
+  public void setRegionFinished(boolean regionFinished) {
+    isRegionFinished = regionFinished;
+  }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 5172193f6..5bb506270 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -214,6 +214,9 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
 
   @VisibleForTesting
   public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
+    // TODO: force flush buffer in scenarios where the upstream task writes 
and the downstream task
+    // reads simultaneously, such as flink hybrid shuffle.
+
     // flushBuffer == null here means this writer is already closed
     if (flushBuffer != null) {
       int numBytes = flushBuffer.readableBytes();
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java
new file mode 100644
index 000000000..634afc676
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionFileWriter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.segment;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.meta.MapFileMeta;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor;
+import 
org.apache.celeborn.service.deploy.worker.storage.MapPartitionDataWriter;
+import 
org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriterContext;
+import org.apache.celeborn.service.deploy.worker.storage.StorageManager;
+
+/**
+ * Write the shuffle file in map partition format with segment granularity 
visibility. This means
+ * that the shuffle file should be written intermediate, allowing it to be 
read in segments rather
+ * than waiting for the entire shuffle file to be completed.
+ */
+public class SegmentMapPartitionFileWriter extends MapPartitionDataWriter {
+
+  public static final Logger logger = 
LoggerFactory.getLogger(SegmentMapPartitionFileWriter.class);
+
+  /**
+   * subPartitionId -> started (boolean). There are 3 cases: 1. If the 
subPartition key not exist,
+   * it indicates that the subPartition has not sent the {@link
+   * org.apache.celeborn.common.protocol.PbSegmentStart}. Therefore, shuffle 
data should not be
+   * written in this case. 2. If the subPartition key exists and the started 
value is true, it means
+   * that the subPartition has initiated the segment. In this situation, the 
next buffer for this
+   * subPartition will be the first buffer of the current segment. The 
information about the first
+   * buffer will be recorded in {@code 
MapFileMeta#subPartitionSegmentIndexes}. 3. If the
+   * subPartition key exists and the started value is false, it means that the 
subPartition has
+   * initiated the segment, but the next buffer for this subPartition is not 
the first buffer of the
+   * current segment.
+   */
+  private final Map<Integer, Boolean> subPartitionHasStartSegment;
+
+  // current buffer index per subPartition
+  private int[] subPartitionBufferIndex;
+
+  public SegmentMapPartitionFileWriter(
+      StorageManager storageManager,
+      AbstractSource workerSource,
+      CelebornConf conf,
+      DeviceMonitor deviceMonitor,
+      PartitionDataWriterContext writerContext)
+      throws IOException {
+    super(storageManager, workerSource, conf, deviceMonitor, writerContext);
+    this.subPartitionHasStartSegment = new HashMap<>();
+  }
+
+  @Override
+  public void pushDataHandShake(int numSubpartitions, int bufferSize) {
+    super.pushDataHandShake(numSubpartitions, bufferSize);
+    subPartitionBufferIndex = new int[numSubpartitions];
+    Arrays.fill(subPartitionBufferIndex, 0);
+    getFileMeta().setIsWriterClosed(false);
+    getFileMeta().setSegmentGranularityVisible(true);
+  }
+
+  @Override
+  public void write(ByteBuf data) throws IOException {
+    data.markReaderIndex();
+    int subPartitionId = data.readInt();
+    int attemptId = data.readInt();
+    int batchId = data.readInt();
+    int size = data.readInt();
+
+    if (!subPartitionHasStartSegment.containsKey(subPartitionId)) {
+      throw new IllegalStateException(
+          String.format(
+              "This partition may not start a segment: subPartitionId:%s 
attemptId:%s batchId:%s size:%s",
+              subPartitionId, attemptId, batchId, size));
+    }
+    int currentSubpartition = getCurrentSubpartition();
+    // the subPartitionId must be ordered in a region
+    if (subPartitionId < currentSubpartition) {
+      throw new IOException(
+          String.format(
+              "Must writing data in reduce partition index order, but now 
supPartitionId is %s and the previous supPartitionId is %s, attemptId is %s, 
batchId is %s, size is %s",
+              subPartitionId, currentSubpartition, attemptId, batchId, size));
+    }
+
+    data.resetReaderIndex();
+    logger.debug(
+        "mappartition filename:{} write partition:{} currentSubPartition:{} 
attemptId:{} batchId:{} size:{}",
+        diskFileInfo.getFilePath(),
+        subPartitionId,
+        currentSubpartition,
+        attemptId,
+        batchId,
+        size);
+
+    if (subPartitionId > currentSubpartition) {
+      setCurrentSubpartition(subPartitionId);
+    }
+    long length = data.readableBytes();
+    setTotalBytes(getTotalBytes() + length);
+    getNumSubpartitionBytes()[subPartitionId] += length;
+    if (flushBuffer == null) {
+      takeBuffer();
+    }
+    writeDataToFile(data);
+    setRegionFinished(false);
+
+    MapFileMeta mapFileMeta = getFileMeta();
+    // Only when the sub partition has stated the segment, the buffer 
index(this is the first buffer
+    // of this segment) will be added.
+    if (subPartitionHasStartSegment.get(subPartitionId)) {
+      mapFileMeta.addSegmentIdAndFirstBufferIndex(
+          subPartitionId,
+          subPartitionBufferIndex[subPartitionId],
+          mapFileMeta.getPartitionWritingSegmentId(subPartitionId));
+      logger.debug(
+          "Add a segment id, partitionId:{}, bufferIndex:{}, segmentId: {}, 
filename:{}, attemptId:{}.",
+          subPartitionId,
+          subPartitionBufferIndex[subPartitionId],
+          mapFileMeta.getPartitionWritingSegmentId(subPartitionId),
+          diskFileInfo.getFilePath(),
+          attemptId);
+      // After the first buffer index of the segment is added, the following 
buffers in the segment
+      // should not be added anymore, so the subPartitionHasStartSegment is 
updated to false.
+      subPartitionHasStartSegment.put(subPartitionId, false);
+    }
+    subPartitionBufferIndex[subPartitionId]++;
+  }
+
+  @Override
+  public synchronized long close() throws IOException {
+    subPartitionHasStartSegment.clear();
+    long fileLength = super.close();
+    logger.debug("Close {} for file {}", this, getFile());
+    getFileMeta().setIsWriterClosed(true);
+    return fileLength;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("SegmentMapPartitionFileWriter{filePath=%s}", 
diskFileInfo.getFilePath());
+  }
+
+  public void segmentStart(int partitionId, int segmentId) {
+    getFileMeta().addPartitionSegmentId(partitionId, segmentId);
+    subPartitionHasStartSegment.put(partitionId, true);
+  }
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 0a141ede8..0b0c53c1f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -190,7 +190,8 @@ private[deploy] class Controller(
             partitionType,
             rangeReadFilter,
             userIdentifier,
-            partitionSplitEnabled)
+            partitionSplitEnabled,
+            isSegmentGranularityVisible)
           primaryLocs.add(new WorkingPartition(location, writer))
         } else {
           primaryLocs.add(location)
@@ -230,7 +231,8 @@ private[deploy] class Controller(
             partitionType,
             rangeReadFilter,
             userIdentifier,
-            partitionSplitEnabled)
+            partitionSplitEnabled,
+            isSegmentGranularityVisible)
           replicaLocs.add(new WorkingPartition(location, writer))
         } else {
           replicaLocs.add(location)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index c98acaf10..9fe99c4fd 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -38,13 +38,14 @@ import 
org.apache.celeborn.common.network.client.{RpcResponseCallback, Transport
 import org.apache.celeborn.common.network.protocol.{Message, PushData, 
PushDataHandShake, PushMergedData, RegionFinish, RegionStart, RequestMessage, 
RpcFailure, RpcRequest, RpcResponse, TransportMessage}
 import org.apache.celeborn.common.network.protocol.Message.Type
 import org.apache.celeborn.common.network.server.BaseMessageHandler
-import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish, 
PbRegionStart}
+import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, PbPushDataHandShake, PbRegionFinish, 
PbRegionStart, PbSegmentStart}
 import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, 
LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, 
StorageManager}
+import 
org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
 
 class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler with Logging {
 
@@ -916,6 +917,16 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
             rf.getShuffleKey,
             rf.getPartitionUniqueId,
             false)
+        case ss: PbSegmentStart =>
+          (
+            msg,
+            null,
+            false,
+            Type.SEGMENT_START,
+            ss.getMode,
+            ss.getShuffleKey,
+            ss.getPartitionUniqueId,
+            false)
       }
     } catch {
       case _: Exception =>
@@ -980,6 +991,8 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
           (WorkerSource.PRIMARY_REGION_START_TIME, 
WorkerSource.REPLICA_REGION_START_TIME)
         case Type.REGION_FINISH =>
           (WorkerSource.PRIMARY_REGION_FINISH_TIME, 
WorkerSource.REPLICA_REGION_FINISH_TIME)
+        case Type.SEGMENT_START =>
+          (WorkerSource.PRIMARY_SEGMENT_START_TIME, 
WorkerSource.REPLICA_SEGMENT_START_TIME)
         case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
 
@@ -1064,6 +1077,14 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
             isBroadcast)
         case Type.REGION_FINISH =>
           fileWriter.asInstanceOf[MapPartitionDataWriter].regionFinish()
+        case Type.SEGMENT_START =>
+          val (subPartitionId, segmentId) =
+            (
+              pbMsg.asInstanceOf[PbSegmentStart].getSubPartitionId,
+              pbMsg.asInstanceOf[PbSegmentStart].getSegmentId)
+          fileWriter.asInstanceOf[SegmentMapPartitionFileWriter].segmentStart(
+            subPartitionId,
+            segmentId)
         case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
       // for primary , send data to replica
@@ -1123,6 +1144,9 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         case Type.REGION_FINISH =>
           workerSource.incCounter(WorkerSource.REGION_FINISH_FAIL_COUNT)
           callback.onFailure(new 
CelebornIOException(StatusCode.REGION_FINISH_FAIL_REPLICA, e))
+        case Type.SEGMENT_START =>
+          workerSource.incCounter(WorkerSource.SEGMENT_START_FAIL_COUNT)
+          callback.onFailure(new 
CelebornIOException(StatusCode.SEGMENT_START_FAIL_REPLICA, e))
         case _ =>
           workerSource.incCounter(WorkerSource.REPLICATE_DATA_FAIL_COUNT)
           if (e.isInstanceOf[CelebornIOException]) {
@@ -1177,6 +1201,9 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         case Type.REGION_FINISH => (
             StatusCode.REGION_FINISH_FAIL_PRIMARY,
             StatusCode.REGION_FINISH_FAIL_REPLICA)
+        case Type.SEGMENT_START => (
+            StatusCode.SEGMENT_START_FAIL_PRIMARY,
+            StatusCode.SEGMENT_START_FAIL_REPLICA)
         case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
     callback.onFailure(new CelebornIOException(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 5358fab02..b2777c564 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -56,6 +56,7 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
   addCounter(REGION_START_FAIL_COUNT)
   addCounter(REGION_FINISH_FAIL_COUNT)
   addCounter(ACTIVE_CONNECTION_COUNT)
+  addCounter(SEGMENT_START_FAIL_COUNT)
 
   addCounter(SLOTS_ALLOCATED)
 
@@ -72,6 +73,8 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
   addTimer(REPLICA_REGION_START_TIME)
   addTimer(PRIMARY_REGION_FINISH_TIME)
   addTimer(REPLICA_REGION_FINISH_TIME)
+  addTimer(PRIMARY_SEGMENT_START_TIME)
+  addTimer(REPLICA_SEGMENT_START_TIME)
 
   addTimer(FETCH_CHUNK_TIME)
   addTimer(OPEN_STREAM_TIME)
@@ -151,12 +154,15 @@ object WorkerSource {
   val PUSH_DATA_HANDSHAKE_FAIL_COUNT = "PushDataHandshakeFailCount"
   val REGION_START_FAIL_COUNT = "RegionStartFailCount"
   val REGION_FINISH_FAIL_COUNT = "RegionFinishFailCount"
+  val SEGMENT_START_FAIL_COUNT = "SegmentStartFailCount"
   val PRIMARY_PUSH_DATA_HANDSHAKE_TIME = "PrimaryPushDataHandshakeTime"
   val REPLICA_PUSH_DATA_HANDSHAKE_TIME = "ReplicaPushDataHandshakeTime"
   val PRIMARY_REGION_START_TIME = "PrimaryRegionStartTime"
   val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
   val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
   val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
+  val PRIMARY_SEGMENT_START_TIME = "PrimarySegmentStartTime"
+  val REPLICA_SEGMENT_START_TIME = "ReplicaSegmentStartTime"
 
   // pause push data
   val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index df0d63be3..0c4fa4105 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -44,6 +44,8 @@ private[worker] class LocalFlushTask(
         fileChannel.write(buffer)
       }
     }
+
+    // TODO: force flush file channel in scenarios where the upstream task 
writes and the downstream task reads simultaneously, such as flink hybrid 
shuffle.
   }
 }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index c7c8f8adf..00e0a2887 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -49,6 +49,7 @@ import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
 import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, 
DBProvider}
 import 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
+import 
org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
 
 final private[worker] class StorageManager(conf: CelebornConf, workerSource: 
AbstractSource)
   extends ShuffleRecoverHelper with DeviceObserver with Logging with 
MemoryPressureListener {
@@ -407,7 +408,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       partitionType,
       rangeReadFilter,
       userIdentifier,
-      true)
+      true,
+      isSegmentGranularityVisible = false)
   }
 
   @throws[IOException]
@@ -420,7 +422,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       partitionType: PartitionType,
       rangeReadFilter: Boolean,
       userIdentifier: UserIdentifier,
-      partitionSplitEnabled: Boolean): PartitionDataWriter = {
+      partitionSplitEnabled: Boolean,
+      isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
     if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
       throw new IOException("No available working dirs!")
     }
@@ -438,7 +441,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     val writer =
       try {
         partitionType match {
-          case PartitionType.MAP => new MapPartitionDataWriter(
+          case PartitionType.MAP =>
+            if (isSegmentGranularityVisible) new SegmentMapPartitionFileWriter(
+              this,
+              workerSource,
+              conf,
+              deviceMonitor,
+              partitionDataWriterContext)
+            else new MapPartitionDataWriter(
               this,
               workerSource,
               conf,


Reply via email to