[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.

If more data than the configured amount is buffered, the alignment is aborted and
the checkpoint is canceled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07ab9f45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07ab9f45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07ab9f45

Branch: refs/heads/master
Commit: 07ab9f45341f8c49354d9357d9459ee2199b4e1d
Parents: 48a4813
Author: Stephan Ewen <[email protected]>
Authored: Thu Nov 3 15:28:15 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 21:15:34 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |  14 +-
 .../AlignmentLimitExceededException.java        |  33 ++
 .../streaming/runtime/io/BarrierBuffer.java     |  48 ++-
 .../streaming/runtime/io/BufferSpiller.java     |  14 +-
 .../runtime/io/StreamInputProcessor.java        |  14 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  16 +-
 .../runtime/tasks/OneInputStreamTask.java       |   6 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   6 +-
 .../io/BarrierBufferAlignmentLimitTest.java     | 343 +++++++++++++++++++
 9 files changed, 481 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 0f60a4d..e5d36aa 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -56,9 +56,19 @@ public class TaskManagerOptions {
                        key("task.cancellation.timeout")
                                        .defaultValue(180000L);
 
+       /**
+        * The maximum number of bytes that a checkpoint alignment may buffer.
+        * If the checkpoint alignment buffers more than the configured amount of
+        * data, the checkpoint is aborted (skipped).
+        * 
+        * <p>The default value of {@code -1} indicates that there is no limit.
+        */
+       public static final ConfigOption<Long> 
TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
+                       key("task.checkpoint.alignment.max-size")
+                       .defaultValue(-1L);
+       
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated */
-       private TaskManagerOptions() {
-       }
+       private TaskManagerOptions() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
new file mode 100644
index 0000000..64d57bc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because too many bytes were
+ * buffered in the alignment phase.
+ */
+public final class AlignmentLimitExceededException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public AlignmentLimitExceededException(long numBytes) {
+               super("The checkpoint alignment phase needed to buffer more 
than the configured maximum ("
+                               + numBytes + " bytes).");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 66aaa44..0ad9905 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
@@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs 
with barriers until
  * all inputs have received the barrier for a given checkpoint.
@@ -66,6 +69,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * further data from the input gate. */
        private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> 
queuedBuffered;
 
+       /** The maximum number of bytes that may be buffered before an alignment is aborted. -1 means unlimited. */
+       private final long maxBufferedBytes;
+
        /** The sequence of buffers/events that has been unblocked and must now 
be consumed
         * before requesting further data from the input gate */
        private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
@@ -83,6 +89,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        /** The number of already closed channels */
        private int numClosedChannels;
 
+       /** The number of bytes in the queued spilled sequences */
+       private long numQueuedBytes;
+
        /** The timestamp as in {@link System#nanoTime()} at which the last 
alignment started */
        private long startOfAlignmentTimestamp;
 
@@ -93,14 +102,37 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private boolean endOfStream;
 
        /**
+        * Creates a new checkpoint stream aligner.
+        * 
+        * <p>There is no limit to how much data may be buffered during an 
alignment.
         * 
         * @param inputGate The input gate to draw the buffers and events from.
         * @param ioManager The I/O manager that gives access to the temp 
directories.
-        * 
+        *
         * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
         */
        public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws 
IOException {
+               this (inputGate, ioManager, -1);
+       }
+
+       /**
+        * Creates a new checkpoint stream aligner.
+        * 
+        * <p>The aligner will allow only alignments that buffer up to the 
given number of bytes.
+        * When that number is exceeded, it will stop the alignment and notify 
the task that the
+        * checkpoint has been cancelled.
+        * 
+        * @param inputGate The input gate to draw the buffers and events from.
+        * @param ioManager The I/O manager that gives access to the temp 
directories.
+        * @param maxBufferedBytes The maximum bytes to be buffered before the 
checkpoint aborts.
+        * 
+        * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
+        */
+       public BarrierBuffer(InputGate inputGate, IOManager ioManager, long 
maxBufferedBytes) throws IOException {
+               checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
                this.inputGate = inputGate;
+               this.maxBufferedBytes = maxBufferedBytes;
                this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
                this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
 
@@ -132,6 +164,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                if (isBlocked(next.getChannelIndex())) {
                                        // if the channel is blocked we, we 
just store the BufferOrEvent
                                        bufferSpiller.add(next);
+                                       checkSizeLimit();
                                }
                                else if (next.isBuffer()) {
                                        return next;
@@ -170,6 +203,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                currentBuffered = queuedBuffered.pollFirst();
                if (currentBuffered != null) {
                        currentBuffered.open();
+                       numQueuedBytes -= currentBuffered.size();
                }
        }
 
@@ -340,6 +374,16 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                }
        }
 
+       private void checkSizeLimit() throws Exception {
+               if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferSpiller.getBytesWritten()) > maxBufferedBytes) {
+                       // exceeded our limit - abort this checkpoint
+                       LOG.info("Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded",
+                                       currentCheckpointId, maxBufferedBytes);
+
+                       releaseBlocksAndResetBarriers();
+                       notifyAbort(currentCheckpointId, new 
AlignmentLimitExceededException(maxBufferedBytes));
+               }
+       }
 
        @Override
        public void registerCheckpointEventHandler(StatefulTask 
toNotifyOnCheckpoint) {
@@ -366,6 +410,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        seq.cleanup();
                }
                queuedBuffered.clear();
+               numQueuedBytes = 0L;
        }
 
        private void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
