This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 11ca1a785 [CELEBORN-2005] Introduce numBytesIn, numBytesOut, 
numBytesInPerSecond, numBytesOutPerSecond metrics for 
RemoteShuffleServiceFactory
11ca1a785 is described below

commit 11ca1a7858a503e52b304e0d67ec0ee05a2b8b77
Author: SteNicholas <[email protected]>
AuthorDate: Fri May 23 16:25:40 2025 +0800

    [CELEBORN-2005] Introduce numBytesIn, numBytesOut, numBytesInPerSecond, 
numBytesOutPerSecond metrics for RemoteShuffleServiceFactory
    
    ### What changes were proposed in this pull request?
    
    Introduce `numBytesIn`, `numBytesOut`, `numRecordsOut`, 
`numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics 
for `RemoteShuffleServiceFactory`.
    
    Scope | Infix | Metrics | Description | Type
    -- | -- | -- | -- | --
    Task | Shuffle.Remote.[ShuffleId] | numBytesIn | The total number of bytes 
this shuffle has read. | Counter |
    Task | Shuffle.Remote.[ShuffleId] | numBytesOut | The total number of bytes 
this shuffle has written. | Counter |
    Task | Shuffle.Remote.[ShuffleId] | numRecordsOut | The total number of 
records this shuffle has written. | Counter |
    Task | Shuffle.Remote.[ShuffleId] | numBytesInPerSecond | The number of 
bytes this shuffle reads per second. | Meter |
    Task | Shuffle.Remote.[ShuffleId] | numBytesOutPerSecond | The number of 
bytes this shuffle writes per second. | Meter |
    Task | Shuffle.Remote.[ShuffleId] | numRecordsOutPerSecond | The number of 
records this shuffle writes per second. | Meter |
    
    Note:
    
    - The `numBytesIn` and `numBytesOut` metrics count the total number of bytes 
of both records and events.
    - The `numRecordsOut` metric counts only the total number of records; events 
are not included.
    
    ### Why are the changes needed?
    
    There are currently no metrics for shuffle read operations or for operations 
that write shuffle data in the Flink shuffle. It's proposed to introduce 
`numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, 
`numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics for 
`RemoteShuffleServiceFactory`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - `RemoteShuffleOutputGateSuiteJ#testSimpleWriteData`
    - `RemoteShuffleResultPartitionSuiteJ`
    
    Closes #3272 from SteNicholas/CELEBORN-2005.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../flink/AbstractRemoteShuffleInputGate.java      |  5 +-
 .../AbstractRemoteShuffleInputGateFactory.java     |  7 +-
 ...bstractRemoteShuffleResultPartitionFactory.java | 20 +++---
 .../plugin/flink/RemoteShuffleEnvironment.java     |  4 +-
 .../flink/RemoteShuffleInputGateDelegation.java    | 22 ++++--
 .../plugin/flink/RemoteShuffleOutputGate.java      | 46 +++++++++---
 .../RemoteShuffleResultPartitionDelegation.java    |  8 +++
 .../plugin/flink/metric/ShuffleIOMetricGroup.java  | 83 ++++++++++++++++++++++
 .../flink/RemoteShuffleOutputGateSuiteJ.java       | 51 +++++++++++--
 .../plugin/flink/RemoteShuffleInputGate.java       |  5 +-
 .../flink/RemoteShuffleInputGateFactory.java       |  7 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  8 ++-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 52 +++++++++-----
 .../plugin/flink/RemoteShuffleInputGate.java       |  5 +-
 .../flink/RemoteShuffleInputGateFactory.java       |  7 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  8 ++-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 16 ++++-
 .../plugin/flink/RemoteShuffleInputGate.java       |  5 +-
 .../flink/RemoteShuffleInputGateFactory.java       |  7 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  8 ++-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 16 ++++-
 .../plugin/flink/RemoteShuffleInputGate.java       |  5 +-
 .../flink/RemoteShuffleInputGateFactory.java       |  7 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  8 ++-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 16 ++++-
 .../plugin/flink/RemoteShuffleInputGate.java       |  5 +-
 .../flink/RemoteShuffleInputGateFactory.java       |  7 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  8 ++-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 16 ++++-
 .../plugin/flink/RemoteShuffleInputGate.java       |  5 +-
 .../flink/RemoteShuffleInputGateFactory.java       |  7 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  8 ++-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 16 ++++-
 33 files changed, 386 insertions(+), 112 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
