zhuzhurk commented on a change in pull request #17905:
URL: https://github.com/apache/flink/pull/17905#discussion_r759094376
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -391,6 +391,7 @@ private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
private void finishUnicastBufferBuilder(int targetSubpartition) {
final BufferBuilder bufferBuilder = unicastBufferBuilders[targetSubpartition];
if (bufferBuilder != null) {
+ numBytesProduced.inc(bufferBuilder.finish());
Review comment:
We should invoke `finish()` only once and store its return value in a
local variable. That value can then be reused wherever it is needed.
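
A minimal sketch of this suggestion, assuming `finish()` returns the number of bytes written and that the value is needed by more than one counter. `FakeBufferBuilder`, `FakeCounter`, `finishAndRecord`, and `secondCounter` are hypothetical stand-ins for illustration, not Flink's actual classes or fields:

```java
// Hypothetical stand-ins for illustration; these are NOT Flink's real classes.
class FinishOnceSketch {

    /** Stand-in builder: finish() reports how many bytes were written. */
    static final class FakeBufferBuilder {
        private final int bytesWritten;
        private int finishCalls;

        FakeBufferBuilder(int bytesWritten) {
            this.bytesWritten = bytesWritten;
        }

        int finish() {
            finishCalls++; // tracked only so the sketch can show finish() ran once
            return bytesWritten;
        }

        int finishCalls() {
            return finishCalls;
        }
    }

    /** Stand-in for a metrics counter. */
    static final class FakeCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    /** Calls finish() exactly once and reuses the recorded result for both counters. */
    static void finishAndRecord(
            FakeBufferBuilder builder, FakeCounter numBytesProduced, FakeCounter secondCounter) {
        if (builder != null) {
            final int bytesWritten = builder.finish();
            numBytesProduced.inc(bytesWritten);
            secondCounter.inc(bytesWritten);
        }
    }

    public static void main(String[] args) {
        FakeBufferBuilder builder = new FakeBufferBuilder(32);
        FakeCounter numBytesProduced = new FakeCounter();
        FakeCounter secondCounter = new FakeCounter();
        finishAndRecord(builder, numBytesProduced, secondCounter);
        System.out.println(
                builder.finishCalls() + " " + numBytesProduced.getCount() + " " + secondCounter.getCount());
    }
}
```

Holding the result in a local variable keeps the counters consistent with each other and avoids relying on `finish()` being safe to call repeatedly.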
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -406,6 +407,7 @@ private void finishUnicastBufferBuilders() {
private void finishBroadcastBufferBuilder() {
if (broadcastBufferBuilder != null) {
+ numBytesProduced.inc(broadcastBufferBuilder.finish());
Review comment:
We should invoke `finish()` only once and store its return value in a
local variable. That value can then be reused wherever it is needed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -111,6 +111,14 @@
protected Counter numBuffersOut = new SimpleCounter();
+ /**
+ * The difference with {@link #numBytesOut} : numBytesProduced represents the number of bytes
+ * actually produced, and numBytesOut represents the number of bytes sent downstream tasks. In
Review comment:
sent -> sent to
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
##########
@@ -71,4 +74,29 @@ public void testTaskIOMetricGroup() throws InterruptedException {
assertThat(
taskIO.getBackPressuredTimePerSecond().getCount(),
greaterThanOrEqualTo(sleepTime));
}
+
+ @Test
+ public void testNumBytesProducedOfPartitionsMetrics() {
+ TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+ TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+
+ Counter c1 = new SimpleCounter();
+ c1.inc(32L);
+ Counter c2 = new SimpleCounter();
+ c2.inc(64L);
+
+ IntermediateResultPartitionID resultPartitionID1 = new IntermediateResultPartitionID();
+ IntermediateResultPartitionID resultPartitionID2 = new IntermediateResultPartitionID();
+
+ taskIO.registerNumBytesProducedCounterForPartition(resultPartitionID1, c1);
+ taskIO.registerNumBytesProducedCounterForPartition(resultPartitionID2, c2);
+
+ // check
Review comment:
I think this `// check` comment is not needed.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -822,6 +822,55 @@ public void testBufferSizeNotChanged() throws IOException {
assertEquals(bufferSize, subpartition1.pollBuffer().buffer().getSize());
}
+ @Test
+ public void testNumBytesProducedCounterForPipelinedUnicast() throws IOException {
Review comment:
The change does not differentiate between ResultPartitionTypes, so I think
there is no need to add separate tests for different ResultPartitionTypes.