This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5ddb0fbdc41fa8230cff7c54b993e62ddef2447
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Jul 31 14:48:58 2018 +0200

    [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning 
task name
    
    This closes #6470.
---
 .../io/network/partition/consumer/InputGate.java   |  2 +
 .../partition/consumer/SingleInputGate.java        |  5 ++
 .../network/partition/consumer/UnionInputGate.java |  6 ++
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 64 +++++++++++++++-------
 .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ++++
 .../flink/streaming/runtime/io/MockInputGate.java  | 12 ++++
 6 files changed, 81 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa..c78abb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@ public interface InputGate {
 
        int getNumberOfInputChannels();
 
+       String getOwningTaskName();
+
        boolean isFinished();
 
        void requestPartitions() throws IOException, InterruptedException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index dbef46f..2e7d076 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -274,6 +274,11 @@ public class SingleInputGate implements InputGate {
                return 0;
        }
 
+       @Override
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
        // 
------------------------------------------------------------------------
        // Setup/Life-cycle
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a..d3085cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -130,6 +130,12 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
        }
 
        @Override
+       public String getOwningTaskName() {
+               // all input gates have the same owning task
+               return inputGates[0].getOwningTaskName();
+       }
+
+       @Override
        public boolean isFinished() {
                for (InputGate inputGate : inputGates) {
                        if (!inputGate.isFinished()) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b8..991635a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        }
 
        private void completeBufferedSequence() throws IOException {
-               LOG.debug("Finished feeding back buffered data");
+               LOG.debug("{}: Finished feeding back buffered data.", 
inputGate.getOwningTaskName());
 
                currentBuffered.cleanup();
                currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we did not complete the current checkpoint, 
another started before
-                               LOG.warn("Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
+                               LOG.warn("{}: Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
+                                               "Skipping current checkpoint.",
+                                       inputGate.getOwningTaskName(),
+                                       barrierId,
+                                       currentCheckpointId);
 
                                // let the task know we are not completing this
                                notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                if (numBarriersReceived + numClosedChannels == 
totalNumberOfInputChannels) {
                        // actually trigger checkpoint
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received all barriers, triggering 
checkpoint {} at {}",
-                                               receivedBarrier.getId(), 
receivedBarrier.getTimestamp());
+                               LOG.debug("{}: Received all barriers, 
triggering checkpoint {} at {}.",
+                                       inputGate.getOwningTaskName(),
+                                       receivedBarrier.getId(),
+                                       receivedBarrier.getTimestamp());
                        }
 
                        releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (barrierId == currentCheckpointId) {
                                // cancel this alignment
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Checkpoint {} canceled, 
aborting alignment", barrierId);
+                                       LOG.debug("{}: Checkpoint {} canceled, 
aborting alignment.",
+                                               inputGate.getOwningTaskName(),
+                                               barrierId);
                                }
 
                                releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we canceled the next which also cancels the 
current
-                               LOG.warn("Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
+                               LOG.warn("{}: Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
+                                               "Skipping current checkpoint.",
+                                       inputGate.getOwningTaskName(),
+                                       barrierId,
+                                       currentCheckpointId);
 
                                // this stops the current alignment
                                releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        latestAlignmentDurationNanos = 0L;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Checkpoint {} canceled, skipping 
alignment", barrierId);
+                               LOG.debug("{}: Checkpoint {} canceled, skipping 
alignment.",
+                                       inputGate.getOwningTaskName(),
+                                       barrierId);
                        }
 
                        notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private void checkSizeLimit() throws Exception {
                if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
                        // exceeded our limit - abort this checkpoint
-                       LOG.info("Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded",
-                                       currentCheckpointId, maxBufferedBytes);
+                       LOG.info("{}: Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded.",
+                               inputGate.getOwningTaskName(),
+                               currentCheckpointId,
+                               maxBufferedBytes);
 
                        releaseBlocksAndResetBarriers();
                        notifyAbort(currentCheckpointId, new 
AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                startOfAlignmentTimestamp = System.nanoTime();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId + '.');
+                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.",
+                               inputGate.getOwningTaskName(),
+                               checkpointId);
                }
        }
 
@@ -470,7 +486,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        numBarriersReceived++;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received barrier from channel " + 
channelIndex);
+                               LOG.debug("{}: Received barrier from channel 
{}.",
+                                       inputGate.getOwningTaskName(),
+                                       channelIndex);
                        }
                }
                else {
@@ -483,7 +501,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * Makes sure the just written data is the next to be consumed.
         */
        private void releaseBlocksAndResetBarriers() throws IOException {
-               LOG.debug("End of stream alignment, feeding buffered data 
back");
+               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.",
+                       inputGate.getOwningTaskName());
 
                for (int i = 0; i < blockedChannels.length; i++) {
                        blockedChannels[i] = false;
@@ -499,8 +518,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                else {
                        // uncommon case: buffered data pending
                        // push back the pending data, if we have any
-                       LOG.debug("Checkpoint skipped via buffered data:" +
-                                       "Pushing back current alignment buffers 
and feeding back new alignment data first.");
+                       LOG.debug("{}: Checkpoint skipped via buffered data:" +
+                                       "Pushing back current alignment buffers 
and feeding back new alignment data first.",
+                               inputGate.getOwningTaskName());
 
                        // since we did not fully drain the previous sequence, 
we need to allocate a new buffer for this one
                        BufferOrEventSequence bufferedNow = 
bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Size of buffered data: {} bytes",
-                                       currentBuffered == null ? 0L : 
currentBuffered.size());
+                       LOG.debug("{}: Size of buffered data: {} bytes",
+                               inputGate.getOwningTaskName(),
+                               currentBuffered == null ? 0L : 
currentBuffered.size());
                }
 
                // the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
        @Override
        public String toString() {
-               return String.format("last checkpoint: %d, current barriers: 
%d, closed channels: %d",
-                               currentCheckpointId, numBarriersReceived, 
numClosedChannels);
+               return String.format("%s: last checkpoint: %d, current 
barriers: %d, closed channels: %d",
+                       inputGate.getOwningTaskName(),
+                       currentCheckpointId,
+                       numBarriersReceived,
+                       numClosedChannels);
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5e..e968101 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public class BarrierBufferMassiveRandomTest {
                private int currentChannel = 0;
                private long c = 0;
 
+               private final String owningTaskName;
+
                public RandomGeneratingInputGate(BufferPool[] bufferPools, 
BarrierGenerator[] barrierGens) {
+                       this(bufferPools, barrierGens, "TestTask");
+               }
+
+               public RandomGeneratingInputGate(BufferPool[] bufferPools, 
BarrierGenerator[] barrierGens, String owningTaskName) {
                        this.numChannels = bufferPools.length;
                        this.currentBarriers = new int[numChannels];
                        this.bufferPools = bufferPools;
                        this.barrierGens = barrierGens;
+                       this.owningTaskName = owningTaskName;
                }
 
                @Override
@@ -152,6 +159,11 @@ public class BarrierBufferMassiveRandomTest {
                }
 
                @Override
+               public String getOwningTaskName() {
+                       return owningTaskName;
+               }
+
+               @Override
                public boolean isFinished() {
                        return false;
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709..6400a17 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@ public class MockInputGate implements InputGate {
 
        private int closedChannels;
 
+       private final String owningTaskName;
+
        public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> 
bufferOrEvents) {
+               this(pageSize, numChannels, bufferOrEvents, "MockTask");
+       }
+
+       public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> 
bufferOrEvents, String owningTaskName) {
                this.pageSize = pageSize;
                this.numChannels = numChannels;
                this.bufferOrEvents = new 
ArrayDeque<BufferOrEvent>(bufferOrEvents);
                this.closed = new boolean[numChannels];
+               this.owningTaskName = owningTaskName;
        }
 
        @Override
@@ -62,6 +69,11 @@ public class MockInputGate implements InputGate {
        }
 
        @Override
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
+       @Override
        public boolean isFinished() {
                return bufferOrEvents.isEmpty();
        }

Reply via email to