This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa5f0618ef88161b23097b45df4055a58ca11685
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Jun 13 14:42:14 2019 +0200

    [FLINK-12777][network] Extract CheckpointBarrierAligner from BarrierBuffer
---
 .../runtime/io/AbstractBufferStorage.java          |   5 -
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 369 ++------------------
 .../flink/streaming/runtime/io/BufferStorage.java  |   2 -
 ...erBuffer.java => CheckpointBarrierAligner.java} | 379 ++++++---------------
 4 files changed, 144 insertions(+), 611 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
index f7e4dd7..5eb30cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
@@ -164,11 +164,6 @@ public abstract class AbstractBufferStorage implements 
BufferStorage {
        }
 
        @Override
-       public long currentBufferedSize() {
-               return currentBuffered != null ? currentBuffered.size() : 0L;
-       }
-
-       @Override
        public long getMaxBufferedBytes() {
                return maxBufferedBytes;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0f8fa40..1594389 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -54,45 +50,17 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
 
+       private final CheckpointBarrierAligner barrierAligner;
+
        /** The gate that the buffer draws its input from. */
        private final InputGate inputGate;
 
-       /** Flags that indicate whether a channel is currently 
blocked/buffered. */
-       private final boolean[] blockedChannels;
-
-       /** The total number of channels that this buffer handles data from. */
-       private final int totalNumberOfInputChannels;
-
-       /** To utility to write blocked data to a file channel. */
        private final BufferStorage bufferStorage;
 
-       private final String taskName;
-
-       @Nullable
-       private final AbstractInvokable toNotifyOnCheckpoint;
-
-       /** The ID of the checkpoint for which we expect barriers. */
-       private long currentCheckpointId = -1L;
-
-       /**
-        * The number of received barriers (= number of blocked/buffered 
channels) IMPORTANT: A canceled
-        * checkpoint must always have 0 barriers.
-        */
-       private int numBarriersReceived;
-
-       /** The number of already closed channels. */
-       private int numClosedChannels;
-
-       /** The timestamp as in {@link System#nanoTime()} at which the last 
alignment started. */
-       private long startOfAlignmentTimestamp;
-
-       /** The time (in nanoseconds) that the latest alignment took. */
-       private long latestAlignmentDurationNanos;
-
        /** Flag to indicate whether we have drawn all available input. */
-       private boolean endOfStream;
+       private boolean endOfInputGate;
 
-       /** Indicate end of the input. Set to true after encountering {@link 
#endOfStream} and depleting
+       /** Indicate end of the input. Set to true after encountering {@link 
#endOfInputGate} and depleting
         * {@link #bufferStorage}. */
        private boolean isFinished;
 
@@ -122,19 +90,16 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * @param toNotifyOnCheckpoint optional Handler that receives the 
checkpoint notifications.
         */
        BarrierBuffer(
-               InputGate inputGate,
-               BufferStorage bufferStorage,
-               String taskName,
-               @Nullable AbstractInvokable toNotifyOnCheckpoint) {
-
+                       InputGate inputGate,
+                       BufferStorage bufferStorage,
+                       String taskName,
+                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
                this.inputGate = inputGate;
-               this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
-               this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
-
                this.bufferStorage = checkNotNull(bufferStorage);
-
-               this.taskName = taskName;
-               this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
+               this.barrierAligner = new CheckpointBarrierAligner(
+                       inputGate.getNumberOfInputChannels(),
+                       taskName,
+                       toNotifyOnCheckpoint);
        }
 
        @Override
@@ -145,10 +110,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                return AVAILABLE;
        }
 
