[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in 
alignment.

If more data than the defined amount is buffered, the alignment is aborted and 
the checkpoint is canceled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0962cb6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0962cb6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0962cb6f

Branch: refs/heads/release-1.1
Commit: 0962cb6f45607fb21d50030e325e99fc2c37164a
Parents: 1a4fdff
Author: Stephan Ewen <[email protected]>
Authored: Thu Nov 3 15:28:15 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 19:07:16 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  15 +
 .../streaming/runtime/io/BarrierBuffer.java     |  48 ++-
 .../streaming/runtime/io/BufferSpiller.java     |  39 ++-
 .../runtime/io/StreamInputProcessor.java        |  17 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  17 +-
 .../runtime/tasks/OneInputStreamTask.java       |   3 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   3 +-
 .../io/BarrierBufferAlignmentLimitTest.java     | 315 +++++++++++++++++++
 8 files changed, 441 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d1ad1c4..d9ccb35 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -277,6 +277,14 @@ public final class ConfigConstants {
        @PublicEvolving
        public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = 
"task.cancellation.timeout";
 
+       /**
+        * The maximum number of bytes that a checkpoint alignment may buffer.
+        * If the checkpoint alignment buffers more than the configured amount 
of
+        * data, the checkpoint is aborted (skipped).
+        */
+       @PublicEvolving
+       public static final String TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = 
"task.checkpoint.alignment.max-size";
+
        // --------------------------- Runtime Algorithms 
-------------------------------
        
        /**
@@ -873,6 +881,13 @@ public final class ConfigConstants {
         */
        public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; 
// deactivated
 
+       /**
+        * The default for the maximum number of bytes that a checkpoint 
alignment may buffer.
+        * {@code -1} = infinite.
+        */
+       @PublicEvolving
+       public static final long DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT 
= -1L;
+
        // ------------------------ Runtime Algorithms ------------------------
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7a8e7d3..c4cf98e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs 
with barriers until
  * all inputs have received the barrier for a given checkpoint.
@@ -65,6 +68,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         * further data from the input gate. */
        private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> 
queuedBuffered;
 
+       /** The maximum number of bytes that may be buffered before an 
alignment is aborted. -1 means unlimited. */
+       private final long maxBufferedBytes;
+
        /** The sequence of buffers/events that has been unblocked and must now 
be consumed
         * before requesting further data from the input gate */
        private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
@@ -82,6 +88,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        /** The number of already closed channels */
        private int numClosedChannels;
 
+       /** The number of bytes in the queued spilled sequences */
+       private long numQueuedBytes;
+
        /** The timestamp as in {@link System#nanoTime()} at which the last 
alignment started */
        private long startOfAlignmentTimestamp;
 
@@ -92,14 +101,37 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private boolean endOfStream;
 
        /**
+        * Creates a new checkpoint stream aligner.
+        * 
+        * <p>There is no limit to how much data may be buffered during an 
alignment.
         * 
         * @param inputGate The input gate to draw the buffers and events from.
         * @param ioManager The I/O manager that gives access to the temp 
directories.
-        * 
+        *
         * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
         */
        public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws 
IOException {
+               this (inputGate, ioManager, -1);
+       }
+
+       /**
+        * Creates a new checkpoint stream aligner.
+        * 
+        * <p>The aligner will allow only alignments that buffer up to the 
given number of bytes.
+        * When that number is exceeded, it will stop the alignment and notify 
the task that the
+        * checkpoint has been cancelled.
+        * 
+        * @param inputGate The input gate to draw the buffers and events from.
+        * @param ioManager The I/O manager that gives access to the temp 
directories.
+        * @param maxBufferedBytes The maximum bytes to be buffered before the 
checkpoint aborts.
+        * 
+        * @throws IOException Thrown, when the spilling to temp files cannot 
be initialized.
+        */
+       public BarrierBuffer(InputGate inputGate, IOManager ioManager, long 
maxBufferedBytes) throws IOException {
+               checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
                this.inputGate = inputGate;
+               this.maxBufferedBytes = maxBufferedBytes;
                this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
                this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
 
@@ -131,6 +163,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                if (isBlocked(next.getChannelIndex())) {
                                        // if the channel is blocked we, we 
just store the BufferOrEvent
                                        bufferSpiller.add(next);
+                                       checkSizeLimit();
                                }
                                else if (next.isBuffer()) {
                                        return next;
@@ -169,6 +202,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                currentBuffered = queuedBuffered.pollFirst();
                if (currentBuffered != null) {
                        currentBuffered.open();
+                       numQueuedBytes -= currentBuffered.size();
                }
        }
 
@@ -333,6 +367,16 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                }
        }
 
+       private void checkSizeLimit() throws Exception {
+               if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferSpiller.getBytesWritten()) > maxBufferedBytes) {
+                       // exceeded our limit - abort this checkpoint
+                       LOG.info("Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded",
+                                       currentCheckpointId, maxBufferedBytes);
+
+                       releaseBlocksAndResetBarriers();
+                       notifyAbort(currentCheckpointId, new 
AlignmentLimitExceededException(maxBufferedBytes));
+               }
+       }
 
        @Override
        public void registerCheckpointEventHandler(StatefulTask<?> 
toNotifyOnCheckpoint) {
@@ -359,6 +403,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        seq.cleanup();
                }
                queuedBuffered.clear();