@@ -436,6 +481,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (bufferedNow != null) {
                                bufferedNow.open();
                                queuedBuffered.addFirst(currentBuffered);
+                               numQueuedBytes += currentBuffered.size();
                                currentBuffered = bufferedNow;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 45a330b..5133351 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -277,6 +277,9 @@ public class BufferSpiller {
                /** The byte buffer for bulk reading */
                private final ByteBuffer buffer;
 
+               /** The size of the spill file, stored as a constant because it is crucial that it never changes */
+               private final long size;
+
                /** The page size to instantiate properly sized memory segments 
*/
                private final int pageSize;
 
@@ -291,11 +294,13 @@ public class BufferSpiller {
                 * @param buffer The buffer used for bulk reading.
                 * @param pageSize The page size to use for the created memory 
segments.
                 */
-               SpilledBufferOrEventSequence(File file, FileChannel 
fileChannel, ByteBuffer buffer, int pageSize) {
+               SpilledBufferOrEventSequence(File file, FileChannel 
fileChannel, ByteBuffer buffer, int pageSize)
+                               throws IOException {
                        this.file = file;
                        this.fileChannel = fileChannel;
                        this.buffer = buffer;
                        this.pageSize = pageSize;
+                       this.size = fileChannel.size();
                }
 
                /**
@@ -417,5 +422,12 @@ public class BufferSpiller {
                                throw new IOException("Cannot remove temp file 
for stream alignment writer");
                        }
                }
+
+               /**
+                * Gets the size of this spilled sequence.
+                */
+               public long size() throws IOException {
+                       return size;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 714317d..b3257a5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -87,12 +90,19 @@ public class StreamInputProcessor<IN> {
                        TypeSerializer<IN> inputSerializer,
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
-                       IOManager ioManager) throws IOException {
+                       IOManager ioManager,
+                       Configuration taskManagerConfig) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager);
+                       long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+                       if (!(maxAlign == -1 || maxAlign > 0)) {
+                               throw new IllegalConfigurationException(
+                                               
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+                                               + " must be positive or -1 
(infinite)");
+                       }
+                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager, maxAlign);
                }
                else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
                        this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 5f7ffe4..e5aeec1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
@@ -93,12 +96,19 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        TypeSerializer<IN2> inputSerializer2,
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
-                       IOManager ioManager) throws IOException {
-               
+                       IOManager ioManager,
+                       Configuration taskManagerConfig) throws IOException {
+
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager);
+                       long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+                       if (!(maxAlign == -1 || maxAlign > 0)) {
+                               throw new IllegalConfigurationException(
+                                               
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+                                                               + " must be 
positive or -1 (infinite)");
+                       }
+                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager, maxAlign);
                }
                else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
                        this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 68d3064..0f41103 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -41,10 +41,12 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
 
                if (numberOfInputs > 0) {
                        InputGate[] inputGates = 
getEnvironment().getAllInputGates();
-                       inputProcessor = new 
StreamInputProcessor<IN>(inputGates, inSerializer,
+                       inputProcessor = new StreamInputProcessor<IN>(
+                                       inputGates, inSerializer,
                                        this, 
                                        configuration.getCheckpointMode(),
-                                       getEnvironment().getIOManager());
+                                       getEnvironment().getIOManager(),
+                                       
getEnvironment().getTaskManagerInfo().getConfiguration());
 
                        // make sure that stream tasks report their I/O 
statistics
                        
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 978c9f2..545b95b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -65,11 +65,13 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                        }
                }
        
