[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for 
exactly-once

This closes #5400.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3126bf52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3126bf52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3126bf52

Branch: refs/heads/master
Commit: 3126bf522925a4f9a5cbf4c33077dbe7664363fa
Parents: 831349a
Author: Zhijiang <wangzhijiang...@aliyun.com>
Authored: Fri Feb 2 15:45:49 2018 +0800
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Feb 22 14:27:36 2018 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |   12 +
 .../streaming/runtime/io/BarrierBuffer.java     |   36 +-
 .../streaming/runtime/io/BufferBlocker.java     |   68 +
 .../runtime/io/BufferOrEventSequence.java       |   59 +
 .../streaming/runtime/io/BufferSpiller.java     |   78 +-
 .../runtime/io/CachedBufferBlocker.java         |  147 ++
 .../runtime/io/InputProcessorUtil.java          |   71 +
 .../runtime/io/StreamInputProcessor.java        |   25 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   25 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |    4 +-
 .../io/BarrierBufferMassiveRandomTest.java      |    2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 1545 ------------------
 .../runtime/io/BarrierBufferTestBase.java       | 1426 ++++++++++++++++
 .../runtime/io/BufferBlockerTestBase.java       |  325 ++++
 .../streaming/runtime/io/BufferSpillerTest.java |  341 +---
 .../runtime/io/CachedBufferBlockerTest.java     |   53 +
 .../io/CreditBasedBarrierBufferTest.java        |   49 +
 .../runtime/io/SpillingBarrierBufferTest.java   |   81 +
 18 files changed, 2361 insertions(+), 1986 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index e01cf0f..cc3284c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -317,6 +317,18 @@ public class TaskManagerOptions {
                        
key("taskmanager.network.credit-based-flow-control.enabled")
                        .defaultValue(true);
 
+       /**
+        * Config parameter defining whether to spill data for channels with 
barrier or not in exactly-once
+        * mode based on credit-based flow control.
+        *
+        * @deprecated Will be removed in Flink 1.6 once the old code is 
dropped in favour of
+        * credit-based flow control.
+        */
+       @Deprecated
+       public static final ConfigOption<Boolean> 
EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
+                       key("taskmanager.exactly-once.blocking.data.enabled")
+                       .defaultValue(true);
+
        // 
------------------------------------------------------------------------
        //  Task Options
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7ef9fef..78852b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -41,6 +40,7 @@ import java.util.ArrayDeque;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs 
with barriers until
@@ -65,13 +65,13 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private final int totalNumberOfInputChannels;
 
        /** To utility to write blocked data to a file channel. */
-       private final BufferSpiller bufferSpiller;
+       private final BufferBlocker bufferBlocker;
 
        /**
         * The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
         * from the input gate.
         */
-       private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> 
queuedBuffered;
+       private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
 
        /**
         * The maximum number of bytes that may be buffered before an alignment 
is broken. -1 means
@@ -83,7 +83,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * The sequence of buffers/events that has been unblocked and must now 
be consumed before
         * requesting further data from the input gate.
         */
-       private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
+       private BufferOrEventSequence currentBuffered;
 
        /** Handler that receives the checkpoint notifications. */
        private AbstractInvokable toNotifyOnCheckpoint;
@@ -118,12 +118,12 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * <p>There is no limit to how much data may be buffered during an 
alignment.
         *
         * @param inputGate The input gate to draw the buffers and events from.
-        * @param ioManager The I/O manager that gives access to the temp 
directories.
+        * @param bufferBlocker The buffer blocker to hold the buffers and 
events for channels with barrier.
         *
         * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
         */
-       public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws 
IOException {
-               this (inputGate, ioManager, -1);
+       public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) 
throws IOException {
+               this (inputGate, bufferBlocker, -1);
        }
 
        /**
@@ -134,12 +134,13 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * checkpoint has been cancelled.
         *
         * @param inputGate The input gate to draw the buffers and events from.
-        * @param ioManager The I/O manager that gives access to the temp 
directories.
+        * @param bufferBlocker The buffer blocker to hold the buffers and 
events for channels with barrier.
         * @param maxBufferedBytes The maximum bytes to be buffered before the 
checkpoint aborts.
         *
         * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
         */
-       public BarrierBuffer(InputGate inputGate, IOManager ioManager, long 
maxBufferedBytes) throws IOException {
+       public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, 
long maxBufferedBytes)
+                       throws IOException {
                checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
 
                this.inputGate = inputGate;
@@ -147,8 +148,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
                this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
 
-               this.bufferSpiller = new BufferSpiller(ioManager, 
inputGate.getPageSize());
-               this.queuedBuffered = new 
ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
+               this.bufferBlocker = checkNotNull(bufferBlocker);
+               this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
        }
 
        // 
------------------------------------------------------------------------
@@ -185,10 +186,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
 
                        BufferOrEvent bufferOrEvent = next.get();
-
                        if (isBlocked(bufferOrEvent.getChannelIndex())) {
                                // if the channel is blocked we, we just store 
the BufferOrEvent
-                               bufferSpiller.add(bufferOrEvent);
+                               bufferBlocker.add(bufferOrEvent);
                                checkSizeLimit();
                        }
                        else if (bufferOrEvent.isBuffer()) {
@@ -399,7 +399,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        }
 
        private void checkSizeLimit() throws Exception {
-               if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferSpiller.getBytesWritten()) > maxBufferedBytes) {
+               if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
                        // exceeded our limit - abort this checkpoint
                        LOG.info("Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded",
                                        currentCheckpointId, maxBufferedBytes);
@@ -426,11 +426,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
        @Override
        public void cleanup() throws IOException {
-               bufferSpiller.close();
+               bufferBlocker.close();
                if (currentBuffered != null) {
                        currentBuffered.cleanup();
                }
-               for (BufferSpiller.SpilledBufferOrEventSequence seq : 
queuedBuffered) {
+               for (BufferOrEventSequence seq : queuedBuffered) {
                        seq.cleanup();
                }
                queuedBuffered.clear();
@@ -491,7 +491,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
                if (currentBuffered == null) {
                        // common case: no more buffered data
-                       currentBuffered = bufferSpiller.rollOver();
+                       currentBuffered = 
bufferBlocker.rollOverReusingResources();
                        if (currentBuffered != null) {
                                currentBuffered.open();
                        }
@@ -503,7 +503,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                        "Pushing back current alignment buffers 
and feeding back new alignment data first.");
 
                        // since we did not fully drain the previous sequence, 
we need to allocate a new buffer for this one
-                       BufferSpiller.SpilledBufferOrEventSequence bufferedNow 
= bufferSpiller.rollOverWithNewBuffer();
+                       BufferOrEventSequence bufferedNow = 
bufferBlocker.rollOverWithoutReusingResources();
                        if (bufferedNow != null) {
                                bufferedNow.open();
                                queuedBuffered.addFirst(currentBuffered);

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
new file mode 100644
index 0000000..4d0f66f
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+
+/**
+ * The buffer blocker takes the buffers and events from a data stream and adds 
them in a sequence.
+ * After a number of elements have been added, the blocker can "roll over": It 
presents the added
+ * elements as a readable sequence, and creates a new sequence.
+ */
+@Internal
+public interface BufferBlocker {
+
+       /**
+        * Adds a buffer or event to the blocker.
+        *
+        * @param boe The buffer or event to be added into the blocker.
+        */
+       void add(BufferOrEvent boe) throws IOException;
+
+       /**
+        * Starts a new sequence of buffers and events without reusing the same 
resources and
+        * returns the current sequence of buffers for reading.
+        *
+        * @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+        */
+       BufferOrEventSequence rollOverWithoutReusingResources() throws 
IOException;
+
+       /**
+        * Starts a new sequence of buffers and events reusing the same 
resources and
+        * returns the current sequence of buffers for reading.
+        *
+        * @return The readable sequence of buffers and events, or 'null', if 
nothing was added.
+        */
+       BufferOrEventSequence rollOverReusingResources() throws IOException;
+
+       /**
+        * Cleans up all the resources in the current sequence.
+        */
+       void close() throws IOException;
+
+       /**
+        * Gets the number of bytes blocked in the current sequence.
+        *
+        * @return the number of bytes blocked in the current sequence.
+        */
+       long getBytesBlocked();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
new file mode 100644
index 0000000..c5bde1b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * This class represents a sequence of buffers and events which are blocked by
+ * {@link CheckpointBarrierHandler}. The sequence of buffers and events can be
+ * read back using the method {@link #getNext()}.
+ */
+@Internal
+public interface BufferOrEventSequence {
+
+       /**
+        * Initializes the sequence for reading.
+        */
+       void open();
+
+       /**
+        * Gets the next BufferOrEvent from the sequence, or {@code null}, if 
the
+        * sequence is exhausted.
+        *
+        * @return The next BufferOrEvent from the buffered sequence, or {@code 
null} (end of sequence).
+        */
+       @Nullable
+       BufferOrEvent getNext() throws IOException;
+
+       /**
+        * Cleans up all the resources held by the sequence.
+        */
+       void cleanup() throws IOException;
+
+       /**
+        * Gets the size of the sequence.
+        */
+       long size();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 33aac7e..7a0be33 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -45,13 +45,14 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>This implementation buffers data effectively in the OS cache, which 
gracefully extends to the
  * disk. Most data is written and re-read milliseconds later. The file is 
deleted after the read.
- * Consequently, in most cases, the data will never actually hit the physical 
disks.</p>
+ * Consequently, in most cases, the data will never actually hit the physical 
disks.
  *
  * <p>IMPORTANT: The SpilledBufferOrEventSequences created by this spiller all 
reuse the same
- * reading memory (to reduce overhead) and can consequently not be read 
concurrently.</p>
+ * reading memory (to reduce overhead) and can consequently not be read 
concurrently.
  */
 @Internal
-public class BufferSpiller {
+@Deprecated
+public class BufferSpiller implements BufferBlocker {
 
        /** Size of header in bytes (see add method). */
        static final int HEADER_SIZE = 9;
@@ -127,6 +128,7 @@ public class BufferSpiller {
         * @param boe The buffer or event to add and spill.
         * @throws IOException Thrown, if the buffer of event could not be 
spilled.
         */
+       @Override
        public void add(BufferOrEvent boe) throws IOException {
                try {
                        ByteBuffer contents;
@@ -157,40 +159,35 @@ public class BufferSpiller {
        }
 
        /**
-        * Starts a new sequence of spilled buffers and event and returns the 
current sequence of spilled buffers
-        * for reading. This method returns {@code null}, if nothing was added 
since the creation of the spiller, or the
-        * last call to this method.
+        * NOTE: The BufferOrEventSequences created by this method all reuse 
the same reading memory
+        * (to reduce overhead) and can consequently not be read concurrently 
with each other.
         *
-        * <p>NOTE: The SpilledBufferOrEventSequences created by this method 
all reuse the same
-        * reading memory (to reduce overhead) and can consequently not be read 
concurrently with each other.
-        * To create a sequence that can be read concurrently with the previous 
SpilledBufferOrEventSequence, use the
-        * {@link #rollOverWithNewBuffer()} method.</p>
+        * <p>To create a sequence that can be read concurrently with the 
previous BufferOrEventSequence,
+        * use the {@link #rollOverWithoutReusingResources()} method.
         *
         * @return The readable sequence of spilled buffers and events, or 
'null', if nothing was added.
         * @throws IOException Thrown, if the readable sequence could not be 
created, or no new spill
         *                     file could be created.
         */
-       public SpilledBufferOrEventSequence rollOver() throws IOException {
-               return rollOverInternal(false);
+       @Override
+       public BufferOrEventSequence rollOverReusingResources() throws 
IOException {
+               return rollOver(false);
        }
 
        /**
-        * Starts a new sequence of spilled buffers and event and returns the 
current sequence of spilled buffers
-        * for reading. This method returns {@code null}, if nothing was added 
since the creation of the spiller, or the
-        * last call to this method.
-        *
-        * <p>The SpilledBufferOrEventSequence returned by this method is safe 
for concurrent consumption with
-        * any previously returned sequence.</p>
+        * The BufferOrEventSequence returned by this method is safe for 
concurrent consumption with
+        * any previously returned sequence.
         *
         * @return The readable sequence of spilled buffers and events, or 
'null', if nothing was added.
         * @throws IOException Thrown, if the readable sequence could not be 
created, or no new spill
         *                     file could be created.
         */
-       public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws 
IOException {
-               return rollOverInternal(true);
+       @Override
+       public BufferOrEventSequence rollOverWithoutReusingResources() throws 
IOException {
+               return rollOver(true);
        }
 
-       private SpilledBufferOrEventSequence rollOverInternal(boolean 
newBuffer) throws IOException {
+       private BufferOrEventSequence rollOver(boolean newBuffer) throws 
IOException {
                if (bytesWritten == 0) {
                        return null;
                }
@@ -219,10 +216,11 @@ public class BufferSpiller {
         * Cleans up the current spilling channel and file.
         *
         * <p>Does not clean up the SpilledBufferOrEventSequences generated by 
calls to
-        * {@link #rollOver()}.
+        * {@link #rollOver(boolean)}.
         *
         * @throws IOException Thrown if channel closing or file deletion fail.
         */
+       @Override
        public void close() throws IOException {
                currentChannel.close();
                if (!currentSpillFile.delete()) {
@@ -232,9 +230,11 @@ public class BufferSpiller {
 
        /**
         * Gets the number of bytes written in the current spill file.
+        *
         * @return the number of bytes written in the current spill file
         */
-       public long getBytesWritten() {
+       @Override
+       public long getBytesBlocked() {
                return bytesWritten;
        }
 
@@ -264,10 +264,10 @@ public class BufferSpiller {
 
        /**
         * This class represents a sequence of spilled buffers and events, 
created by the
-        * {@link BufferSpiller}. The sequence of buffers and events can be 
read back using the
-        * method {@link #getNext()}.
+        * {@link BufferSpiller}.
         */
-       public static class SpilledBufferOrEventSequence {
+       @Deprecated
+       public static class SpilledBufferOrEventSequence implements 
BufferOrEventSequence {
 
                /** Header is "channel index" (4 bytes) + length (4 bytes) + 
buffer/event (1 byte). */
                private static final int HEADER_LENGTH = 9;
@@ -308,10 +308,10 @@ public class BufferSpiller {
                }
 
                /**
-                * Initializes the sequence for reading.
-                * This method needs to be called before the first call to 
{@link #getNext()}. Otherwise
-                * the results of {@link #getNext()} are not predictable.
+                * This method needs to be called before the first call to 
{@link #getNext()}.
+                * Otherwise the results of {@link #getNext()} are not 
predictable.
                 */
+               @Override
                public void open() {
                        if (!opened) {
                                opened = true;
@@ -320,13 +320,7 @@ public class BufferSpiller {
                        }
                }
 
-               /**
-                * Gets the next BufferOrEvent from the spilled sequence, or 
{@code null}, if the
-                * sequence is exhausted.
-                *
-                * @return The next BufferOrEvent from the spilled sequence, or 
{@code null} (end of sequence).
-                * @throws IOException Thrown, if the reads failed, of if the 
byte stream is corrupt.
-                */
+               @Override
                public BufferOrEvent getNext() throws IOException {
                        if (buffer.remaining() < HEADER_LENGTH) {
                                buffer.compact();
@@ -413,11 +407,7 @@ public class BufferSpiller {
                        }
                }
 
-               /**
-                * Cleans up all file resources held by this spilled sequence.
-                *
-                * @throws IOException Thrown, if file channel closing or file 
deletion fail.
-                */
+               @Override
                public void cleanup() throws IOException {
                        fileChannel.close();
                        if (!file.delete()) {
@@ -425,10 +415,8 @@ public class BufferSpiller {
                        }
                }
 
-               /**
-                * Gets the size of this spilled sequence.
-                */
-               public long size() throws IOException {
+               @Override
+               public long size() {
                        return size;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
new file mode 100644
index 0000000..f91e8cc
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+
+/**
+ * The cached buffer blocker takes the buffers and events from a data stream 
and adds them to a memory queue.
+ * After a number of elements have been cached, the blocker can "roll over": 
It presents the cached
+ * elements as a readable sequence, and creates a new memory queue.
+ *
+ * <p>This buffer blocker can be used in credit-based flow control for better 
barrier alignment in exactly-once mode.
+ */
+@Internal
+public class CachedBufferBlocker implements BufferBlocker {
+
+       /** The page size, to estimate the total cached data size. */
+       private final int pageSize;
+
+       /** The number of bytes cached since the last roll over. */
+       private long bytesBlocked;
+
+       /** The current memory queue for caching the buffers or events. */
+       private ArrayDeque<BufferOrEvent> currentBuffers;
+
+       /**
+        * Creates a new buffer blocker, caching the buffers or events in 
memory queue.
+        *
+        * @param pageSize The page size used to estimate the cached size.
+        */
+       public CachedBufferBlocker(int pageSize) {
+               this.pageSize = pageSize;
+               this.currentBuffers = new ArrayDeque<BufferOrEvent>();
+       }
+
+       @Override
+       public void add(BufferOrEvent boe) {
+               bytesBlocked += pageSize;
+
+               currentBuffers.add(boe);
+       }
+
+       /**
+        * It never reuses resources and defaults to {@link 
#rollOverWithoutReusingResources()}.
+        */
+       @Override
+       public BufferOrEventSequence rollOverReusingResources() {
+               return rollOverWithoutReusingResources();
+       }
+
+       @Override
+       public BufferOrEventSequence rollOverWithoutReusingResources() {
+               if (bytesBlocked == 0) {
+                       return null;
+               }
+
+               CachedBufferOrEventSequence currentSequence = new 
CachedBufferOrEventSequence(currentBuffers, bytesBlocked);
+               currentBuffers = new ArrayDeque<BufferOrEvent>();
+               bytesBlocked = 0L;
+
+               return currentSequence;
+       }
+
+       @Override
+       public void close() {
+               BufferOrEvent boe;
+               while ((boe = currentBuffers.poll()) != null) {
+                       if (boe.isBuffer()) {
+                               boe.getBuffer().recycleBuffer();
+                       }
+               }
+       }
+
+       @Override
+       public long getBytesBlocked() {
+               return bytesBlocked;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This class represents a sequence of cached buffers and events, 
created by the
+        * {@link CachedBufferBlocker}.
+        */
+       public static class CachedBufferOrEventSequence implements 
BufferOrEventSequence {
+
+               /** The sequence of buffers and events to be consumed by {@link 
BarrierBuffer}.*/
+               private final ArrayDeque<BufferOrEvent> queuedBuffers;
+
+               /** The total size of the cached data. */
+               private final long size;
+
+               /**
+                * Creates a reader that reads a sequence of buffers and events.
+                *
+                * @param size The total size of cached data.
+                */
+               CachedBufferOrEventSequence(ArrayDeque<BufferOrEvent> buffers, 
long size) {
+                       this.queuedBuffers = buffers;
+                       this.size = size;
+               }
+
+               @Override
+               public void open() {}
+
+               @Override
+               @Nullable
+               public BufferOrEvent getNext() {
+                       return queuedBuffers.poll();
+               }
+
+               @Override
+               public void cleanup() {
+                       BufferOrEvent boe;
+                       while ((boe = queuedBuffers.poll()) != null) {
+                               if (boe.isBuffer()) {
+                                       boe.getBuffer().recycleBuffer();
+                               }
+                       }
+               }
+
+               @Override
+               public long size() {
+                       return size;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
new file mode 100644
index 0000000..cb56eee
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.IOException;
+
+/**
+ * Utility for creating {@link CheckpointBarrierHandler} based on checkpoint mode
+ * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
+ *
+ * <p>{@link #createCheckpointBarrierHandler} selects the handler as follows:
+ * <ul>
+ *   <li>{@code EXACTLY_ONCE}: a {@link BarrierBuffer}, backed by a {@link CachedBufferBlocker}
+ *       when {@code TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED} is true, otherwise by a
+ *       {@link BufferSpiller}. The configured
+ *       {@code TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT} must be -1 or positive,
+ *       else an {@link IllegalConfigurationException} is thrown.</li>
+ *   <li>{@code AT_LEAST_ONCE}: a {@link BarrierTracker}.</li>
+ *   <li>any other mode: an {@link IllegalArgumentException} is thrown.</li>
+ * </ul>
+ *
+ * <p>If the given task is non-null it is registered on the handler via
+ * {@code registerCheckpointEventHandler} before the handler is returned.
+ */
+@Internal
+public class InputProcessorUtil {
+
+       public static CheckpointBarrierHandler createCheckpointBarrierHandler(
+                       StreamTask<?, ?> checkpointedTask,
+                       CheckpointingMode checkpointMode,
+                       IOManager ioManager,
+                       InputGate inputGate,
+                       Configuration taskManagerConfig) throws IOException {
+
+               CheckpointBarrierHandler barrierHandler;
+               if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
+                       long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+                       if (!(maxAlign == -1 || maxAlign > 0)) { // rejects 0 and every value below -1
+                               throw new IllegalConfigurationException(
+                                       
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+                                       + " must be positive or -1 (infinite)");
+                       }
+
+                       if 
(taskManagerConfig.getBoolean(TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED))
 {
+                               barrierHandler = new BarrierBuffer(inputGate, 
new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
+                       } else {
+                               barrierHandler = new BarrierBuffer(inputGate, 
new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
+                       }
+               } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
+                       barrierHandler = new BarrierTracker(inputGate);
+               } else {
+                       throw new IllegalArgumentException("Unrecognized 
Checkpointing Mode: " + checkpointMode);
+               }
+
+               if (checkpointedTask != null) { // a null task skips handler registration
+                       
barrierHandler.registerCheckpointEventHandler(checkpointedTask);
+               }
+
+               return barrierHandler;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index dc3dc5c..a9c64b5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -127,25 +125,8 @@ public class StreamInputProcessor<IN> {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
-               if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
-                       if (!(maxAlign == -1 || maxAlign > 0)) {
-                               throw new IllegalConfigurationException(
-                                               
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-                                               + " must be positive or -1 
(infinite)");
-                       }
-                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager, maxAlign);
-               }
-               else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-                       this.barrierHandler = new BarrierTracker(inputGate);
-               }
-               else {
-                       throw new IllegalArgumentException("Unrecognized 
Checkpointing Mode: " + checkpointMode);
-               }
-
-               if (checkpointedTask != null) {
-                       
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
-               }
+               this.barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
+                       checkpointedTask, checkpointMode, ioManager, inputGate, 
taskManagerConfig);
 
                this.lock = checkNotNull(lock);
 
@@ -157,7 +138,7 @@ public class StreamInputProcessor<IN> {
 
                for (int i = 0; i < recordDeserializers.length; i++) {
                        recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
-                                       
ioManager.getSpillingDirectoriesPaths());
+                               ioManager.getSpillingDirectoriesPaths());
                }
 
                this.numInputChannels = inputGate.getNumberOfInputChannels();

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 494a82a..ab4f90d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -146,25 +144,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
-               if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
-                       if (!(maxAlign == -1 || maxAlign > 0)) {
-                               throw new IllegalConfigurationException(
-                                               
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-                                                               + " must be 
positive or -1 (infinite)");
-                       }
-                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager, maxAlign);
-               }
-               else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-                       this.barrierHandler = new BarrierTracker(inputGate);
-               }
-               else {
-                       throw new IllegalArgumentException("Unrecognized 
CheckpointingMode: " + checkpointMode);
-               }
-
-               if (checkpointedTask != null) {
-                       
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
-               }
+               this.barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
+                       checkpointedTask, checkpointMode, ioManager, inputGate, 
taskManagerConfig);
 
                this.lock = checkNotNull(lock);
 
@@ -179,7 +160,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
                for (int i = 0; i < recordDeserializers.length; i++) {
                        recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
-                                       
ioManager.getSpillingDirectoriesPaths());
+                               ioManager.getSpillingDirectoriesPaths());
                }
 
                // determine which unioned channels belong to input 1 and which 
belong to input 2

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index c31d5ad..9f46ed7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -115,7 +115,7 @@ public class BarrierBufferAlignmentLimitTest {
 
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
-               BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 1000);
+               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 1000);
 
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
                buffer.registerCheckpointEventHandler(toNotify);
@@ -209,7 +209,7 @@ public class BarrierBufferAlignmentLimitTest {
 
                // the barrier buffer has a limit that only 500 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
-               BarrierBuffer buffer = new BarrierBuffer(gate, ioManager, 500);
+               BarrierBuffer buffer = new BarrierBuffer(gate, new 
BufferSpiller(ioManager, gate.getPageSize()), 500);
 
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
                buffer.registerCheckpointEventHandler(toNotify);

http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 39c41ef..6dd1e5e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -63,7 +63,7 @@ public class BarrierBufferMassiveRandomTest {
                                        new BufferPool[] { pool1, pool2 },
                                        new BarrierGenerator[] { new 
CountBarrier(100000), new RandomBarrier(100000) });
 
-                       BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, 
ioMan);
+                       BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, 
new BufferSpiller(ioMan, myIG.getPageSize()));
 
                        for (int i = 0; i < 2000000; i++) {
                                BufferOrEvent boe = 
barrierBuffer.getNextNonBlocked();

Reply via email to