This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 01498ef [hotfix][task] Add task name to BufferDebloater logging
01498ef is described below
commit 01498ef24327f0ac275cbe959da169c2d8165728
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Mar 25 10:08:40 2022 +0100
[hotfix][task] Add task name to BufferDebloater logging
---
.../io/network/partition/consumer/SingleInputGateFactory.java | 6 ++++--
.../org/apache/flink/runtime/throughput/BufferDebloater.java | 9 +++++++--
.../io/network/partition/consumer/SingleInputGateBuilder.java | 1 +
.../org/apache/flink/runtime/throughput/BufferDebloaterTest.java | 8 +++++++-
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index d862af3..0c45718 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -146,7 +146,7 @@ public class SingleInputGateFactory {
networkBufferSize,
new ThroughputCalculator(SystemClock.getInstance()),
maybeCreateBufferDebloater(
- gateIndex, networkInputGroup.addGroup(gateIndex)));
+ owningTaskName, gateIndex, networkInputGroup.addGroup(gateIndex)));
InputChannelMetrics metrics =
new InputChannelMetrics(networkInputGroup,
owner.getParentGroup());
@@ -154,10 +154,12 @@ public class SingleInputGateFactory {
return inputGate;
}
- private BufferDebloater maybeCreateBufferDebloater(int gateIndex, MetricGroup inputGroup) {
+ private BufferDebloater maybeCreateBufferDebloater(
+ String owningTaskName, int gateIndex, MetricGroup inputGroup) {
if (debloatConfiguration.isEnabled()) {
final BufferDebloater bufferDebloater =
new BufferDebloater(
+ owningTaskName,
gateIndex,
debloatConfiguration.getTargetTotalBufferSize().toMillis(),
debloatConfiguration.getMaxBufferSize(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
index e9c32d7..db4cb3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java
@@ -34,6 +34,7 @@ public class BufferDebloater {
private static final Logger LOG =
LoggerFactory.getLogger(BufferDebloater.class);
private static final long MILLIS_IN_SECOND = 1000;
+ private final String owningTaskName;
private final int gateIndex;
private final long targetTotalBufferSize;
private final int maxBufferSize;
@@ -45,12 +46,14 @@ public class BufferDebloater {
private int lastBufferSize;
public BufferDebloater(
+ String owningTaskName,
int gateIndex,
long targetTotalBufferSize,
int maxBufferSize,
int minBufferSize,
int bufferDebloatThresholdPercentages,
long numberOfSamples) {
+ this.owningTaskName = owningTaskName;
this.gateIndex = gateIndex;
this.targetTotalBufferSize = targetTotalBufferSize;
this.maxBufferSize = maxBufferSize;
@@ -61,7 +64,8 @@ public class BufferDebloater {
bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize,
numberOfSamples);
LOG.debug(
- "Buffer debloater init settings: gateIndex={}, targetTotalBufferSize={}, maxBufferSize={}, minBufferSize={}, bufferDebloatThresholdPercentages={}, numberOfSamples={}",
+ "{}: Buffer debloater init settings: gateIndex={}, targetTotalBufferSize={}, maxBufferSize={}, minBufferSize={}, bufferDebloatThresholdPercentages={}, numberOfSamples={}",
+ owningTaskName,
gateIndex,
targetTotalBufferSize,
maxBufferSize,
@@ -89,7 +93,8 @@ public class BufferDebloater {
boolean skipUpdate = skipUpdate(newSize);
LOG.debug(
- "Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
+ "{}: Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
+ owningTaskName,
gateIndex,
lastBufferSize,
newSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index c7e1faa..8d8f2b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -190,6 +190,7 @@ public class SingleInputGateBuilder {
private BufferDebloater maybeCreateBufferDebloater(int gateIndex) {
if (bufferDebloatConfiguration.isEnabled()) {
return new BufferDebloater(
+ "Unknown task name in test",
gateIndex,
bufferDebloatConfiguration.getTargetTotalBufferSize().toMillis(),
bufferDebloatConfiguration.getMaxBufferSize(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
index 8ca0c9c..55d6d73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java
@@ -274,7 +274,13 @@ public class BufferDebloaterTest extends TestLogger {
private BufferDebloater getBufferDebloater() {
return new BufferDebloater(
- 0, debloatTarget, maxBufferSize, minBufferSize, thresholdPercentages, 1);
+ "Unknown task name in test",
+ 0,
+ debloatTarget,
+ maxBufferSize,
+ minBufferSize,
+ thresholdPercentages,
+ 1);
}
}
}