-               this.inputProcessor = new StreamTwoInputProcessor<IN1, 
IN2>(inputList1, inputList2,
+               this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(
+                               inputList1, inputList2,
                                inputDeserializer1, inputDeserializer2,
                                this,
                                configuration.getCheckpointMode(),
-                               getEnvironment().getIOManager());
+                               getEnvironment().getIOManager(),
+                               
getEnvironment().getTaskManagerInfo().getConfiguration());
 
                // make sure that stream tasks report their I/O statistics
                
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());

http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
new file mode 100644
index 0000000..3e618ef
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ */
+public class BarrierBufferAlignmentLimitTest {
+
+       private static final int PAGE_SIZE = 512;
+
+       private static final Random RND = new Random();
+
+       private static IOManager IO_MANAGER;
+
+       // 
------------------------------------------------------------------------
+       //  Setup
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void setup() {
+               IO_MANAGER = new IOManagerAsync();
+       }
+
+       @AfterClass
+       public static void shutdownIOManager() {
+               IO_MANAGER.shutdown();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This tests that a single alignment that buffers too much data cancels the checkpoint.
+        */
+       @Test
+       public void testBreakCheckpointAtAlignmentLimit() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some initial buffers
+                               /*  0 */ createBuffer(1, 100), createBuffer(2, 
70),
+                               /*  2 */ createBuffer(0, 42), createBuffer(2, 
111),
+
+                               // starting a checkpoint
+                               /*  4 */ createBarrier(7, 1), 
+                               /*  5 */ createBuffer(1, 100), createBuffer(2, 
200), createBuffer(1, 300), createBuffer(0, 50),
+                               /*  9 */ createBarrier(7, 0),
+                               /* 10 */ createBuffer(2, 100), createBuffer(0, 
100), createBuffer(1, 200), createBuffer(0, 200),
+
+                               // this buffer makes the alignment spill too large
+                               /* 14 */ createBuffer(0, 101),
+
+                               // additional data
+                               /* 15 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100),
+                               
+                               // checkpoint completes - this should not result in a "completion notification"
+                               /* 18 */ createBarrier(7, 2),
+
+                               // trailing buffers
+                               /* 19 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100)
+               };
+
+               // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 
1000);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               // validating the sequence of buffers
+
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[1], buffer.getNextNonBlocked());
+               check(sequence[2], buffer.getNextNonBlocked());
+               check(sequence[3], buffer.getNextNonBlocked());
+
+               // start of checkpoint
+               long startTs = System.nanoTime();
+               check(sequence[6], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[10], buffer.getNextNonBlocked());
+
+               // trying to pull the next makes the alignment overflow - so buffered buffers are replayed
+               check(sequence[5], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), 
any(AlignmentLimitExceededException.class));
+
+               // playing back buffered events
+               check(sequence[7], buffer.getNextNonBlocked());
+               check(sequence[11], buffer.getNextNonBlocked());
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               // the additional data
+               check(sequence[15], buffer.getNextNonBlocked());
+               check(sequence[16], buffer.getNextNonBlocked());
+               check(sequence[17], buffer.getNextNonBlocked());
+
+               check(sequence[19], buffer.getNextNonBlocked());
+               check(sequence[20], buffer.getNextNonBlocked());
+               check(sequence[21], buffer.getNextNonBlocked());
+
+               // no call for a completed checkpoint must have happened
+               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * This tests the following case:
+        *   - an alignment starts
+        *   - barriers from a second checkpoint queue before the first completes
+        *   - together they are larger than the threshold
+        *   - after the first checkpoint (with second checkpoint data queued) aborts, the second completes
+        */
+       @Test
+       public void testAlignmentLimitWithQueuedAlignments() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some initial buffers
+                               /*  0 */ createBuffer(1, 100), createBuffer(2, 
70),
+
+                               // starting a checkpoint
+                               /*  2 */ createBarrier(3, 2), 
+                               /*  3 */ createBuffer(1, 100), createBuffer(2, 
100), 
+                               /*  5 */ createBarrier(3, 0),
+                               /*  6 */ createBuffer(0, 100), createBuffer(1, 
100),
+
+                               // queue some data from the next checkpoint
+                               /*  8 */ createBarrier(4, 0),
+                               /*  9 */ createBuffer(0, 100), createBuffer(0, 
120), createBuffer(1, 100),
+
+                               // this one makes the alignment overflow
+                               /* 12 */ createBuffer(2, 100),
+
+                               // checkpoint completed
+                               /* 13 */ createBarrier(3, 1),
+
+                               // more for the next checkpoint
+                               /* 14 */ createBarrier(4, 1),
+                               /* 15 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100),
+
+                               // next checkpoint completes
+                               /* 18 */ createBarrier(4, 2),
+
+                               // trailing data
+                               /* 19 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100)
+               };
+
+               // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               // validating the sequence of buffers
+               long startTs;
+
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[1], buffer.getNextNonBlocked());
+
+               // start of checkpoint
+               startTs = System.nanoTime();
+               check(sequence[3], buffer.getNextNonBlocked());
+               check(sequence[7], buffer.getNextNonBlocked());
+
+               // next checkpoint also in progress
+               check(sequence[11], buffer.getNextNonBlocked());
+
+               // checkpoint alignment aborted due to too much data
+               check(sequence[4], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), 
any(AlignmentLimitExceededException.class));
+
+               // replay buffered data - in the middle, the alignment for 
checkpoint 4 starts
+               check(sequence[6], buffer.getNextNonBlocked());
+               startTs = System.nanoTime();
+               check(sequence[12], buffer.getNextNonBlocked());
+
+               // only checkpoint 4 is pending now - the last checkpoint 3 
barrier will not trigger success 
+               check(sequence[17], buffer.getNextNonBlocked());
+
+               // checkpoint 4 completed - check and validate buffered replay
+               check(sequence[9], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)));
+
+               check(sequence[10], buffer.getNextNonBlocked());
+               check(sequence[15], buffer.getNextNonBlocked());
+               check(sequence[16], buffer.getNextNonBlocked());
+
+               // trailing data
+               check(sequence[19], buffer.getNextNonBlocked());
+               check(sequence[20], buffer.getNextNonBlocked());
+               check(sequence[21], buffer.getNextNonBlocked());
+
+               // only checkpoint 4 was successfully completed, not checkpoint 
3
+               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       private static BufferOrEvent createBuffer(int channel, int size) {
+               byte[] bytes = new byte[size];
+               RND.nextBytes(bytes);
+
+               MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+               memory.put(0, bytes);
+
+               Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+               buf.setSize(size);
+
+               // retain an additional time so it does not get disposed after 
being read by the input gate
+               buf.retain();
+
+               return new BufferOrEvent(buf, channel);
+       }
+
+       private static BufferOrEvent createBarrier(long id, int channel) {
+               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
+       }
+
+       private static void check(BufferOrEvent expected, BufferOrEvent 
present) {
+               assertNotNull(expected);
+               assertNotNull(present);
+               assertEquals(expected.isBuffer(), present.isBuffer());
+
+               if (expected.isBuffer()) {
+                       assertEquals(expected.getBuffer().getSize(), 
present.getBuffer().getSize());
+                       MemorySegment expectedMem = 
expected.getBuffer().getMemorySegment();
+                       MemorySegment presentMem = 
present.getBuffer().getMemorySegment();
+                       assertTrue("memory contents differs", 
expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
+               }
+               else {
+                       assertEquals(expected.getEvent(), present.getEvent());
+               }
+       }
+
+       /**
+        * Asserts that the alignment duration reported by the barrier buffer does not exceed the
+        * time elapsed since {@code startTimestamp} (a {@code System.nanoTime()} value).
+        */
+       private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+               final long upperBound = System.nanoTime() - startTimestamp;
+               final long reported = buffer.getAlignmentDurationNanos();
+               assertTrue("wrong alignment time", reported <= upperBound);
+       }
+
+       private static void checkNoTempFilesRemain() {
+               // validate that all temp files have been removed
+               for (File dir : IO_MANAGER.getSpillingDirectories()) {
+                       for (String file : dir.list()) {
+                               if (file != null && !(file.equals(".") || 
file.equals(".."))) {
+                                       fail("barrier buffer did not clean up 
temp files. remaining file: " + file);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * A Hamcrest matcher accepting exactly a {@link CheckpointMetaData} instance (not a subclass)
+        * whose checkpoint ID equals the expected one.
+        */
+       private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> {
+
+               /** The checkpoint ID a matching metadata object must carry. */
+               private final long checkpointId;
+
+               CheckpointMatcher(long checkpointId) {
+                       this.checkpointId = checkpointId;
+               }
+
+               @Override
+               public boolean matches(Object item) {
+                       if (item == null || item.getClass() != CheckpointMetaData.class) {
+                               return false;
+                       }
+                       final CheckpointMetaData metaData = (CheckpointMetaData) item;
+                       return metaData.getCheckpointId() == checkpointId;
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendText("CheckpointMetaData - id = " + checkpointId);
+               }
+       }
+}

Reply via email to