This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new a5d4ff79c [CELEBORN-717][FLINK] Fix ResultPartition lost numBytesOut/numBuffersOut metrics
a5d4ff79c is described below

commit a5d4ff79cad0c3be1e4cf6230659e3cf2a5e294a
Author: Shuang <[email protected]>
AuthorDate: Tue Jun 27 11:49:00 2023 +0800

    [CELEBORN-717][FLINK] Fix ResultPartition lost numBytesOut/numBuffersOut metrics
    
    ### What changes were proposed in this pull request?
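    Override setMetricGroup in RemoteShuffleResultPartition (Flink 1.15 and 1.17)
    so that, after the parent ResultPartition has set up its metrics, the
    numBytesOut/numBuffersOut counters are passed to
    RemoteShuffleResultPartitionDelegation through a new setMetricCounters method.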
    
    ### Why are the changes needed?
    Currently, RemoteShuffleResultPartition loses its numBytesOut/numBuffersOut
    metrics. This prevents Flink's AdaptiveScheduler from dynamically adjusting
    task parallelism based on the amount of input data.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1626 from RexXiong/CELEBORN-717.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 22b21295e8ac59cff76b494c4ee8acf103ff6773)
    Signed-off-by: mingji <[email protected]>
---
 .../plugin/flink/RemoteShuffleResultPartitionDelegation.java       | 5 +++++
 .../apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java | 7 +++++++
 .../apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java | 7 +++++++
 3 files changed, 19 insertions(+)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
index 1440d3308..6298cd2ae 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
@@ -332,4 +332,9 @@ public class RemoteShuffleResultPartitionDelegation {
   public void setEndOfDataNotified(boolean endOfDataNotified) {
     this.endOfDataNotified = endOfDataNotified;
   }
+
+  public void setMetricCounters(Counter numBytesOut, Counter numBuffersOut) {
+    this.numBytesOut = numBytesOut;
+    this.numBuffersOut = numBuffersOut;
+  }
 }
diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index b95ea81b4..6aa87c0f1 100644
--- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
@@ -195,4 +196,10 @@ public class RemoteShuffleResultPartition extends ResultPartition {
   public RemoteShuffleResultPartitionDelegation getDelegation() {
     return delegation;
   }
+
+  @Override
+  public void setMetricGroup(TaskIOMetricGroup metrics) {
+    super.setMetricGroup(metrics);
+    this.delegation.setMetricCounters(numBytesOut, numBuffersOut);
+  }
 }
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 0f79d1ebf..10b8bbe23 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
@@ -207,4 +208,10 @@ public class RemoteShuffleResultPartition extends ResultPartition {
   public RemoteShuffleResultPartitionDelegation getDelegation() {
     return delegation;
   }
+
+  @Override
+  public void setMetricGroup(TaskIOMetricGroup metrics) {
+    super.setMetricGroup(metrics);
+    this.delegation.setMetricCounters(numBytesOut, numBuffersOut);
+  }
 }

Reply via email to