This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed61c3a46b074abdb1c33606aa464a70e46a4dc8 Author: Nico Kruber <[email protected]> AuthorDate: Tue Jul 31 14:48:58 2018 +0200 [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name This closes #6470. --- .../io/network/partition/consumer/InputGate.java | 2 + .../partition/consumer/SingleInputGate.java | 5 ++ .../network/partition/consumer/UnionInputGate.java | 6 ++ .../flink/streaming/runtime/io/BarrierBuffer.java | 64 +++++++++++++++------- .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ++++ .../flink/streaming/runtime/io/MockInputGate.java | 12 ++++ 6 files changed, 81 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 0413caa..c78abb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -69,6 +69,8 @@ public interface InputGate { int getNumberOfInputChannels(); + String getOwningTaskName(); + boolean isFinished(); void requestPartitions() throws IOException, InterruptedException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index dbef46f..2e7d076 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -274,6 +274,11 @@ public class SingleInputGate implements InputGate { return 0; } + @Override + public String getOwningTaskName() { + return owningTaskName; + } + // ------------------------------------------------------------------------ // Setup/Life-cycle // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 742592a..d3085cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -130,6 +130,12 @@ public class UnionInputGate implements InputGate, InputGateListener { } @Override + public String getOwningTaskName() { + // all input gates have the same owning task + return inputGates[0].getOwningTaskName(); + } + + @Override public boolean isFinished() { for (InputGate inputGate : inputGates) { if (!inputGate.isFinished()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 78852b8..991635a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -213,7 +213,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } private void completeBufferedSequence() throws IOException { - LOG.debug("Finished feeding back buffered data"); + LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName()); currentBuffered.cleanup(); currentBuffered = queuedBuffered.pollFirst(); @@ -247,8 +247,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } else if (barrierId > currentCheckpointId) { // we did not complete the current checkpoint, another started before - LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", barrierId, currentCheckpointId); + LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + + "Skipping current checkpoint.", + inputGate.getOwningTaskName(), + barrierId, + currentCheckpointId); // let the task know we are not completing this notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); @@ -279,8 +282,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { // actually trigger checkpoint if (LOG.isDebugEnabled()) { - LOG.debug("Received all barriers, triggering checkpoint {} at {}", - receivedBarrier.getId(), receivedBarrier.getTimestamp()); + LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", + inputGate.getOwningTaskName(), + receivedBarrier.getId(), + receivedBarrier.getTimestamp()); } releaseBlocksAndResetBarriers(); @@ -309,7 +314,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (barrierId == currentCheckpointId) { // cancel this alignment if (LOG.isDebugEnabled()) { - LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId); + LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", + inputGate.getOwningTaskName(), + barrierId); } releaseBlocksAndResetBarriers(); @@ -317,8 +324,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } else if (barrierId > currentCheckpointId) { // we canceled the next which also cancels the current - LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", barrierId, currentCheckpointId); + LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + + "Skipping current checkpoint.", + inputGate.getOwningTaskName(), + barrierId, + currentCheckpointId); // this stops the current alignment releaseBlocksAndResetBarriers(); @@ -347,7 +357,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { latestAlignmentDurationNanos = 0L; if (LOG.isDebugEnabled()) { - LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId); + LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", + inputGate.getOwningTaskName(), + barrierId); } notifyAbortOnCancellationBarrier(barrierId); @@ -401,8 +413,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private void checkSizeLimit() throws Exception { if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) { // exceeded our limit - abort this checkpoint - LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", - currentCheckpointId, maxBufferedBytes); + LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.", + inputGate.getOwningTaskName(), + currentCheckpointId, + maxBufferedBytes); releaseBlocksAndResetBarriers(); notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes)); @@ -444,7 +458,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { startOfAlignmentTimestamp = System.nanoTime(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.'); + LOG.debug("{}: Starting stream alignment for checkpoint {}.", + inputGate.getOwningTaskName(), + checkpointId); } } @@ -470,7 +486,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { numBarriersReceived++; if (LOG.isDebugEnabled()) { - LOG.debug("Received barrier from channel " + channelIndex); + LOG.debug("{}: Received barrier from channel {}.", + inputGate.getOwningTaskName(), + channelIndex); } } else { @@ -483,7 +501,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * Makes sure the just written data is the next to be consumed. */ private void releaseBlocksAndResetBarriers() throws IOException { - LOG.debug("End of stream alignment, feeding buffered data back"); + LOG.debug("{}: End of stream alignment, feeding buffered data back.", + inputGate.getOwningTaskName()); for (int i = 0; i < blockedChannels.length; i++) { blockedChannels[i] = false; @@ -499,8 +518,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { else { // uncommon case: buffered data pending // push back the pending data, if we have any - LOG.debug("Checkpoint skipped via buffered data:" + - "Pushing back current alignment buffers and feeding back new alignment data first."); + LOG.debug("{}: Checkpoint skipped via buffered data:" + + "Pushing back current alignment buffers and feeding back new alignment data first.", + inputGate.getOwningTaskName()); // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources(); @@ -513,8 +533,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } if (LOG.isDebugEnabled()) { - LOG.debug("Size of buffered data: {} bytes", - currentBuffered == null ? 0L : currentBuffered.size()); + LOG.debug("{}: Size of buffered data: {} bytes", + inputGate.getOwningTaskName(), + currentBuffered == null ? 0L : currentBuffered.size()); } // the next barrier that comes must assume it is the first @@ -555,7 +576,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { @Override public String toString() { - return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d", - currentCheckpointId, numBarriersReceived, numClosedChannels); + return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d", + inputGate.getOwningTaskName(), + currentCheckpointId, + numBarriersReceived, + numClosedChannels); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 6dd1e5e..e968101 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -139,11 +139,18 @@ public class BarrierBufferMassiveRandomTest { private int currentChannel = 0; private long c = 0; + private final String owningTaskName; + public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) { + this(bufferPools, barrierGens, "TestTask"); + } + + public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) { this.numChannels = bufferPools.length; this.currentBarriers = new int[numChannels]; this.bufferPools = bufferPools; this.barrierGens = barrierGens; + this.owningTaskName = owningTaskName; } @Override @@ -152,6 +159,11 @@ public class BarrierBufferMassiveRandomTest { } @Override + public String getOwningTaskName() { + return owningTaskName; + } + + @Override public boolean isFinished() { return false; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index e62b709..6400a17 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -44,11 +44,18 @@ public class MockInputGate implements InputGate { private int closedChannels; + private final String owningTaskName; + public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) { + this(pageSize, numChannels, bufferOrEvents, "MockTask"); + } + + public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) { this.pageSize = pageSize; this.numChannels = numChannels; this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents); this.closed = new boolean[numChannels]; + this.owningTaskName = owningTaskName; } @Override @@ -62,6 +69,11 @@ public class MockInputGate implements InputGate { } @Override + public String getOwningTaskName() { + return owningTaskName; + } + + @Override public boolean isFinished() { return bufferOrEvents.isEmpty(); }
