This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa5f0618ef88161b23097b45df4055a58ca11685 Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Jun 13 14:42:14 2019 +0200 [FLINK-12777][network] Extract CheckpointBarrierAligner from BarrierBuffer --- .../runtime/io/AbstractBufferStorage.java | 5 - .../flink/streaming/runtime/io/BarrierBuffer.java | 369 ++------------------ .../flink/streaming/runtime/io/BufferStorage.java | 2 - ...erBuffer.java => CheckpointBarrierAligner.java} | 379 ++++++--------------- 4 files changed, 144 insertions(+), 611 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java index f7e4dd7..5eb30cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java @@ -164,11 +164,6 @@ public abstract class AbstractBufferStorage implements BufferStorage { } @Override - public long currentBufferedSize() { - return currentBuffered != null ? currentBuffered.size() : 0L; - } - - @Override public long getMaxBufferedBytes() { return maxBufferedBytes; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 0f8fa40..1594389 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -54,45 +50,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); + private final CheckpointBarrierAligner barrierAligner; + /** The gate that the buffer draws its input from. */ private final InputGate inputGate; - /** Flags that indicate whether a channel is currently blocked/buffered. */ - private final boolean[] blockedChannels; - - /** The total number of channels that this buffer handles data from. */ - private final int totalNumberOfInputChannels; - - /** To utility to write blocked data to a file channel. */ private final BufferStorage bufferStorage; - private final String taskName; - - @Nullable - private final AbstractInvokable toNotifyOnCheckpoint; - - /** The ID of the checkpoint for which we expect barriers. */ - private long currentCheckpointId = -1L; - - /** - * The number of received barriers (= number of blocked/buffered channels) IMPORTANT: A canceled - * checkpoint must always have 0 barriers. - */ - private int numBarriersReceived; - - /** The number of already closed channels. */ - private int numClosedChannels; - - /** The timestamp as in {@link System#nanoTime()} at which the last alignment started. */ - private long startOfAlignmentTimestamp; - - /** The time (in nanoseconds) that the latest alignment took. */ - private long latestAlignmentDurationNanos; - /** Flag to indicate whether we have drawn all available input. */ - private boolean endOfStream; + private boolean endOfInputGate; - /** Indicate end of the input. Set to true after encountering {@link #endOfStream} and depleting + /** Indicate end of the input. Set to true after encountering {@link #endOfInputGate} and depleting * {@link #bufferStorage}. */ private boolean isFinished; @@ -122,19 +90,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications. */ BarrierBuffer( - InputGate inputGate, - BufferStorage bufferStorage, - String taskName, - @Nullable AbstractInvokable toNotifyOnCheckpoint) { - + InputGate inputGate, + BufferStorage bufferStorage, + String taskName, + @Nullable AbstractInvokable toNotifyOnCheckpoint) { this.inputGate = inputGate; - this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); - this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; - this.bufferStorage = checkNotNull(bufferStorage); - - this.taskName = taskName; - this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; + this.barrierAligner = new CheckpointBarrierAligner( + inputGate.getNumberOfInputChannels(), + taskName, + toNotifyOnCheckpoint); } @Override @@ -145,10 +110,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { return AVAILABLE; } - // ------------------------------------------------------------------------ - // Buffer and barrier handling - // ------------------------------------------------------------------------ - @Override public Optional<BufferOrEvent> pollNext() throws Exception { while (true) { @@ -170,28 +131,36 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } BufferOrEvent bufferOrEvent = next.get(); - if (isBlocked(bufferOrEvent.getChannelIndex())) { + if (barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) { // if the channel is blocked, we just store the BufferOrEvent bufferStorage.add(bufferOrEvent); if (bufferStorage.isFull()) { - sizeLimitExceeded(); + barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes()); + bufferStorage.rollOver(); } } else if (bufferOrEvent.isBuffer()) { return next; } else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { - if (!endOfStream) { + CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent(); + if (!endOfInputGate) { // process barriers only if there is a chance of the checkpoint completing - processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); + if (barrierAligner.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) { + bufferStorage.rollOver(); + } } } else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { - processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent()); + if (barrierAligner.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) { + bufferStorage.rollOver(); + } } else { if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) { - processEndOfPartition(); + if (barrierAligner.processEndOfPartition()) { + bufferStorage.rollOver(); + } } return next; } @@ -203,216 +172,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler { return Optional.empty(); } - if (endOfStream) { + if (endOfInputGate) { isFinished = true; return Optional.empty(); } else { // end of input stream. stream continues with the buffered data - endOfStream = true; - releaseBlocksAndResetBarriers(); + endOfInputGate = true; + barrierAligner.releaseBlocksAndResetBarriers(); + bufferStorage.rollOver(); return pollNext(); } } - private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { - final long barrierId = receivedBarrier.getId(); - - // fast path for single channel cases - if (totalNumberOfInputChannels == 1) { - if (barrierId > currentCheckpointId) { - // new checkpoint - currentCheckpointId = barrierId; - notifyCheckpoint(receivedBarrier); - } - return; - } - - // -- general code path for multiple input channels -- - - if (numBarriersReceived > 0) { - // this is only true if some alignment is already progress and was not canceled - - if (barrierId == currentCheckpointId) { - // regular case - onBarrier(channelIndex); - } - else if (barrierId > currentCheckpointId) { - // we did not complete the current checkpoint, another started before - LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", - taskName, - barrierId, - currentCheckpointId); - - // let the task know we are not completing this - notifyAbort(currentCheckpointId, - new CheckpointException( - "Barrier id: " + barrierId, - CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)); - - // abort the current checkpoint - releaseBlocksAndResetBarriers(); - - // begin a the new checkpoint - beginNewAlignment(barrierId, channelIndex); - } - else { - // ignore trailing barrier from an earlier checkpoint (obsolete now) - return; - } - } - else if (barrierId > currentCheckpointId) { - // first barrier of a new checkpoint - beginNewAlignment(barrierId, channelIndex); - } - else { - // either the current checkpoint was canceled (numBarriers == 0) or - // this barrier is from an old subsumed checkpoint - return; - } - - // check if we have all barriers - since canceled checkpoints always have zero barriers - // this can only happen on a non canceled checkpoint - if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { - // actually trigger checkpoint - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", - taskName, - receivedBarrier.getId(), - receivedBarrier.getTimestamp()); - } - - releaseBlocksAndResetBarriers(); - notifyCheckpoint(receivedBarrier); - } - } - - private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { - final long barrierId = cancelBarrier.getCheckpointId(); - - // fast path for single channel cases - if (totalNumberOfInputChannels == 1) { - if (barrierId > currentCheckpointId) { - // new checkpoint - currentCheckpointId = barrierId; - notifyAbortOnCancellationBarrier(barrierId); - } - return; - } - - // -- general code path for multiple input channels -- - - if (numBarriersReceived > 0) { - // this is only true if some alignment is in progress and nothing was canceled - - if (barrierId == currentCheckpointId) { - // cancel this alignment - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId); - } - - releaseBlocksAndResetBarriers(); - notifyAbortOnCancellationBarrier(barrierId); - } - else if (barrierId > currentCheckpointId) { - // we canceled the next which also cancels the current - LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", - taskName, - barrierId, - currentCheckpointId); - - // this stops the current alignment - releaseBlocksAndResetBarriers(); - - // the next checkpoint starts as canceled - currentCheckpointId = barrierId; - startOfAlignmentTimestamp = 0L; - latestAlignmentDurationNanos = 0L; - - notifyAbortOnCancellationBarrier(barrierId); - } - - // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now) - - } - else if (barrierId > currentCheckpointId) { - // first barrier of a new checkpoint is directly a cancellation - - // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers - // at zero means that no checkpoint barrier can start a new alignment - currentCheckpointId = barrierId; - - startOfAlignmentTimestamp = 0L; - latestAlignmentDurationNanos = 0L; - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", taskName, barrierId); - } - - notifyAbortOnCancellationBarrier(barrierId); - } - - // else: trailing barrier from either - // - a previous (subsumed) checkpoint - // - the current checkpoint if it was already canceled - } - - private void processEndOfPartition() throws Exception { - numClosedChannels++; - - if (numBarriersReceived > 0) { - // let the task know we skip a checkpoint - notifyAbort(currentCheckpointId, - new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); - - // no chance to complete this checkpoint - releaseBlocksAndResetBarriers(); - } - } - - private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception { - if (toNotifyOnCheckpoint != null) { - CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); - - CheckpointMetrics checkpointMetrics = new CheckpointMetrics() - .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize()) - .setAlignmentDurationNanos(latestAlignmentDurationNanos); - - toNotifyOnCheckpoint.triggerCheckpointOnBarrier( - checkpointMetaData, - checkpointBarrier.getCheckpointOptions(), - checkpointMetrics); - } - } - - private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception { - notifyAbort(checkpointId, - new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); - } - - private void notifyAbort(long checkpointId, CheckpointException cause) throws Exception { - if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); - } - } - - private void sizeLimitExceeded() throws Exception { - long maxBufferedBytes = bufferStorage.getMaxBufferedBytes(); - // exceeded our limit - abort this checkpoint - LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.", - taskName, - currentCheckpointId, - maxBufferedBytes); - - releaseBlocksAndResetBarriers(); - notifyAbort(currentCheckpointId, - new CheckpointException( - "Max buffered bytes: " + maxBufferedBytes, - CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)); - } - @Override public boolean isEmpty() { return bufferStorage.isEmpty(); @@ -428,69 +199,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { bufferStorage.close(); } - private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException { - currentCheckpointId = checkpointId; - onBarrier(channelIndex); - - startOfAlignmentTimestamp = System.nanoTime(); - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId); - } - } - - /** - * Checks whether the channel with the given index is blocked. - * - * @param channelIndex The channel index to check. - * @return True if the channel is blocked, false if not. - */ - private boolean isBlocked(int channelIndex) { - return blockedChannels[channelIndex]; - } - - /** - * Blocks the given channel index, from which a barrier has been received. - * - * @param channelIndex The channel index to block. - */ - private void onBarrier(int channelIndex) throws IOException { - if (!blockedChannels[channelIndex]) { - blockedChannels[channelIndex] = true; - - numBarriersReceived++; - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex); - } - } - else { - throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex); - } - } - - /** - * Releases the blocks on all channels and resets the barrier count. - * Makes sure the just written data is the next to be consumed. - */ - private void releaseBlocksAndResetBarriers() throws IOException { - LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName); - - for (int i = 0; i < blockedChannels.length; i++) { - blockedChannels[i] = false; - } - - bufferStorage.rollOver(); - - // the next barrier that comes must assume it is the first - numBarriersReceived = 0; - - if (startOfAlignmentTimestamp > 0) { - latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp; - startOfAlignmentTimestamp = 0; - } - } - // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -501,22 +209,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * @return The ID of the pending of completed checkpoint. */ public long getCurrentCheckpointId() { - return this.currentCheckpointId; + return barrierAligner.getCurrentCheckpointId(); } @Override public long getAlignmentDurationNanos() { - long start = this.startOfAlignmentTimestamp; - if (start <= 0) { - return latestAlignmentDurationNanos; - } else { - return System.nanoTime() - start; - } + return barrierAligner.getAlignmentDurationNanos(); } @Override public int getNumberOfInputChannels() { - return totalNumberOfInputChannels; + return inputGate.getNumberOfInputChannels(); } // ------------------------------------------------------------------------ @@ -525,10 +228,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { @Override public String toString() { - return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d", - taskName, - currentCheckpointId, - numBarriersReceived, - numClosedChannels); + return barrierAligner.toString(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java index 8e6194d..4ad7eac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java @@ -67,8 +67,6 @@ public interface BufferStorage extends AutoCloseable { Optional<BufferOrEvent> pollNext() throws IOException; - long currentBufferedSize(); - long getMaxBufferedBytes(); /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java similarity index 55% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java index 0f8fa40..30e05c1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +19,12 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.slf4j.Logger; @@ -36,26 +33,16 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until - * all inputs have received the barrier for a given checkpoint. - * - * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the - * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until - * the blocks are released. + * {@link CheckpointBarrierAligner} keep tracks of received {@link CheckpointBarrier} on given + * channels and controls the alignment, by deciding which channels should be blocked and when to + * release blocked channels. */ @Internal -public class BarrierBuffer implements CheckpointBarrierHandler { - - private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); +public class CheckpointBarrierAligner { - /** The gate that the buffer draws its input from. */ - private final InputGate inputGate; + private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierAligner.class); /** Flags that indicate whether a channel is currently blocked/buffered. */ private final boolean[] blockedChannels; @@ -63,9 +50,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { /** The total number of channels that this buffer handles data from. */ private final int totalNumberOfInputChannels; - /** To utility to write blocked data to a file channel. */ - private final BufferStorage bufferStorage; - private final String taskName; @Nullable @@ -89,132 +73,47 @@ public class BarrierBuffer implements CheckpointBarrierHandler { /** The time (in nanoseconds) that the latest alignment took. */ private long latestAlignmentDurationNanos; - /** Flag to indicate whether we have drawn all available input. */ - private boolean endOfStream; - - /** Indicate end of the input. Set to true after encountering {@link #endOfStream} and depleting - * {@link #bufferStorage}. */ - private boolean isFinished; - - /** - * Creates a new checkpoint stream aligner. - * - * <p>There is no limit to how much data may be buffered during an alignment. - * - * @param inputGate The input gate to draw the buffers and events from. - * @param bufferStorage The storage to hold the buffers and events for blocked channels. - */ - @VisibleForTesting - BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) { - this (inputGate, bufferStorage, "Testing: No task associated", null); - } - - /** - * Creates a new checkpoint stream aligner. - * - * <p>The aligner will allow only alignments that buffer up to the given number of bytes. - * When that number is exceeded, it will stop the alignment and notify the task that the - * checkpoint has been cancelled. - * - * @param inputGate The input gate to draw the buffers and events from. - * @param bufferStorage The storage to hold the buffers and events for blocked channels. - * @param taskName The task name for logging. - * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications. - */ - BarrierBuffer( - InputGate inputGate, - BufferStorage bufferStorage, - String taskName, - @Nullable AbstractInvokable toNotifyOnCheckpoint) { - - this.inputGate = inputGate; - this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); - this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; - - this.bufferStorage = checkNotNull(bufferStorage); - + CheckpointBarrierAligner( + int totalNumberOfInputChannels, + String taskName, + @Nullable AbstractInvokable toNotifyOnCheckpoint) { + this.totalNumberOfInputChannels = totalNumberOfInputChannels; this.taskName = taskName; this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; - } - @Override - public CompletableFuture<?> isAvailable() { - if (bufferStorage.isEmpty()) { - return inputGate.isAvailable(); - } - return AVAILABLE; + this.blockedChannels = new boolean[totalNumberOfInputChannels]; } - // ------------------------------------------------------------------------ - // Buffer and barrier handling - // ------------------------------------------------------------------------ + public void releaseBlocksAndResetBarriers() throws IOException { + LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName); - @Override - public Optional<BufferOrEvent> pollNext() throws Exception { - while (true) { - // process buffered BufferOrEvents before grabbing new ones - Optional<BufferOrEvent> next; - if (bufferStorage.isEmpty()) { - next = inputGate.pollNext(); - } - else { - // TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking - next = bufferStorage.pollNext(); - if (!next.isPresent()) { - return pollNext(); - } - } + for (int i = 0; i < blockedChannels.length; i++) { + blockedChannels[i] = false; + } - if (!next.isPresent()) { - return handleEmptyBuffer(); - } + // the next barrier that comes must assume it is the first + numBarriersReceived = 0; - BufferOrEvent bufferOrEvent = next.get(); - if (isBlocked(bufferOrEvent.getChannelIndex())) { - // if the channel is blocked, we just store the BufferOrEvent - bufferStorage.add(bufferOrEvent); - if (bufferStorage.isFull()) { - sizeLimitExceeded(); - } - } - else if (bufferOrEvent.isBuffer()) { - return next; - } - else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { - if (!endOfStream) { - // process barriers only if there is a chance of the checkpoint completing - processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); - } - } - else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { - processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent()); - } - else { - if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) { - processEndOfPartition(); - } - return next; - } + if (startOfAlignmentTimestamp > 0) { + latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp; + startOfAlignmentTimestamp = 0; } } - private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception { - if (!inputGate.isFinished()) { - return Optional.empty(); - } - - if (endOfStream) { - isFinished = true; - return Optional.empty(); - } else { - // end of input stream. stream continues with the buffered data - endOfStream = true; - releaseBlocksAndResetBarriers(); - return pollNext(); - } + /** + * Checks whether the channel with the given index is blocked. + * + * @param channelIndex The channel index to check. + * @return True if the channel is blocked, false if not. + */ + public boolean isBlocked(int channelIndex) { + return blockedChannels[channelIndex]; } - private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { + /** + * @return true if some blocked data should be unblocked/rolled over. + */ + public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception { final long barrierId = receivedBarrier.getId(); // fast path for single channel cases @@ -222,11 +121,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (barrierId > currentCheckpointId) { // new checkpoint currentCheckpointId = barrierId; - notifyCheckpoint(receivedBarrier); + notifyCheckpoint(receivedBarrier, bufferedBytes); } - return; + return false; } + boolean checkpointAborted = false; + // -- general code path for multiple input channels -- if (numBarriersReceived > 0) { @@ -252,13 +153,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // abort the current checkpoint releaseBlocksAndResetBarriers(); + checkpointAborted = true; // begin a the new checkpoint beginNewAlignment(barrierId, channelIndex); } else { // ignore trailing barrier from an earlier checkpoint (obsolete now) - return; + return false; } } else if (barrierId > currentCheckpointId) { @@ -268,7 +170,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { else { // either the current checkpoint was canceled (numBarriers == 0) or // this barrier is from an old subsumed checkpoint - return; + return false; } // check if we have all barriers - since canceled checkpoints always have zero barriers @@ -283,11 +185,47 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } releaseBlocksAndResetBarriers(); - notifyCheckpoint(receivedBarrier); + notifyCheckpoint(receivedBarrier, bufferedBytes); + return true; + } + return checkpointAborted; + } + + protected void beginNewAlignment(long checkpointId, int channelIndex) throws IOException { + currentCheckpointId = checkpointId; + onBarrier(channelIndex); + + startOfAlignmentTimestamp = System.nanoTime(); + + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId); } } - private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { + /** + * Blocks the given channel index, from which a barrier has been received. + * + * @param channelIndex The channel index to block. + */ + protected void onBarrier(int channelIndex) throws IOException { + if (!blockedChannels[channelIndex]) { + blockedChannels[channelIndex] = true; + + numBarriersReceived++; + + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex); + } + } + else { + throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex); + } + } + + /** + * @return true if some blocked data should be unblocked/rolled over. + */ + public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { final long barrierId = cancelBarrier.getCheckpointId(); // fast path for single channel cases @@ -297,7 +235,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentCheckpointId = barrierId; notifyAbortOnCancellationBarrier(barrierId); } - return; + return false; } // -- general code path for multiple input channels -- @@ -313,6 +251,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { releaseBlocksAndResetBarriers(); notifyAbortOnCancellationBarrier(barrierId); + return true; } else if (barrierId > currentCheckpointId) { // we canceled the next which also cancels the current @@ -331,6 +270,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { latestAlignmentDurationNanos = 0L; notifyAbortOnCancellationBarrier(barrierId); + return true; } // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now) @@ -351,33 +291,39 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } notifyAbortOnCancellationBarrier(barrierId); + return false; } // else: trailing barrier from either // - a previous (subsumed) checkpoint // - the current checkpoint if it was already canceled + return false; } - private void processEndOfPartition() throws Exception { + /** + * @return true if some blocked data should be unblocked/rolled over. + */ + public boolean processEndOfPartition() throws Exception { numClosedChannels++; if (numBarriersReceived > 0) { // let the task know we skip a checkpoint notifyAbort(currentCheckpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); - // no chance to complete this checkpoint releaseBlocksAndResetBarriers(); + return true; } + return false; } - private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception { + private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes) throws Exception { if (toNotifyOnCheckpoint != null) { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); CheckpointMetrics checkpointMetrics = new CheckpointMetrics() - .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize()) + .setBytesBufferedInAlignment(bufferedBytes) .setAlignmentDurationNanos(latestAlignmentDurationNanos); toNotifyOnCheckpoint.triggerCheckpointOnBarrier( @@ -398,132 +344,19 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } - private void sizeLimitExceeded() throws Exception { - long maxBufferedBytes = bufferStorage.getMaxBufferedBytes(); - // exceeded our limit - abort this checkpoint - LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.", - taskName, - currentCheckpointId, - maxBufferedBytes); - - releaseBlocksAndResetBarriers(); - notifyAbort(currentCheckpointId, - new CheckpointException( - "Max buffered bytes: " + maxBufferedBytes, - CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)); - } - - @Override - public boolean isEmpty() { - return bufferStorage.isEmpty(); - } - - @Override - public boolean isFinished() { - return isFinished; - } - - @Override - public void cleanup() throws IOException { - bufferStorage.close(); - } - - private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException { - currentCheckpointId = checkpointId; - onBarrier(channelIndex); - - startOfAlignmentTimestamp = System.nanoTime(); - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId); - } - } - - /** - * Checks whether the channel with the given index is blocked. - * - * @param channelIndex The channel index to check. - * @return True if the channel is blocked, false if not. - */ - private boolean isBlocked(int channelIndex) { - return blockedChannels[channelIndex]; - } - - /** - * Blocks the given channel index, from which a barrier has been received. - * - * @param channelIndex The channel index to block. - */ - private void onBarrier(int channelIndex) throws IOException { - if (!blockedChannels[channelIndex]) { - blockedChannels[channelIndex] = true; - - numBarriersReceived++; - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex); - } - } - else { - throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex); - } - } - - /** - * Releases the blocks on all channels and resets the barrier count. - * Makes sure the just written data is the next to be consumed. - */ - private void releaseBlocksAndResetBarriers() throws IOException { - LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName); - - for (int i = 0; i < blockedChannels.length; i++) { - blockedChannels[i] = false; - } - - bufferStorage.rollOver(); - - // the next barrier that comes must assume it is the first - numBarriersReceived = 0; - - if (startOfAlignmentTimestamp > 0) { - latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp; - startOfAlignmentTimestamp = 0; - } - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - /** - * Gets the ID defining the current pending, or just completed, checkpoint. - * - * @return The ID of the pending of completed checkpoint. - */ public long getCurrentCheckpointId() { - return this.currentCheckpointId; + return currentCheckpointId; } - @Override public long getAlignmentDurationNanos() { - long start = this.startOfAlignmentTimestamp; - if (start <= 0) { + if (startOfAlignmentTimestamp <= 0) { return latestAlignmentDurationNanos; } else { - return System.nanoTime() - start; + return System.nanoTime() - startOfAlignmentTimestamp; } } @Override - public int getNumberOfInputChannels() { - return totalNumberOfInputChannels; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override public String toString() { return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d", taskName, @@ -531,4 +364,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler { numBarriersReceived, numClosedChannels); } + + public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception { + releaseBlocksAndResetBarriers(); + notifyAbort(currentCheckpointId, + new CheckpointException( + "Max buffered bytes: " + maxBufferedBytes, + CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)); + } }