-       // 
------------------------------------------------------------------------
-       //  Buffer and barrier handling
-       // 
------------------------------------------------------------------------
-
        @Override
        public Optional<BufferOrEvent> pollNext() throws Exception {
                while (true) {
@@ -170,28 +131,36 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
 
                        BufferOrEvent bufferOrEvent = next.get();
-                       if (isBlocked(bufferOrEvent.getChannelIndex())) {
+                       if 
(barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) {
                                // if the channel is blocked, we just store the 
BufferOrEvent
                                bufferStorage.add(bufferOrEvent);
                                if (bufferStorage.isFull()) {
-                                       sizeLimitExceeded();
+                                       
barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+                                       bufferStorage.rollOver();
                                }
                        }
                        else if (bufferOrEvent.isBuffer()) {
                                return next;
                        }
                        else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
-                               if (!endOfStream) {
+                               CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
+                               if (!endOfInputGate) {
                                        // process barriers only if there is a 
chance of the checkpoint completing
-                                       processBarrier((CheckpointBarrier) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
+                                       if 
(barrierAligner.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+                                               bufferStorage.rollOver();
+                                       }
                                }
                        }
                        else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               
processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
+                               if 
(barrierAligner.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent())) {
+                                       bufferStorage.rollOver();
+                               }
                        }
                        else {
                                if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                       processEndOfPartition();
+                                       if 
(barrierAligner.processEndOfPartition()) {
+                                               bufferStorage.rollOver();
+                                       }
                                }
                                return next;
                        }
@@ -203,216 +172,18 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        return Optional.empty();
                }
 
-               if (endOfStream) {
+               if (endOfInputGate) {
                        isFinished = true;
                        return Optional.empty();
                } else {
                        // end of input stream. stream continues with the 
buffered data
-                       endOfStream = true;
-                       releaseBlocksAndResetBarriers();
+                       endOfInputGate = true;
+                       barrierAligner.releaseBlocksAndResetBarriers();
+                       bufferStorage.rollOver();
                        return pollNext();
                }
        }
 
