This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 01498ef  [hotfix][task] Add task name to BufferDebloater logging
01498ef is described below

commit 01498ef24327f0ac275cbe959da169c2d8165728
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Mar 25 10:08:40 2022 +0100

    [hotfix][task] Add task name to BufferDebloater logging
---
 .../io/network/partition/consumer/SingleInputGateFactory.java    | 6 ++++--
 .../org/apache/flink/runtime/throughput/BufferDebloater.java     | 9 +++++++--
 .../io/network/partition/consumer/SingleInputGateBuilder.java    | 1 +
 .../org/apache/flink/runtime/throughput/BufferDebloaterTest.java | 8 +++++++-
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index d862af3..0c45718 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -146,7 +146,7 @@ public class SingleInputGateFactory {
                         networkBufferSize,
                         new ThroughputCalculator(SystemClock.getInstance()),
                         maybeCreateBufferDebloater(
-                                gateIndex, 
networkInputGroup.addGroup(gateIndex)));
+                                owningTaskName, gateIndex, 
networkInputGroup.addGroup(gateIndex)));
 
         InputChannelMetrics metrics =
                 new InputChannelMetrics(networkInputGroup, 
owner.getParentGroup());
@@ -154,10 +154,12 @@ public class SingleInputGateFactory {
         return inputGate;
     }
 
-    private BufferDebloater maybeCreateBufferDebloater(int gateIndex, 
MetricGroup inputGroup) {
+    private BufferDebloater maybeCreateBufferDebloater(
+            String owningTaskName, int gateIndex, MetricGroup inputGroup) {
         if (debloatConfiguration.isEnabled()) {
             final BufferDebloater bufferDebloater =
                     new BufferDebloater(
+                            owningTaskName,
                             gateIndex,
                             
debloatConfiguration.getTargetTotalBufferSize().toMillis(),
                             debloatConfiguration.getMaxBufferSize(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
index e9c32d7..db4cb3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
@@ -34,6 +34,7 @@ public class BufferDebloater {
     private static final Logger LOG = 
LoggerFactory.getLogger(BufferDebloater.class);
     private static final long MILLIS_IN_SECOND = 1000;
 
+    private final String owningTaskName;
     private final int gateIndex;
     private final long targetTotalBufferSize;
     private final int maxBufferSize;
@@ -45,12 +46,14 @@ public class BufferDebloater {
     private int lastBufferSize;
 
     public BufferDebloater(
+            String owningTaskName,
             int gateIndex,
             long targetTotalBufferSize,
             int maxBufferSize,
             int minBufferSize,
             int bufferDebloatThresholdPercentages,
             long numberOfSamples) {
+        this.owningTaskName = owningTaskName;
         this.gateIndex = gateIndex;
         this.targetTotalBufferSize = targetTotalBufferSize;
         this.maxBufferSize = maxBufferSize;
@@ -61,7 +64,8 @@ public class BufferDebloater {
         bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize, 
numberOfSamples);
 
         LOG.debug(
-                "Buffer debloater init settings: gateIndex={}, targetTotalBufferSize={}, maxBufferSize={}, minBufferSize={}, bufferDebloatThresholdPercentages={}, numberOfSamples={}",
+                "{}: Buffer debloater init settings: gateIndex={}, targetTotalBufferSize={}, maxBufferSize={}, minBufferSize={}, bufferDebloatThresholdPercentages={}, numberOfSamples={}",
+                owningTaskName,
                 gateIndex,
                 targetTotalBufferSize,
                 maxBufferSize,
@@ -89,7 +93,8 @@ public class BufferDebloater {
         boolean skipUpdate = skipUpdate(newSize);
 
         LOG.debug(
-                "Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
+                "{}: Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
+                owningTaskName,
                 gateIndex,
                 lastBufferSize,
                 newSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index c7e1faa..8d8f2b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -190,6 +190,7 @@ public class SingleInputGateBuilder {
     private BufferDebloater maybeCreateBufferDebloater(int gateIndex) {
         if (bufferDebloatConfiguration.isEnabled()) {
             return new BufferDebloater(
+                    "Unknown task name in test",
                     gateIndex,
                     
bufferDebloatConfiguration.getTargetTotalBufferSize().toMillis(),
                     bufferDebloatConfiguration.getMaxBufferSize(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
index 8ca0c9c..55d6d73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
@@ -274,7 +274,13 @@ public class BufferDebloaterTest extends TestLogger {
 
         private BufferDebloater getBufferDebloater() {
             return new BufferDebloater(
-                    0, debloatTarget, maxBufferSize, minBufferSize, 
thresholdPercentages, 1);
+                    "Unknown task name in test",
+                    0,
+                    debloatTarget,
+                    maxBufferSize,
+                    minBufferSize,
+                    thresholdPercentages,
+                    1);
         }
     }
 }

Reply via email to