This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 68a1db1e3 [CELEBORN-2005][FOLLOWUP] Introduce ShuffleMetricGroup for numBytesIn, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics
68a1db1e3 is described below

commit 68a1db1e3b037b8890d551d034dd73af3525f84b
Author: SteNicholas <programg...@163.com>
AuthorDate: Fri May 30 14:54:28 2025 +0800

    [CELEBORN-2005][FOLLOWUP] Introduce ShuffleMetricGroup for numBytesIn, numBytesOut, numRecordsOut, numBytesInPerSecond, numBytesOutPerSecond, numRecordsOutPerSecond metrics
    
    ### What changes were proposed in this pull request?
    
    Introduce `ShuffleMetricGroup` for the `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond` and `numRecordsOutPerSecond` metrics.
    
    Follow-up to #3272.
    
    ### Why are the changes needed?
    
    The `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond` and `numRecordsOutPerSecond` metrics should carry the shuffle id as a scope variable, and `ShuffleMetricGroup` is introduced to support this. In addition, #3272 logs many identical warnings like the following, which should be fixed:
    
    ```
    2025-05-28 10:48:54,433 WARN  [flink-akka.actor.default-dispatcher-18] org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported.[11.66.62.202, taskmanager, antc4flink3980005426-taskmanager-3-70, antc4flink3980005426, [vertex-2]HashJoin(joinType=[LeftOuterJoin], where=[(f0 = f00)], select=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60], build=[right]) -> S [...]
    ```
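    The warning can be reproduced with a simplified model that does not use Flink. In the sketch below, `NameRegistry` and `ShuffleGroup` are hypothetical stand-ins, not Flink or Celeborn classes. The registry counts an attempt to register a metric name that already exists as a collision, which corresponds to the warning above. The sketch assumes, as the lines removed from `RemoteShuffleInputGateDelegation` suggest, that a group used to be created for every reader of the same shuffle. Under that pattern, each extra reader collides. Caching one group per shuffle id with `Map.computeIfAbsent`, as the diff below does with `shuffleIOMetricGroups`, registers each metric only once.

    ```java
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class ShuffleGroupDedupSketch {

      // Hypothetical stand-in for a metric group that rejects duplicate metric names.
      static final class NameRegistry {
        private final Set<String> names = new HashSet<>();
        int collisions = 0;

        void register(String name) {
          if (!names.add(name)) {
            // Flink logs a "Name collision" warning at this point and drops the metric.
            collisions++;
          }
        }
      }

      // Hypothetical per-shuffle group that registers its metrics once, when it is created.
      static final class ShuffleGroup {
        ShuffleGroup(NameRegistry registry, int shuffleId) {
          for (String metric : new String[] {"numBytesIn", "numBytesOut", "numRecordsOut"}) {
            registry.register(shuffleId + "." + metric);
          }
        }
      }

      // Old pattern (simplified): a new group for every reader of the same shuffle.
      static int collisionsWithoutCache(int readers, int shuffleId) {
        NameRegistry registry = new NameRegistry();
        for (int i = 0; i < readers; i++) {
          new ShuffleGroup(registry, shuffleId);
        }
        return registry.collisions;
      }

      // New pattern: one group per shuffle id, created on first use and then reused.
      static int collisionsWithCache(int readers, int shuffleId) {
        NameRegistry registry = new NameRegistry();
        Map<Integer, ShuffleGroup> groups = new ConcurrentHashMap<>();
        for (int i = 0; i < readers; i++) {
          groups.computeIfAbsent(shuffleId, id -> new ShuffleGroup(registry, id));
        }
        return registry.collisions;
      }

      public static void main(String[] args) {
        System.out.println(collisionsWithoutCache(4, 7)); // 9: 3 metrics x 3 extra readers
        System.out.println(collisionsWithCache(4, 7)); // 0
      }
    }
    ```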
    
    ### Does this PR introduce _any_ user-facing change?
    
    Introduce the `celeborn.client.flink.metrics.scope.shuffle` config option, which defines the scope format string applied to all metrics scoped to a shuffle:
    
    - Variables:
       - Shuffle: `<task_id>`, `<task_name>`, `<task_attempt_id>`, `<task_attempt_num>`, `<subtask_index>`, `<shuffle_id>`.
    - Metrics:
    
    Scope | Metric | Description | Type
    -- | -- | -- | --
    Shuffle | `numBytesIn` | The total number of bytes this shuffle has read. | Counter
    Shuffle | `numBytesOut` | The total number of bytes this shuffle has written. | Counter
    Shuffle | `numRecordsOut` | The total number of records this shuffle has written. | Counter
    Shuffle | `numBytesInPerSecond` | The number of bytes this shuffle reads per second. | Meter
    Shuffle | `numBytesOutPerSecond` | The number of bytes this shuffle writes per second. | Meter
    Shuffle | `numRecordsOutPerSecond` | The number of records this shuffle writes per second. | Meter
    
    ### How was this patch tested?
    
    Manual test.
    
    
![image](https://github.com/user-attachments/assets/a10c18ab-84f9-44f5-bb2d-e6b08e5bc64e)
    
![image](https://github.com/user-attachments/assets/0cb29c17-3388-4608-b7a4-ee7e3c9b43c1)
    
    Closes #3296 from SteNicholas/CELEBORN-2005.
    
    Authored-by: SteNicholas <programg...@163.com>
    Signed-off-by: Weijie Guo <res...@163.com>
---
 .../flink/AbstractRemoteShuffleInputGate.java      |   8 +-
 .../AbstractRemoteShuffleInputGateFactory.java     |  13 ++-
 ...bstractRemoteShuffleResultPartitionFactory.java |  29 +++---
 .../plugin/flink/RemoteShuffleEnvironment.java     |  35 ++++++-
 .../flink/RemoteShuffleInputGateDelegation.java    |  20 ++--
 .../plugin/flink/RemoteShuffleOutputGate.java      |  11 +-
 .../metrics/dump/ShuffleQueryScopeInfo.java        |  64 ++++++++++++
 .../metrics/groups}/ShuffleIOMetricGroup.java      |  21 ++--
 .../runtime/metrics/groups/ShuffleMetricGroup.java | 114 +++++++++++++++++++++
 .../runtime/metrics/scope/ShuffleScopeFormat.java  |  78 ++++++++++++++
 .../flink/RemoteShuffleOutputGateSuiteJ.java       |  10 +-
 .../plugin/flink/RemoteShuffleInputGate.java       |   8 +-
 .../flink/RemoteShuffleInputGateFactory.java       |   8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |  10 +-
 .../plugin/flink/RemoteShuffleInputGate.java       |   8 +-
 .../flink/RemoteShuffleInputGateFactory.java       |   8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |  10 +-
 .../plugin/flink/RemoteShuffleInputGate.java       |   8 +-
 .../flink/RemoteShuffleInputGateFactory.java       |   8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |  10 +-
 .../plugin/flink/RemoteShuffleInputGate.java       |   8 +-
 .../flink/RemoteShuffleInputGateFactory.java       |   8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |  10 +-
 .../plugin/flink/RemoteShuffleInputGate.java       |   8 +-
 .../flink/RemoteShuffleInputGateFactory.java       |   8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |  10 +-
 .../plugin/flink/RemoteShuffleInputGate.java       |   8 +-
 .../flink/RemoteShuffleInputGateFactory.java       |   8 +-
 .../flink/RemoteShuffleResultPartitionFactory.java |  11 +-
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  |  10 +-
 .../org/apache/celeborn/common/CelebornConf.scala  |  11 ++
 docs/configuration/client.md                       |   1 +
 37 files changed, 529 insertions(+), 108 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
index d04497536..9f3b8b9a9 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java
@@ -20,6 +20,7 @@ package org.apache.celeborn.plugin.flink;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -51,7 +53,8 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     Tuple2<Integer, Integer> indexRange = getConsumedSubpartitionIndexRange(gateDescriptor);
     inputGateDelegation =
         new RemoteShuffleInputGateDelegation(
@@ -64,7 +67,8 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate {
             numConcurrentReading,
             availabilityHelper,
             indexRange.f0,
-            indexRange.f1);
+            indexRange.f1,
+            shuffleIOMetricGroups);
   }
 
   /** Setup gate and build network connections. */
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index 0c9e2e8bb..22e39ffdf 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -19,12 +19,14 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 import org.slf4j.Logger;
@@ -81,7 +83,10 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
 
   /** Create RemoteShuffleInputGate from {@link InputGateDeploymentDescriptor}. */
   public IndexedInputGate create(
-      ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) {
+      ShuffleIOOwnerContext ownerContext,
+      int gateIndex,
+      InputGateDeploymentDescriptor igdd,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     LOG.info(
         "Create input gate -- number of buffers per input gate={}, "
             + "number of concurrent readings={}.",
@@ -96,7 +101,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
         gateIndex,
         igdd,
         bufferPoolFactory,
-        celebornConf.shuffleCompressionCodec().name());
+        celebornConf.shuffleCompressionCodec().name(),
+        shuffleIOMetricGroups);
   }
 
   protected abstract IndexedInputGate createInputGate(
@@ -104,7 +110,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec);
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroupMap);
 
   private SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
       BufferPoolFactory bufferPoolFactory, int numBuffers, boolean supportFloatingBuffers) {
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
index 929939e47..55a7d09e1 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,10 +89,11 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
   }
 
   public ResultPartition create(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionDeploymentDescriptor desc,
-      CelebornConf celebornConf) {
+      CelebornConf celebornConf,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     LOG.info(
         "Create result partition -- number of buffers per result partition={}, "
             + "number of subpartitions={}.",
@@ -100,7 +101,7 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
         desc.getNumberOfSubpartitions());
 
     return create(
-        ownerContext,
+        taskNameWithSubtaskAndId,
         partitionIndex,
         desc.getShuffleDescriptor().getResultPartitionID(),
         desc.getPartitionType(),
@@ -109,11 +110,12 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
         createBufferPoolFactory(),
         (RemoteShuffleDescriptor) desc.getShuffleDescriptor(),
         celebornConf,
-        desc.getTotalNumberOfPartitions());
+        desc.getTotalNumberOfPartitions(),
+        shuffleIOMetricGroup);
   }
 
   public ResultPartition create(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -122,10 +124,11 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
       List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
       RemoteShuffleDescriptor shuffleDescriptor,
       CelebornConf celebornConf,
-      int numMappers) {
+      int numMappers,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     ResultPartition partition =
         createRemoteShuffleResultPartitionInternal(
-            ownerContext,
+            taskNameWithSubtaskAndId,
             partitionIndex,
             id,
             type,
@@ -135,13 +138,14 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
             celebornConf,
             numMappers,
             getBufferCompressor(),
-            shuffleDescriptor);
-    LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this);
+            shuffleDescriptor,
+            shuffleIOMetricGroup);
+    LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
     return partition;
   }
 
   abstract ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -151,7 +155,8 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd);
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup);
 
   /**
    * Used to create 2 buffer pools -- sorting buffer pool (7/8), transportation buffer pool (1/8).
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
index dc197ae3a..e64b1202e 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
@@ -22,11 +22,13 @@ import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
 import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
+import static org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup.createShuffleIOMetricGroup;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -52,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.plugin.flink.netty.NettyShuffleEnvironmentWrapper;
 
 /**
@@ -89,6 +93,9 @@ public class RemoteShuffleEnvironment
   private final ConcurrentHashMap.KeySetView<IntermediateResultPartitionID, Boolean>
       nettyResultPartitionIds = ConcurrentHashMap.newKeySet();
 
+  private final Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups =
+      JavaUtils.newConcurrentHashMap();
+
   /**
    * @param networkBufferPool Network buffer pool for shuffle read and shuffle write.
    * @param resultPartitionManager A trivial {@link ResultPartitionManager}.
@@ -113,6 +120,9 @@ public class RemoteShuffleEnvironment
   public void close() {
     LOG.info("Close RemoteShuffleEnvironment.");
     synchronized (lock) {
+      nettyResultIds.clear();
+      nettyResultPartitionIds.clear();
+      shuffleIOMetricGroups.clear();
       try {
         networkBufferPool.destroyAllBufferPools();
       } catch (Throwable t) {
@@ -190,8 +200,19 @@ public class RemoteShuffleEnvironment
       CelebornConf conf) {
     if (resultPartitionDeploymentDescriptor.getShuffleDescriptor()
         instanceof RemoteShuffleDescriptor) {
+      int shuffleId =
+          ((RemoteShuffleDescriptor) resultPartitionDeploymentDescriptor.getShuffleDescriptor())
+              .getShuffleResource()
+              .getMapPartitionShuffleDescriptor()
+              .getShuffleId();
+      ;
       return resultPartitionFactory.create(
-          ownerContext, index, resultPartitionDeploymentDescriptor, conf);
+          ownerContext.getOwnerName(),
+          index,
+          resultPartitionDeploymentDescriptor,
+          conf,
+          shuffleIOMetricGroups.computeIfAbsent(
+              shuffleId, k -> createShuffleIOMetricGroup(ownerContext, shuffleId, conf)));
     } else {
       nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId());
       nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId());
@@ -229,7 +250,12 @@ public class RemoteShuffleEnvironment
         InputGateDeploymentDescriptor igdd = inputGateDescriptors.get(gateIndex);
         IndexedInputGate inputGate =
             createInputGateInternal(
-                ownerContext, producerStateProvider, gateIndex, igdd, inputChannelMetrics);
+                ownerContext,
+                producerStateProvider,
+                gateIndex,
+                igdd,
+                inputChannelMetrics,
+                shuffleIOMetricGroups);
         inputGates[gateIndex] = inputGate;
       }
       return Arrays.asList(inputGates);
@@ -241,12 +267,13 @@ public class RemoteShuffleEnvironment
       PartitionProducerStateProvider producerStateProvider,
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
-      InputChannelMetrics inputChannelMetrics) {
+      InputChannelMetrics inputChannelMetrics,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     return nettyResultIds.contains(igdd.getConsumedResultId())
         ? shuffleEnvironmentWrapper
             .nettyInputGateFactory()
             .create(ownerContext, gateIndex, igdd, producerStateProvider, inputChannelMetrics)
-        : inputGateFactory.create(ownerContext, gateIndex, igdd);
+        : inputGateFactory.create(ownerContext, gateIndex, igdd, shuffleIOMetricGroups);
   }
 
   @VisibleForTesting
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index fe871d68c..99f144c1c 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -19,6 +19,7 @@
 package org.apache.celeborn.plugin.flink;
 
 import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
+import static org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup.createShuffleIOMetricGroup;
 
 import java.io.IOException;
 import java.util.*;
@@ -28,7 +29,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -58,7 +59,6 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
 import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
 import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
-import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 
 public class RemoteShuffleInputGateDelegation {
@@ -133,7 +133,7 @@ public class RemoteShuffleInputGateDelegation {
   private int endSubIndex;
   private boolean partitionConnectionExceptionEnabled;
 
-  private final MetricGroup taskIOMetricGroup;
+  private final ShuffleIOMetricGroup shuffleIOMetricGroup;
 
   public RemoteShuffleInputGateDelegation(
       CelebornConf celebornConf,
@@ -145,9 +145,9 @@ public class RemoteShuffleInputGateDelegation {
       int numConcurrentReading,
       AvailabilityProvider.AvailabilityHelper availabilityHelper,
       int startSubIndex,
-      int endSubIndex) {
+      int endSubIndex,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     this.taskName = ownerContext.getOwnerName();
-    this.taskIOMetricGroup = ownerContext.getParentGroup();
     this.gateIndex = gateIndex;
     this.gateDescriptor = gateDescriptor;
     this.bufferPoolFactory = bufferPoolFactory;
@@ -161,6 +161,14 @@ public class RemoteShuffleInputGateDelegation {
     RemoteShuffleDescriptor remoteShuffleDescriptor =
         (RemoteShuffleDescriptor) gateDescriptor.getShuffleDescriptors()[0];
     RemoteShuffleResource shuffleResource = remoteShuffleDescriptor.getShuffleResource();
+    int shuffleId =
+        remoteShuffleDescriptor
+            .getShuffleResource()
+            .getMapPartitionShuffleDescriptor()
+            .getShuffleId();
+    this.shuffleIOMetricGroup =
+        shuffleIOMetricGroups.computeIfAbsent(
+            shuffleId, k -> createShuffleIOMetricGroup(ownerContext, shuffleId, celebornConf));
 
     try {
       String appUniqueId =
@@ -204,8 +212,6 @@ public class RemoteShuffleInputGateDelegation {
       RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight();
       ShuffleResourceDescriptor shuffleDescriptor =
           remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
-      ShuffleIOMetricGroup shuffleIOMetricGroup =
-          new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleDescriptor.getShuffleId());
 
       LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor);
 
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index fe25492fd..2630617a1 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.util.Optional;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.util.function.SupplierWithException;
 import org.slf4j.Logger;
@@ -37,7 +37,6 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
 import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
 import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
-import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
@@ -99,7 +98,7 @@ public class RemoteShuffleOutputGate {
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       CelebornConf celebornConf,
       int numMappers,
-      MetricGroup taskIOMetricGroup) {
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     this(
         shuffleDesc,
         numSubs,
@@ -107,7 +106,7 @@ public class RemoteShuffleOutputGate {
         bufferPoolFactory,
         celebornConf,
         numMappers,
-        taskIOMetricGroup,
+        shuffleIOMetricGroup,
         null);
   }
 
@@ -118,7 +117,7 @@ public class RemoteShuffleOutputGate {
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       CelebornConf celebornConf,
       int numMappers,
-      MetricGroup taskIOMetricGroup,
+      ShuffleIOMetricGroup shuffleIOMetricGroup,
       FlinkShuffleClientImpl flinkShuffleClient) {
     this.shuffleDesc = shuffleDesc;
     this.numSubs = numSubs;
@@ -141,7 +140,7 @@ public class RemoteShuffleOutputGate {
         shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp();
     this.flinkShuffleClient = flinkShuffleClient == null ? getShuffleClient() : flinkShuffleClient;
     this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes();
-    this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleId);
+    this.shuffleIOMetricGroup = shuffleIOMetricGroup;
   }
 
   /** Initialize transportation gate. */
diff --git a/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/dump/ShuffleQueryScopeInfo.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/dump/ShuffleQueryScopeInfo.java
new file mode 100644
index 000000000..3c5e39d57
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/dump/ShuffleQueryScopeInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo.TaskQueryScopeInfo;
+
+/**
+ * Container for the shuffle scope. Stores the id of the job/vertex, the subtask index and the id of
+ * the shuffle.
+ */
+public class ShuffleQueryScopeInfo extends TaskQueryScopeInfo {
+
+  public final int shuffleId;
+
+  public static final byte INFO_CATEGORY_SHUFFLE = 6;
+
+  public ShuffleQueryScopeInfo(
+      String jobID, String vertexID, int subtaskIndex, int attemptNumber, int shuffleId) {
+    this(jobID, vertexID, subtaskIndex, attemptNumber, shuffleId, "");
+  }
+
+  public ShuffleQueryScopeInfo(
+      String jobID,
+      String vertexID,
+      int subtaskIndex,
+      int attemptNumber,
+      int shuffleId,
+      String scope) {
+    super(jobID, vertexID, subtaskIndex, attemptNumber, scope);
+    this.shuffleId = shuffleId;
+  }
+
+  @Override
+  public ShuffleQueryScopeInfo copy(String additionalScope) {
+    return new ShuffleQueryScopeInfo(
+        this.jobID,
+        this.vertexID,
+        this.subtaskIndex,
+        this.attemptNumber,
+        this.shuffleId,
+        concatScopes(additionalScope));
+  }
+
+  @Override
+  public byte getCategory() {
+    return INFO_CATEGORY_SHUFFLE;
+  }
+}
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleIOMetricGroup.java
similarity index 82%
rename from client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
rename to client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleIOMetricGroup.java
index 16a9f4cf5..6690f8813 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/ShuffleIOMetricGroup.java
+++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleIOMetricGroup.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.plugin.flink.metric;
-
-import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
+package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
@@ -26,7 +24,9 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+
+import org.apache.celeborn.common.CelebornConf;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -42,8 +42,8 @@ public class ShuffleIOMetricGroup extends ProxyMetricGroup<MetricGroup> {
   private final Meter numBytesOutRate;
   private final Meter numRecordsOutRate;
 
-  public ShuffleIOMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId) {
-    super(createShuffleIOOwnerMetricGroup(taskIOMetricGroup).addGroup(shuffleId));
+  public ShuffleIOMetricGroup(ShuffleMetricGroup parent) {
+    super(parent);
     this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new SimpleCounter());
     this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new SimpleCounter());
     this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SimpleCounter());
@@ -80,4 +80,13 @@ public class ShuffleIOMetricGroup extends 
ProxyMetricGroup<MetricGroup> {
   public Meter getNumRecordsOutRate() {
     return numRecordsOutRate;
   }
+
+  public static ShuffleIOMetricGroup createShuffleIOMetricGroup(
+      ShuffleIOOwnerContext ownerContext, int shuffleId, CelebornConf celebornConf) {
+    return new ShuffleMetricGroup(
+            ownerContext.getParentGroup(),
+            shuffleId,
+            celebornConf.clientFlinkMetricsScopeNamingShuffle())
+        .getIOMetricGroup();
+  }
 }
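`ShuffleIOMetricGroup` pairs each counter (`numBytesIn`, `numBytesOut`, `numRecordsOut`) with a per-second `Meter` such as `numBytesOutRate`, and the file imports Flink's `MeterView`. The sketch below is a simplified, JDK-only stand-in for a counter-backed rate meter, not Flink's actual `MeterView`: the class name `SlidingRateMeter`, the use of `AtomicLong` as the counter, and the window and update-interval values are assumptions chosen for illustration. A ring buffer stores periodic snapshots of the counter, and the rate is the difference between the newest and oldest snapshot divided by the window length.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical counter-backed per-second meter (illustrative only, not Flink's MeterView).
 * update() is expected to be called once per update interval by some scheduler.
 */
class SlidingRateMeter {
  private final AtomicLong counter;
  private final int windowSeconds;
  private final long[] snapshots;
  private int newest = 0;
  private double rate = 0.0;

  SlidingRateMeter(AtomicLong counter, int windowSeconds, int updateIntervalSeconds) {
    this.counter = counter;
    this.windowSeconds = windowSeconds;
    // One slot per update interval, plus one so the oldest slot lies a full window back.
    this.snapshots = new long[windowSeconds / updateIntervalSeconds + 1];
  }

  /** Records the current counter value and recomputes the rate over the window. */
  void update() {
    newest = (newest + 1) % snapshots.length;
    snapshots[newest] = counter.get();
    long oldest = snapshots[(newest + 1) % snapshots.length];
    rate = (double) (snapshots[newest] - oldest) / windowSeconds;
  }

  /** Events (e.g. bytes) per second over the last window. */
  double getRate() {
    return rate;
  }
}
```

Until the ring buffer has been filled once, the oldest slot still holds zero, so the first readings under-report the rate. With a 10-second window updated every 5 seconds and 500 bytes written per interval, the first update reports 50.0 and later updates settle at 100.0 bytes per second.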
diff --git a/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleMetricGroup.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleMetricGroup.java
new file mode 100644
index 000000000..d2aa30151
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/groups/ShuffleMetricGroup.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import static org.apache.flink.runtime.metrics.scope.ShuffleScopeFormat.SCOPE_SHUFFLE_ID;
+import static org.apache.flink.runtime.metrics.scope.ShuffleScopeFormat.fromConfig;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.dump.ShuffleQueryScopeInfo;
+
+/** Special {@link MetricGroup} representing a Flink runtime Shuffle. */
+public class ShuffleMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
+
+  /** The shuffle id uniquely identifying the executed shuffle represented by this metrics group. */
+  private final int shuffleId;
+
+  private final ShuffleIOMetricGroup ioMetrics;
+
+  // ------------------------------------------------------------------------
+
+  public ShuffleMetricGroup(MetricGroup taskIOMetricGroup, int shuffleId, String scopeFormat) {
+    this(((TaskIOMetricGroup) taskIOMetricGroup).parentMetricGroup, shuffleId, scopeFormat);
+  }
+
+  public ShuffleMetricGroup(TaskMetricGroup parent, int shuffleId, String scopeFormat) {
+    super(
+        parent.registry,
+        fromConfig(scopeFormat, parent.registry.getScopeFormats().getTaskFormat())
+            .formatScope(checkNotNull(parent), shuffleId),
+        parent);
+    this.shuffleId = shuffleId;
+    this.ioMetrics = new ShuffleIOMetricGroup(this);
+  }
+
+  // ------------------------------------------------------------------------
+  //  properties
+  // ------------------------------------------------------------------------
+
+  public final TaskMetricGroup parent() {
+    return parent;
+  }
+
+  public int shuffleId() {
+    return shuffleId;
+  }
+
+  /**
+   * Returns the {@link ShuffleIOMetricGroup} for this shuffle.
+   *
+   * @return {@link ShuffleIOMetricGroup} for this shuffle.
+   */
+  public ShuffleIOMetricGroup getIOMetricGroup() {
+    return ioMetrics;
+  }
+
+  @Override
+  protected ShuffleQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+    return new ShuffleQueryScopeInfo(
+        this.parent.parent.jobId.toString(),
+        this.parent.vertexId.toString(),
+        this.parent.subtaskIndex,
+        this.parent.attemptNumber(),
+        this.shuffleId);
+  }
+
+  // ------------------------------------------------------------------------
+  //  cleanup
+  // ------------------------------------------------------------------------
+
+  @Override
+  public void close() {
+    super.close();
+  }
+
+  // ------------------------------------------------------------------------
+  //  Component Metric Group Specifics
+  // ------------------------------------------------------------------------
+
+  @Override
+  protected void putVariables(Map<String, String> variables) {
+    variables.put(SCOPE_SHUFFLE_ID, String.valueOf(shuffleId));
+  }
+
+  @Override
+  protected Iterable<? extends ComponentMetricGroup<?>> subComponents() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected String getGroupName(CharacterFilter filter) {
+    return "shuffle";
+  }
+}
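The new `ShuffleMetricGroup` gives every shuffle its own component group, with the shuffle id exposed as a scope variable, so the same IO metric names (`numRecordsOut` and so on) are registered once per shuffle instead of repeatedly in one shared group. Flink keeps the existing metric on a duplicate name and logs a "Name collision" warning instead of reporting the new one. The JDK-only sketch below models that idea with a hypothetical `MiniMetricGroup` class; it is not Flink's `AbstractMetricGroup`, and the scope strings used with it are made up.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, minimal metric group (not Flink's AbstractMetricGroup). Metric names must be
 * unique within one group; child groups extend the scope by one dot-separated component.
 */
class MiniMetricGroup {
  private final String scope;
  private final Map<String, Long> metrics = new LinkedHashMap<>();
  private final Map<String, MiniMetricGroup> children = new HashMap<>();

  MiniMetricGroup(String scope) {
    this.scope = scope;
  }

  /**
   * Registers a metric. On a name collision the first metric is kept and false is returned,
   * loosely mirroring how a duplicate name is not reported in Flink.
   */
  boolean register(String name, long initialValue) {
    return metrics.putIfAbsent(name, initialValue) == null;
  }

  /** Returns the child group for one scope component (e.g. a shuffle id), creating it once. */
  MiniMetricGroup child(String component) {
    return children.computeIfAbsent(component, c -> new MiniMetricGroup(scope + "." + c));
  }

  String scope() {
    return scope;
  }
}
```

Registering `numRecordsOut` twice on the task-level group fails the second time, while registering it once under child groups `"1"` and `"2"` succeeds for both, because each child has its own name space and scope.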
diff --git a/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/scope/ShuffleScopeFormat.java b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/scope/ShuffleScopeFormat.java
new file mode 100644
index 000000000..a81a3a2e7
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/flink/runtime/metrics/scope/ShuffleScopeFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+
+/** The scope format for the {@link ShuffleMetricGroup}. */
+public class ShuffleScopeFormat extends ScopeFormat {
+
+  public static final String SCOPE_SHUFFLE_ID = asVariable("shuffle_id");
+
+  public ShuffleScopeFormat(String format, TaskScopeFormat parentFormat) {
+    super(
+        format,
+        parentFormat,
+        new String[] {
+          SCOPE_HOST,
+          SCOPE_TASKMANAGER_ID,
+          SCOPE_JOB_ID,
+          SCOPE_JOB_NAME,
+          SCOPE_TASK_VERTEX_ID,
+          SCOPE_TASK_ATTEMPT_ID,
+          SCOPE_TASK_NAME,
+          SCOPE_TASK_SUBTASK_INDEX,
+          SCOPE_TASK_ATTEMPT_NUM,
+          SCOPE_SHUFFLE_ID
+        });
+  }
+
+  public String[] formatScope(TaskMetricGroup parent, int shuffleId) {
+    final String[] template = copyTemplate();
+    final String[] values = {
+      parent.parent().parent().hostname(),
+      parent.parent().parent().taskManagerId(),
+      valueOrNull(parent.parent().jobId()),
+      valueOrNull(parent.parent().jobName()),
+      valueOrNull(parent.vertexId()),
+      valueOrNull(parent.executionId()),
+      valueOrNull(parent.taskName()),
+      String.valueOf(parent.subtaskIndex()),
+      String.valueOf(parent.attemptNumber()),
+      valueOrNull(shuffleId)
+    };
+    return bindVariables(template, values);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Parsing from Config
+  // ------------------------------------------------------------------------
+
+  /**
+   * Creates the scope format as defined in the given configuration.
+   *
+   * @param format The scope naming of the format.
+   * @param parent The parent {@link TaskScopeFormat} of the format.
+   * @return The {@link ShuffleScopeFormat} parsed from the configuration.
+   */
+  public static ShuffleScopeFormat fromConfig(String format, TaskScopeFormat parent) {
+    return new ShuffleScopeFormat(format, parent);
+  }
+}
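`ShuffleScopeFormat` extends Flink's `ScopeFormat` with a `<shuffle_id>` variable (`asVariable("shuffle_id")`) and is handed the `TaskScopeFormat` as its parent. In Flink a scope format is a dot-separated template whose `<variable>` components are bound to runtime values, and a format whose first component is `*` reuses the parent's template as a prefix. The sketch below reimplements that binding in plain Java to show how a per-shuffle scope could render; `SimpleScopeFormat` is hypothetical, and the format strings in the check are illustrative, not the default value of `CLIENT_METRICS_SCOPE_NAMING_SHUFFLE`.

```java
import java.util.Arrays;
import java.util.Map;

/**
 * Hypothetical, simplified scope template (not Flink's ScopeFormat). The format is split on
 * '.', components written as <name> are bound to values, and a leading "*" component copies
 * the parent's template in front of the remaining components.
 */
class SimpleScopeFormat {
  private final String[] template;

  SimpleScopeFormat(String format, SimpleScopeFormat parent) {
    String[] raw = format.split("\\.");
    if (raw.length > 0 && raw[0].equals("*")) {
      if (parent == null) {
        throw new IllegalArgumentException("'*' requires a parent format");
      }
      // Parent components first, then everything after the leading "*".
      String[] combined = Arrays.copyOf(parent.template, parent.template.length + raw.length - 1);
      System.arraycopy(raw, 1, combined, parent.template.length, raw.length - 1);
      this.template = combined;
    } else {
      this.template = raw;
    }
  }

  /** Binds variables; plain components and variables without a value are kept verbatim. */
  String[] formatScope(Map<String, String> values) {
    String[] scope = Arrays.copyOf(template, template.length);
    for (int i = 0; i < scope.length; i++) {
      String c = scope[i];
      boolean isVariable = c.length() >= 3 && c.startsWith("<") && c.endsWith(">");
      if (isVariable && values.containsKey(c)) {
        scope[i] = values.get(c);
      }
    }
    return scope;
  }
}
```

For example, a task format `<host>.taskmanager.<task_name>.<subtask_index>` combined with a shuffle format `*.shuffle.<shuffle_id>` renders as `host-a.taskmanager.HashJoin.3.shuffle.7` when shuffle 7 runs in subtask 3 of `HashJoin` on `host-a`, so two shuffles of the same task land in distinct scopes.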
diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
index cc3cf9761..3210125e1 100644
--- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -25,11 +25,13 @@ import java.util.Optional;
 import java.util.Random;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,7 +56,11 @@ public class RemoteShuffleOutputGateSuiteJ {
             () -> bufferPool,
             new CelebornConf(),
             10,
-            new UnregisteredMetricsGroup(),
+            new ShuffleIOMetricGroup(
+                new ShuffleMetricGroup(
+                    UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                    1,
+                    CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())),
             mock(FlinkShuffleClientImpl.class));
     NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, BUFFER_SIZE);
     bufferPool = networkBufferPool.createBufferPool(10, 10);
diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 32b22fa1e..6e663fd68 100644
--- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -65,7 +67,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -73,7 +76,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index faf733bef..b19c77238 100644
--- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,11 +19,13 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -43,7 +45,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
@@ -53,6 +56,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 363366b46..d49827ed9 100644
--- a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 22c4764f2..7e1946e76 100644
--- a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,7 +44,6 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -453,7 +455,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 56169b6d6..d340a74e6 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -65,7 +67,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -73,7 +76,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index f4102ec01..38c5292bf 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,11 +19,13 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -44,7 +46,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
@@ -54,6 +57,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 363366b46..d49827ed9 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 22c4764f2..7e1946e76 100644
--- a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,7 +44,6 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -453,7 +455,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 0f93e6975..bfba41530 100644
--- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -65,7 +67,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -73,7 +76,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index f4102ec01..38c5292bf 100644
--- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,11 +19,13 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -44,7 +46,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
@@ -54,6 +57,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 363366b46..d49827ed9 100644
--- a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 22c4764f2..7e1946e76 100644
--- a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,7 +44,6 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -453,7 +455,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 6d513265e..c89f54489 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -66,7 +68,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -74,7 +77,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index f4102ec01..38c5292bf 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,11 +19,13 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -44,7 +46,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, compressionCodec);
     return new RemoteShuffleInputGate(
@@ -54,6 +57,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 363366b46..d49827ed9 100644
--- a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -48,7 +48,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -58,9 +58,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -77,7 +78,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   protected BufferCompressor getBufferCompressor() {
diff --git a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 3cc25bcbc..40a703db9 100644
--- a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -44,7 +44,6 @@ import java.util.stream.IntStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -56,6 +55,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -455,7 +457,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 6d513265e..c89f54489 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -66,7 +68,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -74,7 +77,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index bb4abbc70..a5f3781d7 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,12 +19,14 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -45,7 +47,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, CompressionCodec.valueOf(compressionCodec));
     return new RemoteShuffleInputGate(
@@ -55,6 +58,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 69b9a71bc..691c70655 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -49,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,9 +59,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -78,7 +79,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   @Override
diff --git a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 0ceabcca8..d86792e07 100644
--- a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,7 +45,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -57,6 +56,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -458,7 +460,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index 7434b8cff..bd60bbe2b 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
@@ -66,7 +68,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
       InputGateDeploymentDescriptor gateDescriptor,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
       BufferDecompressor bufferDecompressor,
-      int numConcurrentReading) {
+      int numConcurrentReading,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     super(
         celebornConf,
         ownerContext,
@@ -74,7 +77,8 @@ public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
         gateDescriptor,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 
   @Override
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index bb4abbc70..a5f3781d7 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -19,12 +19,14 @@
 package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -45,7 +47,8 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
       int gateIndex,
       InputGateDeploymentDescriptor igdd,
       SupplierWithException<BufferPool, IOException> bufferPoolFactory,
-      String compressionCodec) {
+      String compressionCodec,
+      Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups) {
     BufferDecompressor bufferDecompressor =
         new BufferDecompressor(networkBufferSize, CompressionCodec.valueOf(compressionCodec));
     return new RemoteShuffleInputGate(
@@ -55,6 +58,7 @@ public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGat
         igdd,
         bufferPoolFactory,
         bufferDecompressor,
-        numConcurrentReading);
+        numConcurrentReading,
+        shuffleIOMetricGroups);
   }
 }
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 69b9a71bc..691c70655 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.common.CelebornConf;
@@ -49,7 +49,7 @@ public class RemoteShuffleResultPartitionFactory
 
   @Override
   ResultPartition createRemoteShuffleResultPartitionInternal(
-      ShuffleIOOwnerContext ownerContext,
+      String taskNameWithSubtaskAndId,
       int partitionIndex,
       ResultPartitionID id,
       ResultPartitionType type,
@@ -59,9 +59,10 @@ public class RemoteShuffleResultPartitionFactory
       CelebornConf celebornConf,
       int numMappers,
       BufferCompressor bufferCompressor,
-      RemoteShuffleDescriptor rsd) {
+      RemoteShuffleDescriptor rsd,
+      ShuffleIOMetricGroup shuffleIOMetricGroup) {
     return new RemoteShuffleResultPartition(
-        ownerContext.getOwnerName(),
+        taskNameWithSubtaskAndId,
         partitionIndex,
         id,
         type,
@@ -78,7 +79,7 @@ public class RemoteShuffleResultPartitionFactory
             bufferPoolFactories.get(1),
             celebornConf,
             numMappers,
-            ownerContext.getParentGroup()));
+            shuffleIOMetricGroup));
   }
 
   @Override
diff --git a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index 0ceabcca8..d86792e07 100644
--- a/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ b/client-flink/flink-2.0/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -45,7 +45,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -57,6 +56,9 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.ShuffleIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ShuffleMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.function.SupplierWithException;
 import org.junit.After;
 import org.junit.Before;
@@ -458,7 +460,11 @@ public class RemoteShuffleResultPartitionSuiteJ {
           bufferPoolFactory,
           celebornConf,
           numMappers,
-          new UnregisteredMetricsGroup());
+          new ShuffleIOMetricGroup(
+              new ShuffleMetricGroup(
+                  UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                  1,
+                  CelebornConf.CLIENT_METRICS_SCOPE_NAMING_SHUFFLE().defaultValueString())));
       isSetup = false;
       isFinished = false;
       isClosed = false;
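The commit message traces the repeated `Name collision` warnings to metrics such as `numRecordsOut` being registered more than once in the same group, after which Flink logs a WARN and does not report the duplicate. The toy model below is a minimal sketch of why giving each shuffle its own child group avoids this. `ToyMetricGroup`, `registerShared`, and `registerPerShuffle` are hypothetical names, not Flink's `MetricGroup` API; the per-shuffle map loosely mirrors the new `Map<Integer, ShuffleIOMetricGroup> shuffleIOMetricGroups` parameter, which is assumed here to be keyed by shuffle id.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model of metric-name collisions; not Flink's MetricGroup API. */
public class ShuffleScopedMetricsSketch {

  /** A flat group that refuses a second metric with an existing name. */
  static final class ToyMetricGroup {
    private final Set<String> names = new LinkedHashSet<>();
    private int collisions = 0;

    void counter(String name) {
      if (!names.add(name)) {
        // Flink would log "Name collision ... Metric will not be reported." here.
        collisions++;
      }
    }

    int collisions() { return collisions; }

    Set<String> names() { return names; }
  }

  /** Before: every shuffle registers numRecordsOut in one shared task-level group. */
  static ToyMetricGroup registerShared(int[] shuffleIds) {
    ToyMetricGroup task = new ToyMetricGroup();
    for (int ignored : shuffleIds) {
      task.counter("numRecordsOut");
    }
    return task;
  }

  /** After: each shuffle id gets its own child group, so names never clash. */
  static Map<Integer, ToyMetricGroup> registerPerShuffle(int[] shuffleIds) {
    Map<Integer, ToyMetricGroup> groups = new HashMap<>();
    for (int shuffleId : shuffleIds) {
      groups.computeIfAbsent(shuffleId, id -> new ToyMetricGroup()).counter("numRecordsOut");
    }
    return groups;
  }

  public static void main(String[] args) {
    int[] shuffleIds = {1, 2, 3};
    System.out.println("shared collisions: " + registerShared(shuffleIds).collisions());
    int perShuffle = 0;
    for (ToyMetricGroup group : registerPerShuffle(shuffleIds).values()) {
      perShuffle += group.collisions();
    }
    System.out.println("per-shuffle collisions: " + perShuffle);
  }
}
```

With three shuffles, the shared group drops two of the three registrations, while the per-shuffle groups keep all three.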
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6140bd713..a70381bbd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1410,6 +1410,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
     get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
   def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
+  def clientFlinkMetricsScopeNamingShuffle: String =
+    get(CLIENT_METRICS_SCOPE_NAMING_SHUFFLE)
   def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
   def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
 
@@ -5840,6 +5842,15 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLIENT_METRICS_SCOPE_NAMING_SHUFFLE: ConfigEntry[String] =
+    buildConf("celeborn.client.flink.metrics.scope.shuffle")
+      .categories("client")
+      .version("0.6.0")
+      .doc("Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured")
+      .stringConf
+      .createWithDefault(
+        "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>")
+
   val CLIENT_MR_PUSH_DATA_MAX: ConfigEntry[Long] =
     buildConf("celeborn.client.mr.pushData.max")
       .categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 6a41b129a..5c4cbf006 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -40,6 +40,7 @@ license: |
 | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for a input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate | 
 | celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate | 
 | celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate | 
+| celeborn.client.flink.metrics.scope.shuffle | &lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;task_name&gt;.&lt;subtask_index&gt;.&lt;shuffle_id&gt; | false | Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured | 0.6.0 |  | 
 | celeborn.client.flink.partitionConnectionException.enabled | false | false | If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. | 0.6.0 |  | 
 | celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition | 
 | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate | 
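The new `celeborn.client.flink.metrics.scope.shuffle` option defaults to `<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>`, which appends the shuffle id to the task-level scope so that two shuffles of the same subtask resolve to different metric identifiers. The sketch below shows that expansion in a simplified form, assuming identifier-based reporters substitute each `<variable>` placeholder with its value. `ShuffleScopeFormatSketch.expand` and the sample variable values are hypothetical; this is not Flink's `ScopeFormat` implementation, which also filters characters and may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified scope-format expansion; not Flink's ScopeFormat class. */
public class ShuffleScopeFormatSketch {

  // Default value of celeborn.client.flink.metrics.scope.shuffle, as defined in CelebornConf.
  static final String DEFAULT_SHUFFLE_SCOPE =
      "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>";

  // Matches placeholders such as <tm_id> or <shuffle_id>.
  private static final Pattern VARIABLE = Pattern.compile("<([a-z_]+)>");

  /** Replaces each known placeholder with its value; unknown placeholders are kept verbatim. */
  static String expand(String format, Map<String, String> variables) {
    Matcher matcher = VARIABLE.matcher(format);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      String value = variables.get(matcher.group(1));
      String replacement = value != null ? value : matcher.group();
      // quoteReplacement keeps '$' or '\' in values from being read as group references.
      matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> variables = new HashMap<>();
    variables.put("host", "localhost");
    variables.put("tm_id", "tm-1");
    variables.put("job_name", "wordcount");
    variables.put("task_name", "HashJoin");
    variables.put("subtask_index", "0");
    variables.put("shuffle_id", "1");
    System.out.println(expand(DEFAULT_SHUFFLE_SCOPE, variables));
    // A second shuffle of the same subtask differs only in the last component.
    variables.put("shuffle_id", "2");
    System.out.println(expand(DEFAULT_SHUFFLE_SCOPE, variables));
  }
}
```

Running `main` prints `localhost.taskmanager.tm-1.wordcount.HashJoin.0.1` followed by `localhost.taskmanager.tm-1.wordcount.HashJoin.0.2`.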

