This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4dd04bd [FLINK-24658][runtime] Added debug logs for BufferDebloater
4dd04bd is described below
commit 4dd04bdc4cb54b2976c578a077720a93cf131c27
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Nov 3 17:22:11 2021 +0100
[FLINK-24658][runtime] Added debug logs for BufferDebloater
---
.../partition/consumer/SingleInputGateFactory.java | 6 +++--
.../flink/runtime/throughput/BufferDebloater.java | 27 ++++++++++++++++++++++
.../partition/consumer/SingleInputGateBuilder.java | 5 ++--
.../runtime/throughput/BufferDebloaterTest.java | 1 +
4 files changed, 35 insertions(+), 4 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index 031e021..17a9c1a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -140,7 +140,8 @@ public class SingleInputGateFactory {
networkBufferPool,
networkBufferSize,
new ThroughputCalculator(SystemClock.getInstance()),
-
maybeCreateBufferDebloater(networkInputGroup.addGroup(gateIndex)));
+ maybeCreateBufferDebloater(
+ gateIndex,
networkInputGroup.addGroup(gateIndex)));
InputChannelMetrics metrics =
new InputChannelMetrics(networkInputGroup,
owner.getParentGroup());
@@ -148,10 +149,11 @@ public class SingleInputGateFactory {
return inputGate;
}
- private BufferDebloater maybeCreateBufferDebloater(MetricGroup inputGroup)
{
+ private BufferDebloater maybeCreateBufferDebloater(int gateIndex,
MetricGroup inputGroup) {
if (debloatConfiguration.isEnabled()) {
final BufferDebloater bufferDebloater =
new BufferDebloater(
+ gateIndex,
debloatConfiguration.getTargetTotalBufferSize().toMillis(),
debloatConfiguration.getMaxBufferSize(),
debloatConfiguration.getMinBufferSize(),
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
index 389f2a1..7896794 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.throughput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.time.Duration;
import java.util.OptionalInt;
@@ -26,8 +29,10 @@ import java.util.OptionalInt;
* configuration.
*/
public class BufferDebloater {
+ private static final Logger LOG =
LoggerFactory.getLogger(BufferDebloater.class);
private static final long MILLIS_IN_SECOND = 1000;
+ private final int gateIndex;
private final long targetTotalBufferSize;
private final int maxBufferSize;
private final int minBufferSize;
@@ -38,11 +43,13 @@ public class BufferDebloater {
private int lastBufferSize;
public BufferDebloater(
+ int gateIndex,
long targetTotalBufferSize,
int maxBufferSize,
int minBufferSize,
int bufferDebloatThresholdPercentages,
long numberOfSamples) {
+ this.gateIndex = gateIndex;
this.targetTotalBufferSize = targetTotalBufferSize;
this.maxBufferSize = maxBufferSize;
this.minBufferSize = minBufferSize;
@@ -50,6 +57,15 @@ public class BufferDebloater {
this.lastBufferSize = maxBufferSize;
bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize,
numberOfSamples);
+
+ LOG.debug(
+ "Buffer debloater init settings: gateIndex={},
targetTotalBufferSize={}, maxBufferSize={}, minBufferSize={},
bufferDebloatThresholdPercentages={}, numberOfSamples={}",
+ gateIndex,
+ targetTotalBufferSize,
+ maxBufferSize,
+ minBufferSize,
+ bufferDebloatThresholdPercentages,
+ numberOfSamples);
}
public OptionalInt recalculateBufferSize(long currentThroughput, int
buffersInUse) {
@@ -74,6 +90,17 @@ public class BufferDebloater {
&& Math.abs(1 - ((double) lastBufferSize) /
newSize) * 100
< bufferDebloatThresholdPercentages;
+ LOG.debug(
+ "Buffer size recalculation: gateIndex={}, currentSize={},
newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={},
estimatedTimeToConsumeBuffers={}, announceNewSize={}",
+ gateIndex,
+ lastBufferSize,
+ newSize,
+ currentThroughput,
+ desiredTotalBufferSizeInBytes,
+ buffersInUse,
+ lastEstimatedTimeToConsumeBuffers,
+ !skipUpdate);
+
// Skip update if the new value pretty close to the old one.
if (skipUpdate) {
return OptionalInt.empty();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 8cad4c0..3ee0a74 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -169,7 +169,7 @@ public class SingleInputGateBuilder {
segmentProvider,
bufferSize,
createThroughputCalculator.apply(bufferDebloatConfiguration),
- maybeCreateBufferDebloater());
+ maybeCreateBufferDebloater(gateIndex));
if (channelFactory != null) {
gate.setInputChannels(
IntStream.range(0, numberOfChannels)
@@ -185,9 +185,10 @@ public class SingleInputGateBuilder {
return gate;
}
- private BufferDebloater maybeCreateBufferDebloater() {
+ private BufferDebloater maybeCreateBufferDebloater(int gateIndex) {
if (bufferDebloatConfiguration.isEnabled()) {
return new BufferDebloater(
+ gateIndex,
bufferDebloatConfiguration.getTargetTotalBufferSize().toMillis(),
bufferDebloatConfiguration.getMaxBufferSize(),
bufferDebloatConfiguration.getMinBufferSize(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
index 3a54954..cb22b64 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
@@ -219,6 +219,7 @@ public class BufferDebloaterTest extends TestLogger {
private BufferDebloater getBufferDebloater() {
return new BufferDebloater(
+ 0,
debloatTarget,
maxBufferSize,
minBufferSize,