index a7330a2cb..d04497536 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -45,7 +46,7 @@ public abstract class AbstractRemoteShuffleInputGate extends 
IndexedInputGate {
 
   public AbstractRemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -55,7 +56,7 @@ public abstract class AbstractRemoteShuffleInputGate extends 
IndexedInputGate {
     inputGateDelegation =
         new RemoteShuffleInputGateDelegation(
             celebornConf,
-            taskName,
+            ownerContext,
             gateIndex,
             gateDescriptor,
             bufferPoolFactory,
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index 9568ac546..0c9e2e8bb 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
 
   /** Create RemoteShuffleInputGate from {@link 
InputGateDeploymentDescriptor}. */
   public IndexedInputGate create(
-      String owningTaskName, int gateIndex, InputGateDeploymentDescriptor 
igdd) {
+      ShuffleIOOwnerContext ownerContext, int gateIndex, 
InputGateDeploymentDescriptor igdd) {
     LOG.info(
         "Create input gate -- number of buffers per input gate={}, "
             + "number of concurrent readings={}.",
@@ -91,7 +92,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
         createBufferPoolFactory(networkBufferPool, numBuffersPerGate, 
supportFloatingBuffers);
 
     return createInputGate(
-        owningTaskName,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
@@ -99,7 +100,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
   }
 
   protected abstract IndexedInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
index dfde34787..929939e47 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ public abstract class 
AbstractRemoteShuffleResultPartitionFactory {
   }
 
   public ResultPartition create(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionDeploymentDescriptor desc,
       CelebornConf celebornConf) {
@@ -100,32 +100,32 @@ public abstract class 
AbstractRemoteShuffleResultPartitionFactory {
         desc.getNumberOfSubpartitions());
 
     return create(
-        taskNameWithSubtaskAndId,
+        ownerContext,
         partitionIndex,
         desc.getShuffleDescriptor().getResultPartitionID(),
         desc.getPartitionType(),
         desc.getNumberOfSubpartitions(),
         desc.getMaxParallelism(),
         createBufferPoolFactory(),
-        desc.getShuffleDescriptor(),
+        (RemoteShuffleDescriptor) desc.getShuffleDescriptor(),
         celebornConf,
         desc.getTotalNumberOfPartitions());
   }
 
   public ResultPartition create(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
       int numSubpartitions,
       int maxParallelism,
       List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
-      ShuffleDescriptor shuffleDescriptor,
+      RemoteShuffleDescriptor shuffleDescriptor,
       CelebornConf celebornConf,
       int numMappers) {
     ResultPartition partition =
         createRemoteShuffleResultPartitionInternal(
-            taskNameWithSubtaskAndId,
+            ownerContext,
             partitionIndex,
             id,
             type,
@@ -135,13 +135,13 @@ public abstract class 
AbstractRemoteShuffleResultPartitionFactory {
             celebornConf,
             numMappers,
             getBufferCompressor(),
-            (RemoteShuffleDescriptor) shuffleDescriptor);
-    LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
+            shuffleDescriptor);
+    LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this);
     return partition;
   }
 
   abstract ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
index 8851cc812..dc197ae3a 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
@@ -191,7 +191,7 @@ public class RemoteShuffleEnvironment
     if (resultPartitionDeploymentDescriptor.getShuffleDescriptor()
         instanceof RemoteShuffleDescriptor) {
       return resultPartitionFactory.create(
-          ownerContext.getOwnerName(), index, 
resultPartitionDeploymentDescriptor, conf);
+          ownerContext, index, resultPartitionDeploymentDescriptor, conf);
     } else {
       nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId());
       
nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId());
@@ -246,7 +246,7 @@ public class RemoteShuffleEnvironment
         ? shuffleEnvironmentWrapper
             .nettyInputGateFactory()
             .create(ownerContext, gateIndex, igdd, producerStateProvider, 
inputChannelMetrics)
-        : inputGateFactory.create(ownerContext.getOwnerName(), gateIndex, 
igdd);
+        : inputGateFactory.create(ownerContext, gateIndex, igdd);
   }
 
   @VisibleForTesting
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 3a0d9f077..fe871d68c 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -43,6 +44,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -56,6 +58,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
 import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
 import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
 public class RemoteShuffleInputGateDelegation {
@@ -130,9 +133,11 @@ public class RemoteShuffleInputGateDelegation {
   private int endSubIndex;
   private boolean partitionConnectionExceptionEnabled;
 
+  private final MetricGroup taskIOMetricGroup;
+
   public RemoteShuffleInputGateDelegation(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -141,7 +146,8 @@ public class RemoteShuffleInputGateDelegation {
       AvailabilityProvider.AvailabilityHelper availabilityHelper,
       int startSubIndex,
       int endSubIndex) {
-    this.taskName = taskName;
+    this.taskName = ownerContext.getOwnerName();
+    this.taskIOMetricGroup = ownerContext.getParentGroup();
     this.gateIndex = gateIndex;
     this.gateDescriptor = gateDescriptor;
     this.bufferPoolFactory = bufferPoolFactory;
@@ -198,6 +204,8 @@ public class RemoteShuffleInputGateDelegation {
       RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) 
descriptor.getRight();
       ShuffleResourceDescriptor shuffleDescriptor =
           
remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+      ShuffleIOMetricGroup shuffleIOMetricGroup =
+          new ShuffleIOMetricGroup(taskIOMetricGroup, 
shuffleDescriptor.getShuffleId());
 
       LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor);
 
@@ -208,7 +216,7 @@ public class RemoteShuffleInputGateDelegation {
               startSubIndex,
               endSubIndex,
               transferBufferPool,
-              getDataListener(descriptor.getLeft()),
+              getDataListener(descriptor.getLeft(), shuffleIOMetricGroup),
               getFailureListener(remoteDescriptor.getResultPartitionID()));
 
       bufferReaders.add(reader);
@@ -235,13 +243,14 @@ public class RemoteShuffleInputGateDelegation {
         .collect(Collectors.toList());
   }
 
-  private Consumer<ByteBuf> getDataListener(int channelIdx) {
+  private Consumer<ByteBuf> getDataListener(
+      int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return byteBuf -> {
       Queue<Buffer> unpackedBuffers = null;
       try {
         unpackedBuffers = BufferPacker.unpack(byteBuf);
         while (!unpackedBuffers.isEmpty()) {
-          onBuffer(unpackedBuffers.poll(), channelIdx);
+          onBuffer(unpackedBuffers.poll(), channelIdx, shuffleIOMetricGroup);
         }
       } catch (Throwable throwable) {
         synchronized (lock) {
@@ -279,7 +288,7 @@ public class RemoteShuffleInputGateDelegation {
     };
   }
 
-  private void onBuffer(Buffer buffer, int channelIdx) {
+  private void onBuffer(Buffer buffer, int channelIdx, ShuffleIOMetricGroup 
shuffleIOMetricGroup) {
     synchronized (lock) {
       if (closed || cause != null) {
         buffer.recycleBuffer();
@@ -293,6 +302,7 @@ public class RemoteShuffleInputGateDelegation {
         checkState(channelInfo.getInputChannelIdx() == channelIdx, "Illegal 
channel index.");
         LOG.debug("ReceivedBuffers is adding buffer {} on {}", buffer, 
channelInfo);
         receivedBuffers.add(Pair.of(buffer, channelInfo));
+        shuffleIOMetricGroup.getNumBytesIn().inc(buffer.getSize());
         needRecycle = false;
         if (wasEmpty) {
           availabilityHelper.getUnavailableToResetAvailable().complete(null);
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index dc131f971..fe25492fd 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Optional;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -36,6 +37,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
 import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
@@ -83,6 +85,7 @@ public class RemoteShuffleOutputGate {
   private boolean isRegisterShuffle = false;
   private int maxReviveTimes;
   private boolean hasSentHandshake = false;
+  protected final ShuffleIOMetricGroup shuffleIOMetricGroup;
 
   /**
    * @param shuffleDesc Describes shuffle meta and shuffle worker address.
@@ -95,8 +98,28 @@ public class RemoteShuffleOutputGate {
       int bufferSize,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       CelebornConf celebornConf,
-      int numMappers) {
+      int numMappers,
+      MetricGroup taskIOMetricGroup) {
+    this(
+        shuffleDesc,
+        numSubs,
+        bufferSize,
+        bufferPoolFactory,
+        celebornConf,
+        numMappers,
+        taskIOMetricGroup,
+        null);
+  }
 
+  public RemoteShuffleOutputGate(
+      RemoteShuffleDescriptor shuffleDesc,
+      int numSubs,
+      int bufferSize,
+      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+      CelebornConf celebornConf,
+      int numMappers,
+      MetricGroup taskIOMetricGroup,
+      FlinkShuffleClientImpl flinkShuffleClient) {
     this.shuffleDesc = shuffleDesc;
     this.numSubs = numSubs;
     this.bufferPoolFactory = bufferPoolFactory;
@@ -116,8 +139,9 @@ public class RemoteShuffleOutputGate {
     this.lifecycleManagerPort = 
shuffleDesc.getShuffleResource().getLifecycleManagerPort();
     this.lifecycleManagerTimestamp =
         shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp();
-    this.flinkShuffleClient = getShuffleClient();
+    this.flinkShuffleClient = flinkShuffleClient == null ? getShuffleClient() 
: flinkShuffleClient;
     this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes();
+    this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup, 
shuffleId);
   }
 
   /** Initialize transportation gate. */
@@ -210,14 +234,16 @@ public class RemoteShuffleOutputGate {
   /** Writes a piece of data to a subpartition. */
   public void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
     try {
-      flinkShuffleClient.pushDataToLocation(
-          shuffleId,
-          mapId,
-          attemptId,
-          bufferHeader.getSubPartitionId(),
-          io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
-          partitionLocation,
-          () -> byteBuf.release());
+      int bytesWritten =
+          flinkShuffleClient.pushDataToLocation(
+              shuffleId,
+              mapId,
+              attemptId,
+              bufferHeader.getSubPartitionId(),
+              io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
+              partitionLocation,
+              byteBuf::release);
+      shuffleIOMetricGroup.getNumBytesOut().inc(bytesWritten);
     } catch (IOException e) {
       Utils.rethrowAsRuntimeException(e);
     }
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
index 1bd4285ee..d2e255968 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
@@ -108,6 +108,7 @@ public class RemoteShuffleResultPartitionDelegation {
 
     DataBuffer dataBuffer = isBroadcast ? getBroadcastDataBuffer() : 
getUnicastDataBuffer();
     if (dataBuffer.append(record, targetSubpartition, dataType)) {
+      incNumRecordsOut(dataType);
       return;
     }
 
@@ -117,6 +118,7 @@ public class RemoteShuffleResultPartitionDelegation {
         dataBuffer.finish();
         dataBuffer.release();
         writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
+        incNumRecordsOut(dataType);
         return;
       }
       flushDataBuffer(dataBuffer, isBroadcast);
@@ -127,6 +129,12 @@ public class RemoteShuffleResultPartitionDelegation {
     emit(record, targetSubpartition, dataType, isBroadcast);
   }
 
+  private void incNumRecordsOut(Buffer.DataType dataType) {
+    if (dataType.isBuffer()) {
+      outputGate.shuffleIOMetricGroup.getNumRecordsOut().inc();
+    }
+  }
+
   @VisibleForTesting
   public DataBuffer getUnicastDataBuffer() throws IOException {
     flushBroadcastDataBuffer();
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
new file mode 100644
index 000000000..16a9f4cf5
--- /dev/null
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.metric;
+
+import static 
org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+/**
+ * Metric group that contains shareable pre-defined IO-related metrics. The 
metrics registration is
+ * forwarded to the parent shuffle metric group.
+ */
+public class ShuffleIOMetricGroup extends ProxyMetricGroup<MetricGroup> {
+
+  private final Counter numBytesIn;
+  private final Counter numBytesOut;
+  private final Counter numRecordsOut;
+
+  private final Meter numBytesInRate;
+  private final Meter numBytesOutRate;
+  private final Meter numRecordsOutRate;
+
+  public ShuffleIOMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId) {
+    
super(createShuffleIOOwnerMetricGroup(taskIOMetricGroup).addGroup(shuffleId));
+    this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new 
SimpleCounter());
+    this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new 
SimpleCounter());
+    this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new 
SimpleCounter());
+    this.numBytesInRate = meter(MetricNames.IO_NUM_BYTES_IN_RATE, new 
MeterView(numBytesIn));
+    this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new 
MeterView(numBytesOut));
+    this.numRecordsOutRate =
+        meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new 
MeterView(numRecordsOut));
+  }
+
+  // 
============================================================================================
+  // Getters
+  // 
============================================================================================
+
+  public Counter getNumBytesIn() {
+    return numBytesIn;
+  }
+
+  public Counter getNumBytesOut() {
+    return numBytesOut;
+  }
+
+  public Counter getNumRecordsOut() {
+    return numRecordsOut;
+  }
+
+  public Meter getNumBytesInRate() {
+    return numBytesInRate;
+  }
+
+  public Meter getNumBytesOutRate() {
+    return numBytesOutRate;
+  }
+
+  public Meter getNumRecordsOutRate() {
+    return numRecordsOutRate;
+  }
+}
diff --git 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
index 33c31b2e4..cc3cf9761 100644
--- 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
+++ 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -22,39 +22,52 @@ import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.Random;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
 
 public class RemoteShuffleOutputGateSuiteJ {
-  private final RemoteShuffleOutputGate remoteShuffleOutputGate =
-      mock(RemoteShuffleOutputGate.class);
-  private final FlinkShuffleClientImpl shuffleClient = 
mock(FlinkShuffleClientImpl.class);
   private static final int BUFFER_SIZE = 20;
   private BufferPool bufferPool;
+  private RemoteShuffleOutputGate remoteShuffleOutputGate;
 
   @Before
   public void setup() throws IOException {
-    remoteShuffleOutputGate.flinkShuffleClient = shuffleClient;
+    remoteShuffleOutputGate =
+        new RemoteShuffleOutputGate(
+            shuffleDescriptor(),
+            2,
+            BUFFER_SIZE,
+            () -> bufferPool,
+            new CelebornConf(),
+            10,
+            new UnregisteredMetricsGroup(),
+            mock(FlinkShuffleClientImpl.class));
     NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 
BUFFER_SIZE);
     bufferPool = networkBufferPool.createBufferPool(10, 10);
   }
 
   @Test
-  public void TestSimpleWriteData() throws IOException, InterruptedException {
+  public void testSimpleWriteData() throws IOException, InterruptedException {
 
     PartitionLocation partitionLocation =
         new PartitionLocation(
             1, 0, "localhost", 123, 245, 789, 238, 
PartitionLocation.Mode.PRIMARY);
-    when(shuffleClient.registerMapPartitionTask(anyInt(), anyInt(), anyInt(), 
anyInt(), anyInt()))
+    when(remoteShuffleOutputGate.flinkShuffleClient.registerMapPartitionTask(
+            anyInt(), anyInt(), anyInt(), anyInt(), anyInt()))
         .thenAnswer(t -> partitionLocation);
     when(remoteShuffleOutputGate.flinkShuffleClient.pushDataHandShake(
             anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any()))
@@ -67,7 +80,19 @@ public class RemoteShuffleOutputGateSuiteJ {
         .thenAnswer(t -> Optional.empty());
     remoteShuffleOutputGate.regionStart(false);
 
-    remoteShuffleOutputGate.write(bufferPool.requestBuffer(), 0);
+    Buffer buffer = bufferPool.requestBuffer();
+    buffer.asByteBuf().writeByte(10);
+    remoteShuffleOutputGate.write(buffer, 0);
+
+    when(remoteShuffleOutputGate.flinkShuffleClient.pushDataToLocation(
+            anyInt(), anyInt(), anyInt(), anyInt(), any(), any(), any()))
+        .thenReturn(buffer.getSize());
+    remoteShuffleOutputGate.write(buffer, 1);
+    Assert.assertEquals(
+        buffer.getSize(), 
remoteShuffleOutputGate.shuffleIOMetricGroup.getNumBytesOut().getCount());
+    Assert.assertEquals(
+        buffer.getSize(),
+        
remoteShuffleOutputGate.shuffleIOMetricGroup.getNumBytesOutRate().getCount());
 
     doNothing()
         .when(remoteShuffleOutputGate.flinkShuffleClient)
@@ -97,4 +122,16 @@ public class RemoteShuffleOutputGateSuiteJ {
     Assert.assertEquals(0, byteBuf.refCnt());
     Assert.assertEquals(0, celebornByteBuf.refCnt());
   }
+
+  private RemoteShuffleDescriptor shuffleDescriptor() {
+    byte[] bytes = new byte[16];
+    new Random().nextBytes(bytes);
+    return new RemoteShuffleDescriptor(
+        new JobID(bytes).toString(),
+        new JobID(bytes),
+        new JobID(bytes).toString(),
+        new ResultPartitionID(),
+        new RemoteShuffleResource(
+            "1", 1, System.currentTimeMillis(), new ShuffleResourceDescriptor(1, 1, 1, 0)));
+  }
 }
diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 348fb3556..32b22fa1e 100644
--- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.util.CloseableIterator;
@@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
 
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       int numConcurrentReading) {
     super(
         celebornConf,
-        taskName,
+        ownerContext,
         gateIndex,
         gateDescriptor,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 54737f03a..faf733bef 100644
--- 
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -38,7 +39,7 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
 
   @Override
   protected RemoteShuffleInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -46,8 +47,8 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
-        this.celebornConf,
-        owningTaskName,
+        celebornConf,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
--- 
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
       BufferCompressor bufferCompressor,
       RemoteShuffleDescriptor rsd) {
     return new RemoteShuffleResultPartition(
-        taskNameWithSubtaskAndId,
+        ownerContext.getOwnerName(),
         partitionIndex,
         id,
         type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
             networkBufferSize,
             bufferPoolFactories.get(1),
             celebornConf,
-            numMappers));
+            numMappers,
+            ownerContext.getParentGroup()));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git 
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 14c2a7b59..22c4764f2 100644
--- 
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -68,9 +69,9 @@ import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
 public class RemoteShuffleResultPartitionSuiteJ {
   private final int networkBufferSize = 32 * 1024;
-  private BufferCompressor bufferCompressor = new BufferCompressor(networkBufferSize, "lz4");
-  private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class);
-  private final String compressCodec = "LZ4";
+  private final BufferCompressor bufferCompressor = new BufferCompressor(networkBufferSize, "lz4");
+  private final RemoteShuffleOutputGate remoteShuffleOutputGate =
+      mock(RemoteShuffleOutputGate.class);
   private final CelebornConf conf = new CelebornConf();
   BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, "LZ4");
 
@@ -80,7 +81,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
 
   private NetworkBufferPool globalBufferPool;
 
-  private BufferPool dataBufferPool;
+  private BufferPool sortBufferPool;
 
   private BufferPool nettyBufferPool;
 
@@ -99,8 +100,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
       outputGate.release();
     }
 
-    if (dataBufferPool != null) {
-      dataBufferPool.lazyDestroy();
+    if (sortBufferPool != null) {
+      sortBufferPool.lazyDestroy();
     }
     if (nettyBufferPool != null) {
       nettyBufferPool.lazyDestroy();
@@ -173,7 +174,9 @@ public class RemoteShuffleResultPartitionSuiteJ {
     random.nextBytes(dataWritten);
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.emitRecord(recordWritten, 0);
-    assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -206,7 +209,9 @@ public class RemoteShuffleResultPartitionSuiteJ {
     random.nextBytes(dataWritten);
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.broadcastRecord(recordWritten);
-    assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -245,20 +250,26 @@ public class RemoteShuffleResultPartitionSuiteJ {
 
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
-    assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
-    assertEquals(2, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flush(0);
-    assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
 
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
-    assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flushAll();
-    assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
             record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
       }
     }
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     assertTrue(outputGate.isFinished());
@@ -357,14 +370,14 @@ public class RemoteShuffleResultPartitionSuiteJ {
 
   private void initResultPartitionWriter(
       int numSubpartitions,
-      int dataBufferPoolSize,
+      int sortBufferPoolSize,
       int nettyBufferPoolSize,
       boolean compressionEnabled,
       CelebornConf conf,
       int numMappers)
       throws Exception {
 
-    dataBufferPool = globalBufferPool.createBufferPool(dataBufferPoolSize, 
dataBufferPoolSize);
+    sortBufferPool = globalBufferPool.createBufferPool(sortBufferPoolSize, 
sortBufferPoolSize);
     nettyBufferPool = globalBufferPool.createBufferPool(nettyBufferPoolSize, 
nettyBufferPoolSize);
 
     outputGate =
@@ -384,7 +397,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
               bufferSize,
               new ResultPartitionManager(),
               bufferCompressor,
-              () -> dataBufferPool,
+              () -> sortBufferPool,
               outputGate);
     } else {
       partitionWriter =
@@ -398,7 +411,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
               bufferSize,
               new ResultPartitionManager(),
               null,
-              () -> dataBufferPool,
+              () -> sortBufferPool,
               outputGate);
     }
   }
@@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferSize,
           bufferPoolFactory,
           celebornConf,
-          numMappers);
+          numMappers,
+          new UnregisteredMetricsGroup());
       isSetup = false;
       isFinished = false;
       isClosed = false;
@@ -551,7 +565,7 @@ public class RemoteShuffleResultPartitionSuiteJ {
     }
   }
 
-  private RemoteShuffleDescriptor getShuffleDescriptor() throws Exception {
+  private RemoteShuffleDescriptor getShuffleDescriptor() {
     Random random = new Random();
     byte[] bytes = new byte[16];
     random.nextBytes(bytes);
diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index a8fdf4774..56169b6d6 100644
--- 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.util.CloseableIterator;
@@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
 
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
       int numConcurrentReading) {
     super(
         celebornConf,
-        taskName,
+        ownerContext,
         gateIndex,
         gateDescriptor,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index ef8ea9655..f4102ec01 100644
--- 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
   // For testing.
   @Override
   protected RemoteShuffleInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
-        this.celebornConf,
-        owningTaskName,
+        celebornConf,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
--- 
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
       BufferCompressor bufferCompressor,
       RemoteShuffleDescriptor rsd) {
     return new RemoteShuffleResultPartition(
-        taskNameWithSubtaskAndId,
+        ownerContext.getOwnerName(),
         partitionIndex,
         id,
         type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
             networkBufferSize,
             bufferPoolFactories.get(1),
             celebornConf,
-            numMappers));
+            numMappers,
+            ownerContext.getParentGroup()));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index c02e1ad8b..22c4764f2 100644
--- 
a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.emitRecord(recordWritten, 0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.broadcastRecord(recordWritten);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
     assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flush(0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flushAll();
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
             record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
       }
     }
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     assertTrue(outputGate.isFinished());
@@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferSize,
           bufferPoolFactory,
           celebornConf,
-          numMappers);
+          numMappers,
+          new UnregisteredMetricsGroup());
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 90041fa0e..0f93e6975 100644
--- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.util.CloseableIterator;
@@ -59,7 +60,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
 
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -67,7 +68,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
       int numConcurrentReading) {
     super(
         celebornConf,
-        taskName,
+        ownerContext,
         gateIndex,
         gateDescriptor,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index ef8ea9655..f4102ec01 100644
--- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
   // For testing.
   @Override
   protected RemoteShuffleInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
-        this.celebornConf,
-        owningTaskName,
+        celebornConf,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
--- 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
       BufferCompressor bufferCompressor,
       RemoteShuffleDescriptor rsd) {
     return new RemoteShuffleResultPartition(
-        taskNameWithSubtaskAndId,
+        ownerContext.getOwnerName(),
         partitionIndex,
         id,
         type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
             networkBufferSize,
             bufferPoolFactories.get(1),
             celebornConf,
-            numMappers));
+            numMappers,
+            ownerContext.getParentGroup()));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index c02e1ad8b..22c4764f2 100644
--- 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.emitRecord(recordWritten, 0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.broadcastRecord(recordWritten);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
     assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flush(0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flushAll();
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
             record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
       }
     }
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     assertTrue(outputGate.isFinished());
@@ -439,7 +452,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferSize,
           bufferPoolFactory,
           celebornConf,
-          numMappers);
+          numMappers,
+          new UnregisteredMetricsGroup());
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index dfe2eeca8..6d513265e 100644
--- 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.util.CloseableIterator;
@@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
 
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
       int numConcurrentReading) {
     super(
         celebornConf,
-        taskName,
+        ownerContext,
         gateIndex,
         gateDescriptor,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index ef8ea9655..f4102ec01 100644
--- 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -39,7 +40,7 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
   // For testing.
   @Override
   protected RemoteShuffleInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -47,8 +48,8 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
-        this.celebornConf,
-        owningTaskName,
+        celebornConf,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 547d3fa08..363366b46 100644
--- 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -47,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,7 +60,7 @@ public class RemoteShuffleResultPartitionFactory
       BufferCompressor bufferCompressor,
       RemoteShuffleDescriptor rsd) {
     return new RemoteShuffleResultPartition(
-        taskNameWithSubtaskAndId,
+        ownerContext.getOwnerName(),
         partitionIndex,
         id,
         type,
@@ -75,7 +76,8 @@ public class RemoteShuffleResultPartitionFactory
             networkBufferSize,
             bufferPoolFactories.get(1),
             celebornConf,
-            numMappers));
+            numMappers,
+            ownerContext.getParentGroup()));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index fece38532..3cc25bcbc 100644
--- 
a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -174,6 +175,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.emitRecord(recordWritten, 0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -207,6 +210,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.broadcastRecord(recordWritten);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -246,9 +251,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
     assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flush(0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -256,6 +265,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flushAll();
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -307,6 +318,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
             record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
       }
     }
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     assertTrue(outputGate.isFinished());
@@ -441,7 +454,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferSize,
           bufferPoolFactory,
           celebornConf,
-          numMappers);
+          numMappers,
+          new UnregisteredMetricsGroup());
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index dfe2eeca8..6d513265e 100644
--- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.util.CloseableIterator;
@@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
 
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
       int numConcurrentReading) {
     super(
         celebornConf,
-        taskName,
+        ownerContext,
         gateIndex,
         gateDescriptor,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 6f1fd803e..bb4abbc70 100644
--- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -40,7 +41,7 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
   // For testing.
   @Override
   protected RemoteShuffleInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -48,8 +49,8 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, 
CompressionCodec.valueOf(compressionCodec));
     return new RemoteShuffleInputGate(
-        this.celebornConf,
-        owningTaskName,
+        celebornConf,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index a9742cc86..69b9a71bc 100644
--- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -60,7 +61,7 @@ public class RemoteShuffleResultPartitionFactory
       BufferCompressor bufferCompressor,
       RemoteShuffleDescriptor rsd) {
     return new RemoteShuffleResultPartition(
-        taskNameWithSubtaskAndId,
+        ownerContext.getOwnerName(),
         partitionIndex,
         id,
         type,
@@ -76,7 +77,8 @@ public class RemoteShuffleResultPartitionFactory
             networkBufferSize,
             bufferPoolFactories.get(1),
             celebornConf,
-            numMappers));
+            numMappers,
+            ownerContext.getParentGroup()));
   }
 
   @Override