-       private void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
-               final long barrierId = receivedBarrier.getId();
-
-               // fast path for single channel cases
-               if (totalNumberOfInputChannels == 1) {
-                       if (barrierId > currentCheckpointId) {
-                               // new checkpoint
-                               currentCheckpointId = barrierId;
-                               notifyCheckpoint(receivedBarrier);
-                       }
-                       return;
-               }
-
-               // -- general code path for multiple input channels --
-
-               if (numBarriersReceived > 0) {
-                       // this is only true if some alignment is already 
progress and was not canceled
-
-                       if (barrierId == currentCheckpointId) {
-                               // regular case
-                               onBarrier(channelIndex);
-                       }
-                       else if (barrierId > currentCheckpointId) {
-                               // we did not complete the current checkpoint, 
another started before
-                               LOG.warn("{}: Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.",
-                                       taskName,
-                                       barrierId,
-                                       currentCheckpointId);
-
-                               // let the task know we are not completing this
-                               notifyAbort(currentCheckpointId,
-                                       new CheckpointException(
-                                               "Barrier id: " + barrierId,
-                                               
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
-
-                               // abort the current checkpoint
-                               releaseBlocksAndResetBarriers();
-
-                               // begin a the new checkpoint
-                               beginNewAlignment(barrierId, channelIndex);
-                       }
-                       else {
-                               // ignore trailing barrier from an earlier 
checkpoint (obsolete now)
-                               return;
-                       }
-               }
-               else if (barrierId > currentCheckpointId) {
-                       // first barrier of a new checkpoint
-                       beginNewAlignment(barrierId, channelIndex);
-               }
-               else {
-                       // either the current checkpoint was canceled 
(numBarriers == 0) or
-                       // this barrier is from an old subsumed checkpoint
-                       return;
-               }
-
-               // check if we have all barriers - since canceled checkpoints 
always have zero barriers
-               // this can only happen on a non canceled checkpoint
-               if (numBarriersReceived + numClosedChannels == 
totalNumberOfInputChannels) {
-                       // actually trigger checkpoint
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{}: Received all barriers, 
triggering checkpoint {} at {}.",
-                                       taskName,
-                                       receivedBarrier.getId(),
-                                       receivedBarrier.getTimestamp());
-                       }
-
-                       releaseBlocksAndResetBarriers();
-                       notifyCheckpoint(receivedBarrier);
-               }
-       }
-
-       private void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
-               final long barrierId = cancelBarrier.getCheckpointId();
-
-               // fast path for single channel cases
-               if (totalNumberOfInputChannels == 1) {
-                       if (barrierId > currentCheckpointId) {
-                               // new checkpoint
-                               currentCheckpointId = barrierId;
-                               notifyAbortOnCancellationBarrier(barrierId);
-                       }
-                       return;
-               }
-
-               // -- general code path for multiple input channels --
-
-               if (numBarriersReceived > 0) {
-                       // this is only true if some alignment is in progress 
and nothing was canceled
-
-                       if (barrierId == currentCheckpointId) {
-                               // cancel this alignment
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("{}: Checkpoint {} canceled, 
aborting alignment.", taskName, barrierId);
-                               }
-
-                               releaseBlocksAndResetBarriers();
-                               notifyAbortOnCancellationBarrier(barrierId);
-                       }
-                       else if (barrierId > currentCheckpointId) {
-                               // we canceled the next which also cancels the 
current
-                               LOG.warn("{}: Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.",
-                                       taskName,
-                                       barrierId,
-                                       currentCheckpointId);
-
-                               // this stops the current alignment
-                               releaseBlocksAndResetBarriers();
-
-                               // the next checkpoint starts as canceled
-                               currentCheckpointId = barrierId;
-                               startOfAlignmentTimestamp = 0L;
-                               latestAlignmentDurationNanos = 0L;
-
-                               notifyAbortOnCancellationBarrier(barrierId);
-                       }
-
-                       // else: ignore trailing (cancellation) barrier from an 
earlier checkpoint (obsolete now)
-
-               }
-               else if (barrierId > currentCheckpointId) {
-                       // first barrier of a new checkpoint is directly a 
cancellation
-
-                       // by setting the currentCheckpointId to this 
checkpoint while keeping the numBarriers
-                       // at zero means that no checkpoint barrier can start a 
new alignment
-                       currentCheckpointId = barrierId;
-
-                       startOfAlignmentTimestamp = 0L;
-                       latestAlignmentDurationNanos = 0L;
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{}: Checkpoint {} canceled, skipping 
alignment.", taskName, barrierId);
-                       }
-
-                       notifyAbortOnCancellationBarrier(barrierId);
-               }
-
-               // else: trailing barrier from either
-               //   - a previous (subsumed) checkpoint
-               //   - the current checkpoint if it was already canceled
-       }
-
-       private void processEndOfPartition() throws Exception {
-               numClosedChannels++;
-
-               if (numBarriersReceived > 0) {
-                       // let the task know we skip a checkpoint
-                       notifyAbort(currentCheckpointId,
-                               new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
-
-                       // no chance to complete this checkpoint
-                       releaseBlocksAndResetBarriers();
-               }
-       }
-
-       private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) 
throws Exception {
-               if (toNotifyOnCheckpoint != null) {
-                       CheckpointMetaData checkpointMetaData =
-                               new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
-
-                       CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
-                               
.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
-                               
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
-
-                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-                               checkpointMetaData,
-                               checkpointBarrier.getCheckpointOptions(),
-                               checkpointMetrics);
-               }
-       }
-
-       private void notifyAbortOnCancellationBarrier(long checkpointId) throws 
Exception {
-               notifyAbort(checkpointId,
-                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
-       }
-
-       private void notifyAbort(long checkpointId, CheckpointException cause) 
throws Exception {
-               if (toNotifyOnCheckpoint != null) {
-                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
-               }
-       }
-
-       private void sizeLimitExceeded() throws Exception {
-               long maxBufferedBytes = bufferStorage.getMaxBufferedBytes();
-               // exceeded our limit - abort this checkpoint
-               LOG.info("{}: Checkpoint {} aborted because alignment volume 
limit ({} bytes) exceeded.",
-                       taskName,
-                       currentCheckpointId,
-                       maxBufferedBytes);
-
-               releaseBlocksAndResetBarriers();
-               notifyAbort(currentCheckpointId,
-                       new CheckpointException(
-                               "Max buffered bytes: " + maxBufferedBytes,
-                               
CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
-       }
-
        @Override
        public boolean isEmpty() {
                return bufferStorage.isEmpty();
@@ -428,69 +199,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                bufferStorage.close();
        }
 
-       private void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
-               currentCheckpointId = checkpointId;
-               onBarrier(channelIndex);
-
-               startOfAlignmentTimestamp = System.nanoTime();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.", taskName, checkpointId);
-               }
-       }
-
-       /**
-        * Checks whether the channel with the given index is blocked.
-        *
-        * @param channelIndex The channel index to check.
-        * @return True if the channel is blocked, false if not.
-        */
-       private boolean isBlocked(int channelIndex) {
-               return blockedChannels[channelIndex];
-       }
-
-       /**
-        * Blocks the given channel index, from which a barrier has been 
received.
-        *
-        * @param channelIndex The channel index to block.
-        */
-       private void onBarrier(int channelIndex) throws IOException {
-               if (!blockedChannels[channelIndex]) {
-                       blockedChannels[channelIndex] = true;
-
-                       numBarriersReceived++;
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{}: Received barrier from channel 
{}.", taskName, channelIndex);
-                       }
-               }
-               else {
-                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint on input " + channelIndex);
-               }
-       }
-
-       /**
-        * Releases the blocks on all channels and resets the barrier count.
-        * Makes sure the just written data is the next to be consumed.
-        */
-       private void releaseBlocksAndResetBarriers() throws IOException {
-               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.", taskName);
-
-               for (int i = 0; i < blockedChannels.length; i++) {
-                       blockedChannels[i] = false;
-               }
-
-               bufferStorage.rollOver();
-
-               // the next barrier that comes must assume it is the first
-               numBarriersReceived = 0;
-
-               if (startOfAlignmentTimestamp > 0) {
-                       latestAlignmentDurationNanos = System.nanoTime() - 
startOfAlignmentTimestamp;
-                       startOfAlignmentTimestamp = 0;
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
@@ -501,22 +209,17 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * @return The ID of the pending of completed checkpoint.
         */
        public long getCurrentCheckpointId() {
-               return this.currentCheckpointId;
+               return barrierAligner.getCurrentCheckpointId();
        }
 
        @Override
        public long getAlignmentDurationNanos() {
-               long start = this.startOfAlignmentTimestamp;
-               if (start <= 0) {
-                       return latestAlignmentDurationNanos;
-               } else {
-                       return System.nanoTime() - start;
-               }
+               return barrierAligner.getAlignmentDurationNanos();
        }
 
        @Override
        public int getNumberOfInputChannels() {
-               return totalNumberOfInputChannels;
+               return inputGate.getNumberOfInputChannels();
        }
 
        // 
------------------------------------------------------------------------
@@ -525,10 +228,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
        @Override
        public String toString() {
-               return String.format("%s: last checkpoint: %d, current 
barriers: %d, closed channels: %d",
-                       taskName,
-                       currentCheckpointId,
-                       numBarriersReceived,
-                       numClosedChannels);
+               return barrierAligner.toString();
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
index 8e6194d..4ad7eac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
@@ -67,8 +67,6 @@ public interface BufferStorage extends AutoCloseable {
 
        Optional<BufferOrEvent> pollNext() throws IOException;
 
-       long currentBufferedSize();
-
        long getMaxBufferedBytes();
 
        /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
similarity index 55%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index 0f8fa40..30e05c1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +19,12 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import org.slf4j.Logger;
@@ -36,26 +33,16 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs 
with barriers until
- * all inputs have received the barrier for a given checkpoint.
- *
- * <p>To avoid back-pressuring the input streams (which may cause distributed 
deadlocks), the
- * BarrierBuffer continues receiving buffers from the blocked channels and 
stores them internally until
- * the blocks are released.
+ * {@link CheckpointBarrierAligner} keep tracks of received {@link 
CheckpointBarrier} on given
+ * channels and controls the alignment, by deciding which channels should be 
blocked and when to
+ * release blocked channels.
  */
 @Internal
-public class BarrierBuffer implements CheckpointBarrierHandler {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
+public class CheckpointBarrierAligner {
 
-       /** The gate that the buffer draws its input from. */
-       private final InputGate inputGate;
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointBarrierAligner.class);
 
        /** Flags that indicate whether a channel is currently 
blocked/buffered. */
        private final boolean[] blockedChannels;
@@ -63,9 +50,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        /** The total number of channels that this buffer handles data from. */
        private final int totalNumberOfInputChannels;
 
-       /** To utility to write blocked data to a file channel. */
-       private final BufferStorage bufferStorage;
-
        private final String taskName;
 
        @Nullable
@@ -89,132 +73,47 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        /** The time (in nanoseconds) that the latest alignment took. */
        private long latestAlignmentDurationNanos;
 
-       /** Flag to indicate whether we have drawn all available input. */
-       private boolean endOfStream;
-
-       /** Indicate end of the input. Set to true after encountering {@link 
#endOfStream} and depleting
-        * {@link #bufferStorage}. */
-       private boolean isFinished;
-
-       /**
-        * Creates a new checkpoint stream aligner.
-        *
-        * <p>There is no limit to how much data may be buffered during an 
alignment.
-        *
-        * @param inputGate The input gate to draw the buffers and events from.
-        * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
-        */
-       @VisibleForTesting
-       BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
-               this (inputGate, bufferStorage, "Testing: No task associated", 
null);
-       }
-
-       /**
-        * Creates a new checkpoint stream aligner.
-        *
-        * <p>The aligner will allow only alignments that buffer up to the 
given number of bytes.
-        * When that number is exceeded, it will stop the alignment and notify 
the task that the
-        * checkpoint has been cancelled.
-        *
-        * @param inputGate The input gate to draw the buffers and events from.
-        * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
-        * @param taskName The task name for logging.
-        * @param toNotifyOnCheckpoint optional Handler that receives the 
checkpoint notifications.
-        */
-       BarrierBuffer(
-               InputGate inputGate,
-               BufferStorage bufferStorage,
-               String taskName,
-               @Nullable AbstractInvokable toNotifyOnCheckpoint) {
-
-               this.inputGate = inputGate;
-               this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
-               this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
-
-               this.bufferStorage = checkNotNull(bufferStorage);
-
+       CheckpointBarrierAligner(
+                       int totalNumberOfInputChannels,
+                       String taskName,
+                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+               this.totalNumberOfInputChannels = totalNumberOfInputChannels;
                this.taskName = taskName;
                this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-       }
 
-       @Override
-       public CompletableFuture<?> isAvailable() {
-               if (bufferStorage.isEmpty()) {
-                       return inputGate.isAvailable();
-               }
-               return AVAILABLE;
+               this.blockedChannels = new boolean[totalNumberOfInputChannels];
        }
 
-       // 
------------------------------------------------------------------------
-       //  Buffer and barrier handling
-       // 
------------------------------------------------------------------------
+       public void releaseBlocksAndResetBarriers() throws IOException {
+               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.", taskName);
 
-       @Override
-       public Optional<BufferOrEvent> pollNext() throws Exception {
-               while (true) {
-                       // process buffered BufferOrEvents before grabbing new 
ones
-                       Optional<BufferOrEvent> next;
-                       if (bufferStorage.isEmpty()) {
-                               next = inputGate.pollNext();
-                       }
-                       else {
-                               // TODO: FLINK-12536 for non credit-based flow 
control, getNext method is blocking
-                               next = bufferStorage.pollNext();
-                               if (!next.isPresent()) {
-                                       return pollNext();
-                               }
-                       }
+               for (int i = 0; i < blockedChannels.length; i++) {
+                       blockedChannels[i] = false;
+               }
 
-                       if (!next.isPresent()) {
-                               return handleEmptyBuffer();
-                       }
+               // the next barrier that comes must assume it is the first
+               numBarriersReceived = 0;
 
-                       BufferOrEvent bufferOrEvent = next.get();
-                       if (isBlocked(bufferOrEvent.getChannelIndex())) {
-                               // if the channel is blocked, we just store the 
BufferOrEvent
-                               bufferStorage.add(bufferOrEvent);
-                               if (bufferStorage.isFull()) {
-                                       sizeLimitExceeded();
-                               }
-                       }
-                       else if (bufferOrEvent.isBuffer()) {
-                               return next;
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
-                               if (!endOfStream) {
-                                       // process barriers only if there is a 
chance of the checkpoint completing
-                                       processBarrier((CheckpointBarrier) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
-                               }
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               
processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
-                       }
-                       else {
-                               if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                       processEndOfPartition();
-                               }
-                               return next;
-                       }
+               if (startOfAlignmentTimestamp > 0) {
+                       latestAlignmentDurationNanos = System.nanoTime() - 
startOfAlignmentTimestamp;
+                       startOfAlignmentTimestamp = 0;
                }
        }
 
-       private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
-               if (!inputGate.isFinished()) {
-                       return Optional.empty();
-               }
-
-               if (endOfStream) {
-                       isFinished = true;
-                       return Optional.empty();
-               } else {
-                       // end of input stream. stream continues with the 
buffered data
-                       endOfStream = true;
-                       releaseBlocksAndResetBarriers();
-                       return pollNext();
-               }
+       /**
+        * Checks whether the channel with the given index is blocked.
+        *
+        * @param channelIndex The channel index to check.
+        * @return True if the channel is blocked, false if not.
+        */
+       public boolean isBlocked(int channelIndex) {
+               return blockedChannels[channelIndex];
        }
 
-       private void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
+       /**
+        * @return true if some blocked data should be unblocked/rolled over.
+        */
+       public boolean processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex, long bufferedBytes) throws Exception {
                final long barrierId = receivedBarrier.getId();
 
                // fast path for single channel cases
@@ -222,11 +121,13 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (barrierId > currentCheckpointId) {
                                // new checkpoint
                                currentCheckpointId = barrierId;
-                               notifyCheckpoint(receivedBarrier);
+                               notifyCheckpoint(receivedBarrier, 
bufferedBytes);
                        }
-                       return;
+                       return false;
                }
 
+               boolean checkpointAborted = false;
+
                // -- general code path for multiple input channels --
 
                if (numBarriersReceived > 0) {
@@ -252,13 +153,14 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
                                // abort the current checkpoint
                                releaseBlocksAndResetBarriers();
+                               checkpointAborted = true;
 
                                // begin a the new checkpoint
                                beginNewAlignment(barrierId, channelIndex);
                        }
                        else {
                                // ignore trailing barrier from an earlier 
checkpoint (obsolete now)
-                               return;
+                               return false;
                        }
                }
                else if (barrierId > currentCheckpointId) {
@@ -268,7 +170,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                else {
                        // either the current checkpoint was canceled 
(numBarriers == 0) or
                        // this barrier is from an old subsumed checkpoint
-                       return;
+                       return false;
                }
 
                // check if we have all barriers - since canceled checkpoints 
always have zero barriers
@@ -283,11 +185,47 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
 
                        releaseBlocksAndResetBarriers();
-                       notifyCheckpoint(receivedBarrier);
+                       notifyCheckpoint(receivedBarrier, bufferedBytes);
+                       return true;
+               }
+               return checkpointAborted;
+       }
+
+       protected void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
+               currentCheckpointId = checkpointId;
+               onBarrier(channelIndex);
+
+               startOfAlignmentTimestamp = System.nanoTime();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.", taskName, checkpointId);
                }
        }
 
-       private void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
+       /**
+        * Blocks the given channel index, from which a barrier has been 
received.
+        *
+        * @param channelIndex The channel index to block.
+        */
+       protected void onBarrier(int channelIndex) throws IOException {
+               if (!blockedChannels[channelIndex]) {
+                       blockedChannels[channelIndex] = true;
+
+                       numBarriersReceived++;
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("{}: Received barrier from channel 
{}.", taskName, channelIndex);
+                       }
+               }
+               else {
+                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint on input " + channelIndex);
+               }
+       }
+
+       /**
+        * @return true if some blocked data should be unblocked/rolled over.
+        */
+       public boolean processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
                final long barrierId = cancelBarrier.getCheckpointId();
 
                // fast path for single channel cases
@@ -297,7 +235,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                currentCheckpointId = barrierId;
                                notifyAbortOnCancellationBarrier(barrierId);
                        }
-                       return;
+                       return false;
                }
 
                // -- general code path for multiple input channels --
