This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new a5d4ff79c [CELEBORN-717][FLINK] Fix ResultPartition lost
numBytesOut/numBuffersOut metrics
a5d4ff79c is described below
commit a5d4ff79cad0c3be1e4cf6230659e3cf2a5e294a
Author: Shuang <[email protected]>
AuthorDate: Tue Jun 27 11:49:00 2023 +0800
[CELEBORN-717][FLINK] Fix ResultPartition lost numBytesOut/numBuffersOut
metrics
### What changes were proposed in this pull request?
Reset numBytesOut/numBuffersOut metrics for RemoteShuffleResultPartition
### Why are the changes needed?
Currently, ResultPartition loses its numBytesOut/numBuffersOut metrics, which
prevents Flink's AdaptiveScheduler from dynamically adjusting the task
parallelism based on the amount of input data.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes #1626 from RexXiong/CELEBORN-717.
Authored-by: Shuang <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 22b21295e8ac59cff76b494c4ee8acf103ff6773)
Signed-off-by: mingji <[email protected]>
---
.../plugin/flink/RemoteShuffleResultPartitionDelegation.java | 5 +++++
.../apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java | 7 +++++++
.../apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java | 7 +++++++
3 files changed, 19 insertions(+)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
index 1440d3308..6298cd2ae 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionDelegation.java
@@ -332,4 +332,9 @@ public class RemoteShuffleResultPartitionDelegation {
public void setEndOfDataNotified(boolean endOfDataNotified) {
this.endOfDataNotified = endOfDataNotified;
}
+
+ public void setMetricCounters(Counter numBytesOut, Counter numBuffersOut) {
+ this.numBytesOut = numBytesOut;
+ this.numBuffersOut = numBuffersOut;
+ }
}
diff --git
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index b95ea81b4..6aa87c0f1 100644
---
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -34,6 +34,7 @@ import
org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
@@ -195,4 +196,10 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
public RemoteShuffleResultPartitionDelegation getDelegation() {
return delegation;
}
+
+ @Override
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
+ super.setMetricGroup(metrics);
+ this.delegation.setMetricCounters(numBytesOut, numBuffersOut);
+ }
}
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
index 0f79d1ebf..10b8bbe23 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -35,6 +35,7 @@ import
org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
@@ -207,4 +208,10 @@ public class RemoteShuffleResultPartition extends
ResultPartition {
public RemoteShuffleResultPartitionDelegation getDelegation() {
return delegation;
}
+
+ @Override
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
+ super.setMetricGroup(metrics);
+ this.delegation.setMetricCounters(numBytesOut, numBuffersOut);
+ }
}