[
https://issues.apache.org/jira/browse/FLINK-10006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578035#comment-16578035
]
ASF GitHub Bot commented on FLINK-10006:
----------------------------------------
NicoK closed pull request #6470: [FLINK-10006][network] improve logging in
BarrierBuffer: prepend owning task name
URL: https://github.com/apache/flink/pull/6470
This is a PR merged from a forked repository. Because GitHub hides the
original diff of a merged pull request from a fork, the diff is reproduced
below for the sake of provenance:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa8aec..c78abb5165a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@
int getNumberOfInputChannels();
+ String getOwningTaskName();
+
boolean isFinished();
void requestPartitions() throws IOException, InterruptedException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 06e80ff531a..2e7d076f3f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -62,10 +62,10 @@
/**
* An input gate consumes one or more partitions of a single produced
intermediate result.
*
- * <p> Each intermediate result is partitioned over its producing parallel
subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel
subtasks; each of these
* partitions is furthermore partitioned into one or more subpartitions.
*
- * <p> As an example, consider a map-reduce program, where the map operator
produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator
produces data and the
* reduce operator consumes the produced data.
*
* <pre>{@code
@@ -74,7 +74,7 @@
* +-----+ +---------------------+ +--------+
* }</pre>
*
- * <p> When deploying such a program in parallel, the intermediate result will
be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will
be partitioned over its
* producing parallel subtasks; each of these partitions is furthermore
partitioned into one or more
* subpartitions.
*
@@ -95,7 +95,7 @@
* +-----------------------------------------+
* }</pre>
*
- * <p> In the above example, two map subtasks produce the intermediate result
in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result
in parallel, resulting
* in two partitions (Partition 1 and 2). Each of these partitions is further
partitioned into two
* subpartitions -- one for each parallel reduce subtask.
*/
@@ -274,6 +274,11 @@ public int getNumberOfQueuedBuffers() {
return 0;
}
+ @Override
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
//
------------------------------------------------------------------------
// Setup/Life-cycle
//
------------------------------------------------------------------------
@@ -364,7 +369,7 @@ else if (partitionLocation.isRemote()) {
throw new IllegalStateException("Tried
to update unknown channel with unknown channel.");
}
- LOG.debug("Updated unknown input channel to
{}.", newChannel);
+ LOG.debug("{}: Updated unknown input channel to
{}.", owningTaskName, newChannel);
inputChannels.put(partitionId, newChannel);
@@ -393,7 +398,7 @@ public void
retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
checkNotNull(ch, "Unknown input channel with ID
" + partitionId);
- LOG.debug("Retriggering partition request
{}:{}.", ch.partitionId, consumedSubpartitionIndex);
+ LOG.debug("{}: Retriggering partition request
{}:{}.", owningTaskName, ch.partitionId, consumedSubpartitionIndex);
if (ch.getClass() == RemoteInputChannel.class) {
final RemoteInputChannel rch =
(RemoteInputChannel) ch;
@@ -432,7 +437,8 @@ public void releaseAllResources() throws IOException {
inputChannel.releaseAllResources();
}
catch (IOException e) {
- LOG.warn("Error during
release of channel resources: " + e.getMessage(), e);
+ LOG.warn("{}: Error
during release of channel resources: {}.",
+ owningTaskName,
e.getMessage(), e);
}
}
@@ -725,7 +731,8 @@ else if (partitionLocation.isUnknown()) {
inputGate.setInputChannel(partitionId.getPartitionId(),
inputChannels[i]);
}
- LOG.debug("Created {} input channels (local: {}, remote: {},
unknown: {}).",
+ LOG.debug("{}: Created {} input channels (local: {}, remote:
{}, unknown: {}).",
+ owningTaskName,
inputChannels.length,
numLocalChannels,
numRemoteChannels,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a93f0..d3085cb5279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -129,6 +129,12 @@ public int getNumberOfInputChannels() {
return totalNumberOfInputChannels;
}
+ @Override
+ public String getOwningTaskName() {
+ // all input gates have the same owning task
+ return inputGates[0].getOwningTaskName();
+ }
+
@Override
public boolean isFinished() {
for (InputGate inputGate : inputGates) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b82fdc..991635a9205 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ else if (bufferOrEvent.getEvent().getClass() ==
CancelCheckpointMarker.class) {
}
private void completeBufferedSequence() throws IOException {
- LOG.debug("Finished feeding back buffered data");
+ LOG.debug("{}: Finished feeding back buffered data.",
inputGate.getOwningTaskName());
currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ private void processBarrier(CheckpointBarrier
receivedBarrier, int channelIndex)
}
else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint,
another started before
- LOG.warn("Received checkpoint barrier for
checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
barrierId, currentCheckpointId);
+ LOG.warn("{}: Received checkpoint barrier for
checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.",
+ inputGate.getOwningTaskName(),
+ barrierId,
+ currentCheckpointId);
// let the task know we are not completing this
notifyAbort(currentCheckpointId, new
CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ else if (barrierId > currentCheckpointId) {
if (numBarriersReceived + numClosedChannels ==
totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
- LOG.debug("Received all barriers, triggering
checkpoint {} at {}",
- receivedBarrier.getId(),
receivedBarrier.getTimestamp());
+ LOG.debug("{}: Received all barriers,
triggering checkpoint {} at {}.",
+ inputGate.getOwningTaskName(),
+ receivedBarrier.getId(),
+ receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ private void
processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
if (barrierId == currentCheckpointId) {
// cancel this alignment
if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint {} canceled,
aborting alignment", barrierId);
+ LOG.debug("{}: Checkpoint {} canceled,
aborting alignment.",
+ inputGate.getOwningTaskName(),
+ barrierId);
}
releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ private void
processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
}
else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the
current
- LOG.warn("Received cancellation barrier for
checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
barrierId, currentCheckpointId);
+ LOG.warn("{}: Received cancellation barrier for
checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.",
+ inputGate.getOwningTaskName(),
+ barrierId,
+ currentCheckpointId);
// this stops the current alignment
releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ else if (barrierId > currentCheckpointId) {
latestAlignmentDurationNanos = 0L;
if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint {} canceled, skipping
alignment", barrierId);
+ LOG.debug("{}: Checkpoint {} canceled, skipping
alignment.",
+ inputGate.getOwningTaskName(),
+ barrierId);
}
notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ private void notifyAbort(long checkpointId,
CheckpointDeclineException cause) th
private void checkSizeLimit() throws Exception {
if (maxBufferedBytes > 0 && (numQueuedBytes +
bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
// exceeded our limit - abort this checkpoint
- LOG.info("Checkpoint {} aborted because alignment
volume limit ({} bytes) exceeded",
- currentCheckpointId, maxBufferedBytes);
+ LOG.info("{}: Checkpoint {} aborted because alignment
volume limit ({} bytes) exceeded.",
+ inputGate.getOwningTaskName(),
+ currentCheckpointId,
+ maxBufferedBytes);
releaseBlocksAndResetBarriers();
notifyAbort(currentCheckpointId, new
AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ private void beginNewAlignment(long checkpointId, int
channelIndex) throws IOExc
startOfAlignmentTimestamp = System.nanoTime();
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting stream alignment for checkpoint " +
checkpointId + '.');
+ LOG.debug("{}: Starting stream alignment for checkpoint
{}.",
+ inputGate.getOwningTaskName(),
+ checkpointId);
}
}
@@ -470,7 +486,9 @@ private void onBarrier(int channelIndex) throws IOException
{
numBarriersReceived++;
if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from channel " +
channelIndex);
+ LOG.debug("{}: Received barrier from channel
{}.",
+ inputGate.getOwningTaskName(),
+ channelIndex);
}
}
else {
@@ -483,7 +501,8 @@ private void onBarrier(int channelIndex) throws IOException
{
* Makes sure the just written data is the next to be consumed.
*/
private void releaseBlocksAndResetBarriers() throws IOException {
- LOG.debug("End of stream alignment, feeding buffered data
back");
+ LOG.debug("{}: End of stream alignment, feeding buffered data
back.",
+ inputGate.getOwningTaskName());
for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
@@ -499,8 +518,9 @@ private void releaseBlocksAndResetBarriers() throws
IOException {
else {
// uncommon case: buffered data pending
// push back the pending data, if we have any
- LOG.debug("Checkpoint skipped via buffered data:" +
- "Pushing back current alignment buffers
and feeding back new alignment data first.");
+ LOG.debug("{}: Checkpoint skipped via buffered data:" +
+ "Pushing back current alignment buffers
and feeding back new alignment data first.",
+ inputGate.getOwningTaskName());
// since we did not fully drain the previous sequence,
we need to allocate a new buffer for this one
BufferOrEventSequence bufferedNow =
bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ private void releaseBlocksAndResetBarriers() throws
IOException {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Size of buffered data: {} bytes",
- currentBuffered == null ? 0L :
currentBuffered.size());
+ LOG.debug("{}: Size of buffered data: {} bytes",
+ inputGate.getOwningTaskName(),
+ currentBuffered == null ? 0L :
currentBuffered.size());
}
// the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public long getAlignmentDurationNanos() {
@Override
public String toString() {
- return String.format("last checkpoint: %d, current barriers:
%d, closed channels: %d",
- currentCheckpointId, numBarriersReceived,
numClosedChannels);
+ return String.format("%s: last checkpoint: %d, current
barriers: %d, closed channels: %d",
+ inputGate.getOwningTaskName(),
+ currentCheckpointId,
+ numBarriersReceived,
+ numClosedChannels);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5ef2d6..e968101ca2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public boolean isNextBarrier() {
private int currentChannel = 0;
private long c = 0;
+ private final String owningTaskName;
+
public RandomGeneratingInputGate(BufferPool[] bufferPools,
BarrierGenerator[] barrierGens) {
+ this(bufferPools, barrierGens, "TestTask");
+ }
+
+ public RandomGeneratingInputGate(BufferPool[] bufferPools,
BarrierGenerator[] barrierGens, String owningTaskName) {
this.numChannels = bufferPools.length;
this.currentBarriers = new int[numChannels];
this.bufferPools = bufferPools;
this.barrierGens = barrierGens;
+ this.owningTaskName = owningTaskName;
}
@Override
@@ -151,6 +158,11 @@ public int getNumberOfInputChannels() {
return numChannels;
}
+ @Override
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
@Override
public boolean isFinished() {
return false;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709419f..6400a175e20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@
private int closedChannels;
+ private final String owningTaskName;
+
public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent>
bufferOrEvents) {
+ this(pageSize, numChannels, bufferOrEvents, "MockTask");
+ }
+
+ public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent>
bufferOrEvents, String owningTaskName) {
this.pageSize = pageSize;
this.numChannels = numChannels;
this.bufferOrEvents = new
ArrayDeque<BufferOrEvent>(bufferOrEvents);
this.closed = new boolean[numChannels];
+ this.owningTaskName = owningTaskName;
}
@Override
@@ -61,6 +68,11 @@ public int getNumberOfInputChannels() {
return numChannels;
}
+ @Override
+ public String getOwningTaskName() {
+ return owningTaskName;
+ }
+
@Override
public boolean isFinished() {
return bufferOrEvents.isEmpty();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Improve logging in BarrierBuffer
> --------------------------------
>
> Key: FLINK-10006
> URL: https://issues.apache.org/jira/browse/FLINK-10006
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.2, 1.6.0, 1.7.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
>
> Almost all log messages of {{BarrierBuffer}} lack the task name and are
> therefore of little use when multiple slots run on a single TaskManager (TM)
> or when multiple checkpoints run in parallel.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)