+               numQueuedBytes = 0L;
        }
 
        private void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
@@ -429,6 +474,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (bufferedNow != null) {
                                bufferedNow.open();
                                queuedBuffered.addFirst(currentBuffered);
+                               numQueuedBytes += currentBuffered.size();
                                currentBuffered = bufferedNow;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index dc8d245..8060d02 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -84,9 +84,9 @@ public class BufferSpiller {
        
        /** A counter, to created numbered spill files */
        private int fileCounter;
-       
-       /** A flag to check whether the spiller has written since the last roll 
over */
-       private boolean hasWritten;
+
+       /** The number of bytes written since the last roll over */
+       private long bytesWritten;
        
        /**
         * Creates a new buffer spiller, spilling to one of the I/O manager's 
temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
         * @throws IOException Thrown, if the buffer of event could not be 
spilled.
         */
        public void add(BufferOrEvent boe) throws IOException {
-               hasWritten = true;
                try {
                        ByteBuffer contents;
                        if (boe.isBuffer()) {
@@ -140,7 +139,9 @@ public class BufferSpiller {
                        headBuffer.putInt(contents.remaining());
                        headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
                        headBuffer.flip();
-                       
+
+                       bytesWritten += (headBuffer.remaining() + 
contents.remaining());
+
                        sources[1] = contents;
                        currentChannel.write(sources);
                }
@@ -186,7 +187,7 @@ public class BufferSpiller {
        }
        
        private SpilledBufferOrEventSequence rollOverInternal(boolean 
newBuffer) throws IOException {
-               if (!hasWritten) {
+               if (bytesWritten == 0) {
                        return null;
                }
                
@@ -205,8 +206,8 @@ public class BufferSpiller {
                
                // create ourselves a new spill file
                createSpillingChannel();
-               
-               hasWritten = false;
+
+               bytesWritten = 0L;
                return seq;
        }
 
@@ -225,6 +226,14 @@ public class BufferSpiller {
                }
        }
 
+       /**
+        * Gets the number of bytes written in the current spill file.
+        * @return the number of bytes written in the current spill file
+        */
+       public long getBytesWritten() {
+               return bytesWritten;
+       }
+
        // 
------------------------------------------------------------------------
        //  For testing
        // 
------------------------------------------------------------------------
@@ -268,6 +277,9 @@ public class BufferSpiller {
                /** The byte buffer for bulk reading */
                private final ByteBuffer buffer;
 
+               /** The size of the spill file, captured once at construction. It must never change, 
because the barrier buffer adds it to and subtracts it from its queued-byte count. */
+               private final long size;
+
                /** The page size to instantiate properly sized memory segments 
*/
                private final int pageSize;
 
@@ -282,11 +294,13 @@ public class BufferSpiller {
                 * @param buffer The buffer used for bulk reading.
                 * @param pageSize The page size to use for the created memory 
segments.
                 */
-               SpilledBufferOrEventSequence(File file, FileChannel 
fileChannel, ByteBuffer buffer, int pageSize) {
+               SpilledBufferOrEventSequence(File file, FileChannel 
fileChannel, ByteBuffer buffer, int pageSize)
+                               throws IOException {
                        this.file = file;
                        this.fileChannel = fileChannel;
                        this.buffer = buffer;
                        this.pageSize = pageSize;
+                       this.size = fileChannel.size();
                }
 
                /**
@@ -408,5 +422,12 @@ public class BufferSpiller {
                                throw new IOException("Cannot remove temp file 
for stream alignment writer");
                        }
                }
+
+               /**
+                * Gets the size of this spilled sequence.
+                */
+               public long size() throws IOException {
+                       return size;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 7d9e4d2..bcca2bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -87,12 +90,22 @@ public class StreamInputProcessor<IN> {
                                                                StatefulTask<?> 
checkpointListener,
                                                                
CheckpointingMode checkpointMode,
                                                                IOManager 
ioManager,
-                                                               boolean 
enableWatermarkMultiplexing) throws IOException {
+                                                               boolean 
enableWatermarkMultiplexing,
+                                                               Configuration 
taskManagerConfig) throws IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager);
+                       long maxAlign = taskManagerConfig.getLong(
+                                       
ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+                                       
ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+                       if (!(maxAlign == -1 || maxAlign > 0)) {
+                               throw new IllegalConfigurationException(
+                                               
ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+                                               + " must be positive or -1 
(infinite)");
+                       }
+                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager, maxAlign);
                }
                else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
                        this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a3ae077..f116aff 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -97,12 +100,22 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        StatefulTask<?> checkpointListener,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
-                       boolean enableWatermarkMultiplexing) throws IOException 
{
+                       boolean enableWatermarkMultiplexing,
+                       Configuration taskManagerConfig) throws IOException {
                
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager);
+                       long maxAlign = taskManagerConfig.getLong(
+                                       
ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+                                       
ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+                       if (!(maxAlign == -1 || maxAlign > 0)) {
+                               throw new IllegalConfigurationException(
+                                               
ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+                                                               + " must be 
positive or -1 (infinite)");
+                       }
+                       this.barrierHandler = new BarrierBuffer(inputGate, 
ioManager, maxAlign);
                }
                else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
                        this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d18ca16..8470c7c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,8 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                                        this,
                                        configuration.getCheckpointMode(),
                                        getEnvironment().getIOManager(),
-                                       isSerializingTimestamps());
+                                       isSerializingTimestamps(),
+                                       
getEnvironment().getTaskManagerInfo().getConfiguration());
 
                        // make sure that stream tasks report their I/O 
statistics
                        AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..8718b88 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                                this,
                                configuration.getCheckpointMode(),
                                getEnvironment().getIOManager(),
-                               isSerializingTimestamps());
+                               isSerializingTimestamps(),
+                               
getEnvironment().getTaskManagerInfo().getConfiguration());
 
                // make sure that stream tasks report their I/O statistics
                AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
new file mode 100644
index 0000000..529f809
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ */
+public class BarrierBufferAlignmentLimitTest {
+
+       private static final int PAGE_SIZE = 512;
+
+       private static final Random RND = new Random();
+
+       private static IOManager IO_MANAGER;
+
+       // 
------------------------------------------------------------------------
+       //  Setup
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void setup() {
+               IO_MANAGER = new IOManagerAsync();
+       }
+
+       @AfterClass
+       public static void shutdownIOManager() {
+               IO_MANAGER.shutdown();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This tests that a single alignment that buffers too much data cancels the checkpoint.
+        */
+       @Test
+       public void testBreakCheckpointAtAlignmentLimit() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some initial buffers
+                               /*  0 */ createBuffer(1, 100), createBuffer(2, 
70),
+                               /*  2 */ createBuffer(0, 42), createBuffer(2, 
111),
+
+                               // starting a checkpoint
+                               /*  4 */ createBarrier(7, 1), 
+                               /*  5 */ createBuffer(1, 100), createBuffer(2, 
200), createBuffer(1, 300), createBuffer(0, 50),
+                               /*  9 */ createBarrier(7, 0),
+                               /* 10 */ createBuffer(2, 100), createBuffer(0, 
100), createBuffer(1, 200), createBuffer(0, 200),
+
+                               // this buffer makes the alignment spill too 
large
+                               /* 14 */ createBuffer(0, 101),
+
+                               // additional data
+                               /* 15 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100),
+                               
+                               // checkpoint completes - this should not 
result in a "completion notification"
+                               /* 18 */ createBarrier(7, 2),
+
+                               // trailing buffers
+                               /* 19 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100)
+               };
+
+               // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 
1000);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               // validating the sequence of buffers
+
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[1], buffer.getNextNonBlocked());
+               check(sequence[2], buffer.getNextNonBlocked());
+               check(sequence[3], buffer.getNextNonBlocked());
+
+               // start of checkpoint
+               long startTs = System.nanoTime();
+               check(sequence[6], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[10], buffer.getNextNonBlocked());
+
+               // trying to pull the next makes the alignment overflow - so 
buffered buffers are replayed
+               check(sequence[5], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), 
any(AlignmentLimitExceededException.class));
+
+               // playing back buffered events
+               check(sequence[7], buffer.getNextNonBlocked());
+               check(sequence[11], buffer.getNextNonBlocked());
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               // the additional data
+               check(sequence[15], buffer.getNextNonBlocked());
+               check(sequence[16], buffer.getNextNonBlocked());
+               check(sequence[17], buffer.getNextNonBlocked());
+
+               check(sequence[19], buffer.getNextNonBlocked());
+               check(sequence[20], buffer.getNextNonBlocked());
+               check(sequence[21], buffer.getNextNonBlocked());
+
+               // no call for a completed checkpoint must have happened
+               verify(toNotify, 
times(0)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * This tests the following case:
+        *   - an alignment starts
+        *   - barriers from a second checkpoint queue before the first 
completes
+        *   - together they are larger than the threshold
+        *   - after the first checkpoint (with second checkpoint data queued) 
aborts, the second completes 
+        */
+       @Test
+       public void testAlignmentLimitWithQueuedAlignments() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some initial buffers
+                               /*  0 */ createBuffer(1, 100), createBuffer(2, 
70),
+
+                               // starting a checkpoint
+                               /*  2 */ createBarrier(3, 2), 
+                               /*  3 */ createBuffer(1, 100), createBuffer(2, 
100), 
+                               /*  5 */ createBarrier(3, 0),
+                               /*  6 */ createBuffer(0, 100), createBuffer(1, 
100),
+
+                               // queue some data from the next checkpoint
+                               /*  8 */ createBarrier(4, 0),
+                               /*  9 */ createBuffer(0, 100), createBuffer(0, 
120), createBuffer(1, 100),
+
+                               // this one makes the alignment overflow
+                               /* 12 */ createBuffer(2, 100),
+
+                               // checkpoint completed
+                               /* 13 */ createBarrier(3, 1),
+
+                               // more for the next checkpoint
+                               /* 14 */ createBarrier(4, 1),
+                               /* 15 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100),
+
+                               // next checkpoint completes
+                               /* 18 */ createBarrier(4, 2),
+
+                               // trailing data
+                               /* 19 */ createBuffer(0, 100), createBuffer(1, 
100), createBuffer(2, 100)
+               };
+
+               // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               // validating the sequence of buffers
+               long startTs;
+
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[1], buffer.getNextNonBlocked());
+
+               // start of checkpoint
+               startTs = System.nanoTime();
+               check(sequence[3], buffer.getNextNonBlocked());
+               check(sequence[7], buffer.getNextNonBlocked());
+
+               // next checkpoint also in progress
+               check(sequence[11], buffer.getNextNonBlocked());
+
+               // checkpoint alignment aborted due to too much data
+               check(sequence[4], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), 
any(AlignmentLimitExceededException.class));
+
+               // replay buffered data - in the middle, the alignment for 
checkpoint 4 starts
+               check(sequence[6], buffer.getNextNonBlocked());
+               startTs = System.nanoTime();
+               check(sequence[12], buffer.getNextNonBlocked());
+
+               // only checkpoint 4 is pending now - the last checkpoint 3 
barrier will not trigger success 
+               check(sequence[17], buffer.getNextNonBlocked());
+
+               // checkpoint 4 completed - check and validate buffered replay
+               check(sequence[9], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(4L), 
anyLong());
+
+               check(sequence[10], buffer.getNextNonBlocked());
+               check(sequence[15], buffer.getNextNonBlocked());
+               check(sequence[16], buffer.getNextNonBlocked());
+
+               // trailing data
+               check(sequence[19], buffer.getNextNonBlocked());
+               check(sequence[20], buffer.getNextNonBlocked());
+               check(sequence[21], buffer.getNextNonBlocked());
+
+               // only checkpoint 4 was successfully completed, not checkpoint 
3
+               verify(toNotify, times(0)).triggerCheckpointOnBarrier(eq(3L), 
anyLong());
+
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
// ------------------------------------------------------------------------
//  Utilities
// ------------------------------------------------------------------------
+
+       private static BufferOrEvent createBuffer(int channel, int size) {
+               byte[] bytes = new byte[size];
+               RND.nextBytes(bytes);
+
+               MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+               memory.put(0, bytes);
+
+               Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+               buf.setSize(size);
+
+               // retain an additional time so it does not get disposed after 
being read by the input gate
+               buf.retain();
+
+               return new BufferOrEvent(buf, channel);
+       }
+
+       private static BufferOrEvent createBarrier(long id, int channel) {
+               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
+       }
+
+       private static void check(BufferOrEvent expected, BufferOrEvent 
present) {
+               assertNotNull(expected);
+               assertNotNull(present);
+               assertEquals(expected.isBuffer(), present.isBuffer());
+
+               if (expected.isBuffer()) {
+                       assertEquals(expected.getBuffer().getSize(), 
present.getBuffer().getSize());
+                       MemorySegment expectedMem = 
expected.getBuffer().getMemorySegment();
+                       MemorySegment presentMem = 
present.getBuffer().getMemorySegment();
+                       assertTrue("memory contents differs", 
expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
+               }
+               else {
+                       assertEquals(expected.getEvent(), present.getEvent());
+               }
+       }
+
+       private static void validateAlignmentTime(long startTimestamp, 
BarrierBuffer buffer) {
+               final long elapsed = System.nanoTime() - startTimestamp;
+               assertTrue("wrong alignment time", 
buffer.getAlignmentDurationNanos() <= elapsed);
+       }
+
+       private static void checkNoTempFilesRemain() {
+               // validate that all temp files have been removed
+               for (File dir : IO_MANAGER.getSpillingDirectories()) {
+                       for (String file : dir.list()) {
+                               if (file != null && !(file.equals(".") || 
file.equals(".."))) {
+                                       fail("barrier buffer did not clean up 
temp files. remaining file: " + file);
+                               }
+                       }
+               }
+       }
+}

Reply via email to