This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dd04bd  [FLINK-24658][runtime] Added debug logs for BufferDebloater
4dd04bd is described below

commit 4dd04bdc4cb54b2976c578a077720a93cf131c27
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Nov 3 17:22:11 2021 +0100

    [FLINK-24658][runtime] Added debug logs for BufferDebloater
---
 .../partition/consumer/SingleInputGateFactory.java |  6 +++--
 .../flink/runtime/throughput/BufferDebloater.java  | 27 ++++++++++++++++++++++
 .../partition/consumer/SingleInputGateBuilder.java |  5 ++--
 .../runtime/throughput/BufferDebloaterTest.java    |  1 +
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index 031e021..17a9c1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -140,7 +140,8 @@ public class SingleInputGateFactory {
                         networkBufferPool,
                         networkBufferSize,
                         new ThroughputCalculator(SystemClock.getInstance()),
-                        
maybeCreateBufferDebloater(networkInputGroup.addGroup(gateIndex)));
+                        maybeCreateBufferDebloater(
+                                gateIndex, 
networkInputGroup.addGroup(gateIndex)));
 
         InputChannelMetrics metrics =
                 new InputChannelMetrics(networkInputGroup, 
owner.getParentGroup());
@@ -148,10 +149,11 @@ public class SingleInputGateFactory {
         return inputGate;
     }
 
-    private BufferDebloater maybeCreateBufferDebloater(MetricGroup inputGroup) 
{
+    private BufferDebloater maybeCreateBufferDebloater(int gateIndex, 
MetricGroup inputGroup) {
         if (debloatConfiguration.isEnabled()) {
             final BufferDebloater bufferDebloater =
                     new BufferDebloater(
+                            gateIndex,
                             
debloatConfiguration.getTargetTotalBufferSize().toMillis(),
                             debloatConfiguration.getMaxBufferSize(),
                             debloatConfiguration.getMinBufferSize(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
index 389f2a1..7896794 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.throughput;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.Duration;
 import java.util.OptionalInt;
 
@@ -26,8 +29,10 @@ import java.util.OptionalInt;
  * configuration.
  */
 public class BufferDebloater {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BufferDebloater.class);
     private static final long MILLIS_IN_SECOND = 1000;
 
+    private final int gateIndex;
     private final long targetTotalBufferSize;
     private final int maxBufferSize;
     private final int minBufferSize;
@@ -38,11 +43,13 @@ public class BufferDebloater {
     private int lastBufferSize;
 
     public BufferDebloater(
+            int gateIndex,
             long targetTotalBufferSize,
             int maxBufferSize,
             int minBufferSize,
             int bufferDebloatThresholdPercentages,
             long numberOfSamples) {
+        this.gateIndex = gateIndex;
         this.targetTotalBufferSize = targetTotalBufferSize;
         this.maxBufferSize = maxBufferSize;
         this.minBufferSize = minBufferSize;
@@ -50,6 +57,15 @@ public class BufferDebloater {
 
         this.lastBufferSize = maxBufferSize;
         bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize, 
numberOfSamples);
+
+        LOG.debug(
+                "Buffer debloater init settings: gateIndex={}, 
targetTotalBufferSize={}, maxBufferSize={}, minBufferSize={}, 
bufferDebloatThresholdPercentages={}, numberOfSamples={}",
+                gateIndex,
+                targetTotalBufferSize,
+                maxBufferSize,
+                minBufferSize,
+                bufferDebloatThresholdPercentages,
+                numberOfSamples);
     }
 
     public OptionalInt recalculateBufferSize(long currentThroughput, int 
buffersInUse) {
@@ -74,6 +90,17 @@ public class BufferDebloater {
                                 && Math.abs(1 - ((double) lastBufferSize) / 
newSize) * 100
                                         < bufferDebloatThresholdPercentages;
 
+        LOG.debug(
+                "Buffer size recalculation: gateIndex={}, currentSize={}, 
newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, 
estimatedTimeToConsumeBuffers={}, announceNewSize={}",
+                gateIndex,
+                lastBufferSize,
+                newSize,
+                currentThroughput,
+                desiredTotalBufferSizeInBytes,
+                buffersInUse,
+                lastEstimatedTimeToConsumeBuffers,
+                !skipUpdate);
+
         // Skip update if the new value pretty close to the old one.
         if (skipUpdate) {
             return OptionalInt.empty();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 8cad4c0..3ee0a74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -169,7 +169,7 @@ public class SingleInputGateBuilder {
                         segmentProvider,
                         bufferSize,
                         
createThroughputCalculator.apply(bufferDebloatConfiguration),
-                        maybeCreateBufferDebloater());
+                        maybeCreateBufferDebloater(gateIndex));
         if (channelFactory != null) {
             gate.setInputChannels(
                     IntStream.range(0, numberOfChannels)
@@ -185,9 +185,10 @@ public class SingleInputGateBuilder {
         return gate;
     }
 
-    private BufferDebloater maybeCreateBufferDebloater() {
+    private BufferDebloater maybeCreateBufferDebloater(int gateIndex) {
         if (bufferDebloatConfiguration.isEnabled()) {
             return new BufferDebloater(
+                    gateIndex,
                     
bufferDebloatConfiguration.getTargetTotalBufferSize().toMillis(),
                     bufferDebloatConfiguration.getMaxBufferSize(),
                     bufferDebloatConfiguration.getMinBufferSize(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
index 3a54954..cb22b64 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
@@ -219,6 +219,7 @@ public class BufferDebloaterTest extends TestLogger {
 
         private BufferDebloater getBufferDebloater() {
             return new BufferDebloater(
+                    0,
                     debloatTarget,
                     maxBufferSize,
                     minBufferSize,

Reply via email to