diff --git 
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 99f502d87..0ceabcca8 100644
--- 
a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -177,6 +178,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.emitRecord(recordWritten, 0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -210,6 +213,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.broadcastRecord(recordWritten);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -249,9 +254,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
     assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flush(0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -259,6 +268,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flushAll();
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -310,6 +321,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
             record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
       }
     }
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     assertTrue(outputGate.isFinished());
@@ -444,7 +457,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferSize,
           bufferPoolFactory,
           celebornConf,
-          numMappers);
+          numMappers,
+          new UnregisteredMetricsGroup());
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 85443db36..7434b8cff 100644
--- 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.util.CloseableIterator;
@@ -60,7 +61,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
 
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
-      String taskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -68,7 +69,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
       int numConcurrentReading) {
     super(
         celebornConf,
-        taskName,
+        ownerContext,
         gateIndex,
         gateDescriptor,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 6f1fd803e..bb4abbc70 100644
--- 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -40,7 +41,7 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
   // For testing.
   @Override
   protected RemoteShuffleInputGate createInputGate(
-      String owningTaskName,
+      ShuffleIOOwnerContext ownerContext,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -48,8 +49,8 @@ public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGat
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, 
CompressionCodec.valueOf(compressionCodec));
     return new RemoteShuffleInputGate(
-        this.celebornConf,
-        owningTaskName,
+        celebornConf,
+        ownerContext,
         gateIndex,
         igdd,
         bufferPoolFactory,
diff --git 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index a9742cc86..69b9a71bc 100644
--- 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      String taskNameWithSubtaskAndId,
+      ShuffleIOOwnerContext ownerContext,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -60,7 +61,7 @@ public class RemoteShuffleResultPartitionFactory
       BufferCompressor bufferCompressor,
       RemoteShuffleDescriptor rsd) {
     return new RemoteShuffleResultPartition(
-        taskNameWithSubtaskAndId,
+        ownerContext.getOwnerName(),
         partitionIndex,
         id,
         type,
@@ -76,7 +77,8 @@ public class RemoteShuffleResultPartitionFactory
             networkBufferSize,
             bufferPoolFactories.get(1),
             celebornConf,
-            numMappers));
+            numMappers,
+            ownerContext.getParentGroup()));
   }
 
   @Override
diff --git 
a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 99f502d87..0ceabcca8 100644
--- 
a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -177,6 +178,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.emitRecord(recordWritten, 0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -210,6 +213,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
     partitionWriter.broadcastRecord(recordWritten);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(1, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     partitionWriter.close();
@@ -249,9 +254,13 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(2, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
     assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(3, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flush(0);
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -259,6 +268,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
     partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
     assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(5, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.flushAll();
     assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
@@ -310,6 +321,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
             record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
       }
     }
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOut().getCount());
+    assertEquals(numRecords, 
outputGate.shuffleIOMetricGroup.getNumRecordsOutRate().getCount());
 
     partitionWriter.finish();
     assertTrue(outputGate.isFinished());
@@ -444,7 +457,8 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferSize,
           bufferPoolFactory,
           celebornConf,
-          numMappers);
+          numMappers,
+          new UnregisteredMetricsGroup());
       isSetup = false;
       isFinished = false;
       isClosed = false;


Reply via email to