@@ -313,6 +251,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
                                releaseBlocksAndResetBarriers();
                                notifyAbortOnCancellationBarrier(barrierId);
+                               return true;
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we canceled the next which also cancels the 
current
@@ -331,6 +270,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                latestAlignmentDurationNanos = 0L;
 
                                notifyAbortOnCancellationBarrier(barrierId);
+                               return true;
                        }
 
                        // else: ignore trailing (cancellation) barrier from an 
earlier checkpoint (obsolete now)
@@ -351,33 +291,39 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
 
                        notifyAbortOnCancellationBarrier(barrierId);
+                       return false;
                }
 
                // else: trailing barrier from either
                //   - a previous (subsumed) checkpoint
                //   - the current checkpoint if it was already canceled
+               return false;
        }
 
-       private void processEndOfPartition() throws Exception {
+       /**
+        * @return true if some blocked data should be unblocked/rolled over.
+        */
+       public boolean processEndOfPartition() throws Exception {
                numClosedChannels++;
 
                if (numBarriersReceived > 0) {
                        // let the task know we skip a checkpoint
                        notifyAbort(currentCheckpointId,
                                new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
-
                        // no chance to complete this checkpoint
                        releaseBlocksAndResetBarriers();
+                       return true;
                }
+               return false;
        }
 
-       private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) 
throws Exception {
+       private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long 
bufferedBytes) throws Exception {
                if (toNotifyOnCheckpoint != null) {
                        CheckpointMetaData checkpointMetaData =
                                new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
 
                        CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
-                               
.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
+                               .setBytesBufferedInAlignment(bufferedBytes)
                                
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
                        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
@@ -398,132 +344,19 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                }
        }
 
-       private void sizeLimitExceeded() throws Exception {
-               long maxBufferedBytes = bufferStorage.getMaxBufferedBytes();
-               // exceeded our limit - abort this checkpoint
-               LOG.info("{}: Checkpoint {} aborted because alignment volume 
limit ({} bytes) exceeded.",
-                       taskName,
-                       currentCheckpointId,
-                       maxBufferedBytes);
-
-               releaseBlocksAndResetBarriers();
-               notifyAbort(currentCheckpointId,
-                       new CheckpointException(
-                               "Max buffered bytes: " + maxBufferedBytes,
-                               
CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
-       }
-
-       @Override
-       public boolean isEmpty() {
-               return bufferStorage.isEmpty();
-       }
-
-       @Override
-       public boolean isFinished() {
-               return isFinished;
-       }
-
-       @Override
-       public void cleanup() throws IOException {
-               bufferStorage.close();
-       }
-
-       private void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
-               currentCheckpointId = checkpointId;
-               onBarrier(channelIndex);
-
-               startOfAlignmentTimestamp = System.nanoTime();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.", taskName, checkpointId);
-               }
-       }
-
-       /**
-        * Checks whether the channel with the given index is blocked.
-        *
-        * @param channelIndex The channel index to check.
-        * @return True if the channel is blocked, false if not.
-        */
-       private boolean isBlocked(int channelIndex) {
-               return blockedChannels[channelIndex];
-       }
-
-       /**
-        * Blocks the given channel index, from which a barrier has been 
received.
-        *
-        * @param channelIndex The channel index to block.
-        */
-       private void onBarrier(int channelIndex) throws IOException {
-               if (!blockedChannels[channelIndex]) {
-                       blockedChannels[channelIndex] = true;
-
-                       numBarriersReceived++;
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{}: Received barrier from channel 
{}.", taskName, channelIndex);
-                       }
-               }
-               else {
-                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint on input " + channelIndex);
-               }
-       }
-
-       /**
-        * Releases the blocks on all channels and resets the barrier count.
-        * Makes sure the just written data is the next to be consumed.
-        */
-       private void releaseBlocksAndResetBarriers() throws IOException {
-               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.", taskName);
-
-               for (int i = 0; i < blockedChannels.length; i++) {
-                       blockedChannels[i] = false;
-               }
-
-               bufferStorage.rollOver();
-
-               // the next barrier that comes must assume it is the first
-               numBarriersReceived = 0;
-
-               if (startOfAlignmentTimestamp > 0) {
-                       latestAlignmentDurationNanos = System.nanoTime() - 
startOfAlignmentTimestamp;
-                       startOfAlignmentTimestamp = 0;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the ID defining the current pending, or just completed, 
checkpoint.
-        *
-        * @return The ID of the pending of completed checkpoint.
-        */
        public long getCurrentCheckpointId() {
-               return this.currentCheckpointId;
+               return currentCheckpointId;
        }
 
-       @Override
        public long getAlignmentDurationNanos() {
-               long start = this.startOfAlignmentTimestamp;
-               if (start <= 0) {
+               if (startOfAlignmentTimestamp <= 0) {
                        return latestAlignmentDurationNanos;
                } else {
-                       return System.nanoTime() - start;
+                       return System.nanoTime() - startOfAlignmentTimestamp;
                }
        }
 
        @Override
-       public int getNumberOfInputChannels() {
-               return totalNumberOfInputChannels;
-       }
-
-       // 
------------------------------------------------------------------------
-       // Utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
        public String toString() {
                return String.format("%s: last checkpoint: %d, current 
barriers: %d, closed channels: %d",
                        taskName,
@@ -531,4 +364,12 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        numBarriersReceived,
                        numClosedChannels);
        }
+
+       public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws 
Exception {
+               releaseBlocksAndResetBarriers();
+               notifyAbort(currentCheckpointId,
+                       new CheckpointException(
+                               "Max buffered bytes: " + maxBufferedBytes,
+                               
CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
+       }
 }

Reply via email to