[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.
If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07ab9f45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07ab9f45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07ab9f45 Branch: refs/heads/master Commit: 07ab9f45341f8c49354d9357d9459ee2199b4e1d Parents: 48a4813 Author: Stephan Ewen <[email protected]> Authored: Thu Nov 3 15:28:15 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 21:15:34 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/TaskManagerOptions.java | 14 +- .../AlignmentLimitExceededException.java | 33 ++ .../streaming/runtime/io/BarrierBuffer.java | 48 ++- .../streaming/runtime/io/BufferSpiller.java | 14 +- .../runtime/io/StreamInputProcessor.java | 14 +- .../runtime/io/StreamTwoInputProcessor.java | 16 +- .../runtime/tasks/OneInputStreamTask.java | 6 +- .../runtime/tasks/TwoInputStreamTask.java | 6 +- .../io/BarrierBufferAlignmentLimitTest.java | 343 +++++++++++++++++++ 9 files changed, 481 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 0f60a4d..e5d36aa 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -56,9 +56,19 @@ public class TaskManagerOptions { key("task.cancellation.timeout") .defaultValue(180000L); + /** + * 
The maximum number of bytes that a checkpoint alignment may buffer. + * If the checkpoint alignment buffers more than the configured amount of + * data, the checkpoint is aborted (skipped). + * + * <p>The default value of {@code -1} indicates that there is no limit. + */ + public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = + key("task.checkpoint.alignment.max-size") + .defaultValue(-1L); + // ------------------------------------------------------------------------ /** Not intended to be instantiated */ - private TaskManagerOptions() { - } + private TaskManagerOptions() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java new file mode 100644 index 0000000..64d57bc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because too many bytes were + * buffered in the alignment phase. + */ +public final class AlignmentLimitExceededException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public AlignmentLimitExceededException(long numBytes) { + super("The checkpoint alignment phase needed to buffer more than the configured maximum (" + + numBytes + " bytes)."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 66aaa44..0ad9905 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; @@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayDeque; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with 
barriers until * all inputs have received the barrier for a given checkpoint. @@ -66,6 +69,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * further data from the input gate. */ private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered; + /** The maximum number of bytes that may be buffered before an alignment is broken. -1 means unlimited */ + private final long maxBufferedBytes; + /** The sequence of buffers/events that has been unblocked and must now be consumed * before requesting further data from the input gate */ private BufferSpiller.SpilledBufferOrEventSequence currentBuffered; @@ -83,6 +89,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { /** The number of already closed channels */ private int numClosedChannels; + /** The number of bytes in the queued spilled sequences */ + private long numQueuedBytes; + /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */ private long startOfAlignmentTimestamp; @@ -93,14 +102,37 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private boolean endOfStream; /** + * Creates a new checkpoint stream aligner. + * + * <p>There is no limit to how much data may be buffered during an alignment. * * @param inputGate The input gate to draw the buffers and events from. * @param ioManager The I/O manager that gives access to the temp directories. - * + * * @throws IOException Thrown, when the spilling to temp files cannot be initialized. */ public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException { + this (inputGate, ioManager, -1); + } + + /** + * Creates a new checkpoint stream aligner. + * + * <p>The aligner will allow only alignments that buffer up to the given number of bytes. + * When that number is exceeded, it will stop the alignment and notify the task that the + * checkpoint has been cancelled. + * + * @param inputGate The input gate to draw the buffers and events from. 
+ * @param ioManager The I/O manager that gives access to the temp directories. + * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts. + * + * @throws IOException Thrown, when the spilling to temp files cannot be initialized. + */ + public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException { + checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0); + this.inputGate = inputGate; + this.maxBufferedBytes = maxBufferedBytes; this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; @@ -132,6 +164,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (isBlocked(next.getChannelIndex())) { // if the channel is blocked we, we just store the BufferOrEvent bufferSpiller.add(next); + checkSizeLimit(); } else if (next.isBuffer()) { return next; @@ -170,6 +203,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentBuffered = queuedBuffered.pollFirst(); if (currentBuffered != null) { currentBuffered.open(); + numQueuedBytes -= currentBuffered.size(); } } @@ -340,6 +374,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } + private void checkSizeLimit() throws Exception { + if (maxBufferedBytes > 0 && (numQueuedBytes + bufferSpiller.getBytesWritten()) > maxBufferedBytes) { + // exceeded our limit - abort this checkpoint + LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", + currentCheckpointId, maxBufferedBytes); + + releaseBlocksAndResetBarriers(); + notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes)); + } + } @Override public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) { @@ -366,6 +410,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { seq.cleanup(); } queuedBuffered.clear(); + numQueuedBytes = 0L; } private void 
beginNewAlignment(long checkpointId, int channelIndex) throws IOException { @@ -436,6 +481,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (bufferedNow != null) { bufferedNow.open(); queuedBuffered.addFirst(currentBuffered); + numQueuedBytes += currentBuffered.size(); currentBuffered = bufferedNow; } } http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 45a330b..5133351 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -277,6 +277,9 @@ public class BufferSpiller { /** The byte buffer for bulk reading */ private final ByteBuffer buffer; + /** We store this size as a constant because it is crucial it never changes */ + private final long size; + /** The page size to instantiate properly sized memory segments */ private final int pageSize; @@ -291,11 +294,13 @@ public class BufferSpiller { * @param buffer The buffer used for bulk reading. * @param pageSize The page size to use for the created memory segments. 
*/ - SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) { + SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) + throws IOException { this.file = file; this.fileChannel = fileChannel; this.buffer = buffer; this.pageSize = pageSize; + this.size = fileChannel.size(); } /** @@ -417,5 +422,12 @@ public class BufferSpiller { throw new IOException("Cannot remove temp file for stream alignment writer"); } } + + /** + * Gets the size of this spilled sequence. + */ + public long size() throws IOException { + return size; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 714317d..b3257a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -22,6 +22,9 @@ import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; @@ -87,12 +90,19 @@ public class StreamInputProcessor<IN> { TypeSerializer<IN> inputSerializer, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, - IOManager ioManager) throws IOException { + IOManager 
ioManager, + Configuration taskManagerConfig) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - this.barrierHandler = new BarrierBuffer(inputGate, ioManager); + long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); + if (!(maxAlign == -1 || maxAlign > 0)) { + throw new IllegalConfigurationException( + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() + + " must be positive or -1 (infinite)"); + } + this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign); } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler = new BarrierTracker(inputGate); http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 5f7ffe4..e5aeec1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; @@ -93,12 +96,19 @@ public class 
StreamTwoInputProcessor<IN1, IN2> { TypeSerializer<IN2> inputSerializer2, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, - IOManager ioManager) throws IOException { - + IOManager ioManager, + Configuration taskManagerConfig) throws IOException { + final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - this.barrierHandler = new BarrierBuffer(inputGate, ioManager); + long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); + if (!(maxAlign == -1 || maxAlign > 0)) { + throw new IllegalConfigurationException( + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() + + " must be positive or -1 (infinite)"); + } + this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign); } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler = new BarrierTracker(inputGate); http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 68d3064..0f41103 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -41,10 +41,12 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO if (numberOfInputs > 0) { InputGate[] inputGates = getEnvironment().getAllInputGates(); - inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, + inputProcessor = new StreamInputProcessor<IN>( + inputGates, inSerializer, this, 
configuration.getCheckpointMode(), - getEnvironment().getIOManager()); + getEnvironment().getIOManager(), + getEnvironment().getTaskManagerInfo().getConfiguration()); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 978c9f2..545b95b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -65,11 +65,13 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS } } - this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, + this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>( + inputList1, inputList2, inputDeserializer1, inputDeserializer2, this, configuration.getCheckpointMode(), - getEnvironment().getIOManager()); + getEnvironment().getIOManager(), + getEnvironment().getTaskManagerInfo().getConfiguration()); // make sure that stream tasks report their I/O statistics inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); http://git-wip-us.apache.org/repos/asf/flink/blob/07ab9f45/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java new file mode 100644 index 0000000..3e618ef --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the barrier buffer's maximum limit of buffered/spilled bytes + */ +public class BarrierBufferAlignmentLimitTest { + + private static final int PAGE_SIZE = 512; + + private static final Random RND = new Random(); + + private static IOManager IO_MANAGER; + + // ------------------------------------------------------------------------ + // Setup + // ------------------------------------------------------------------------ + + @BeforeClass + public static void setup() { + IO_MANAGER = 
new IOManagerAsync(); + } + + @AfterClass + public static void shutdownIOManager() { + IO_MANAGER.shutdown(); + } + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + /** + * This tests that a single alignment that buffers too much data cancels + */ + @Test + public void testBreakCheckpointAtAlignmentLimit() throws Exception { + BufferOrEvent[] sequence = { + // some initial buffers + /* 0 */ createBuffer(1, 100), createBuffer(2, 70), + /* 2 */ createBuffer(0, 42), createBuffer(2, 111), + + // starting a checkpoint + /* 4 */ createBarrier(7, 1), + /* 5 */ createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50), + /* 9 */ createBarrier(7, 0), + /* 10 */ createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200), + + // this buffer makes the alignment spill too large + /* 14 */ createBuffer(0, 101), + + // additional data + /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), + + // checkpoint completes - this should not result in a "completion notification" + /* 18 */ createBarrier(7, 2), + + // trailing buffers + /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100) + }; + + // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + // validating the sequence of buffers + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + check(sequence[3], buffer.getNextNonBlocked()); + + // start of checkpoint + long startTs = System.nanoTime(); + check(sequence[6], 
buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[10], buffer.getNextNonBlocked()); + + // trying to pull the next makes the alignment overflow - so buffered buffers are replayed + check(sequence[5], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class)); + + // playing back buffered events + check(sequence[7], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // the additional data + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[17], buffer.getNextNonBlocked()); + + check(sequence[19], buffer.getNextNonBlocked()); + check(sequence[20], buffer.getNextNonBlocked()); + check(sequence[21], buffer.getNextNonBlocked()); + + // no call for a completed checkpoint must have happened + verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + /** + * This tests the following case: + * - an alignment starts + * - barriers from a second checkpoint queue before the first completes + * - together they are larger than the threshold + * - after the first checkpoint (with second checkpoint data queued) aborts, the second completes + */ + @Test + public void testAlignmentLimitWithQueuedAlignments() throws Exception { + BufferOrEvent[] sequence = { + // some initial buffers + /* 0 */ createBuffer(1, 100), createBuffer(2, 70), + + // starting a checkpoint + /* 2 */ createBarrier(3, 2), + /* 3 */ createBuffer(1, 100), createBuffer(2, 100), + /* 5 */ createBarrier(3, 0), + /* 6 */ createBuffer(0, 
100), createBuffer(1, 100), + + // queue some data from the next checkpoint + /* 8 */ createBarrier(4, 0), + /* 9 */ createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100), + + // this one makes the alignment overflow + /* 12 */ createBuffer(2, 100), + + // checkpoint completed + /* 13 */ createBarrier(3, 1), + + // more for the next checkpoint + /* 14 */ createBarrier(4, 1), + /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), + + // next checkpoint completes + /* 18 */ createBarrier(4, 2), + + // trailing data + /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100) + }; + + // the barrier buffer has a limit that only 500 bytes may be spilled in alignment + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + // validating the sequence of buffers + long startTs; + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + + // start of checkpoint + startTs = System.nanoTime(); + check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[7], buffer.getNextNonBlocked()); + + // next checkpoint also in progress + check(sequence[11], buffer.getNextNonBlocked()); + + // checkpoint alignment aborted due to too much data + check(sequence[4], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class)); + + // replay buffered data - in the middle, the alignment for checkpoint 4 starts + check(sequence[6], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[12], buffer.getNextNonBlocked()); + + // only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success + check(sequence[17], 
buffer.getNextNonBlocked()); + + // checkpoint 4 completed - check and validate buffered replay + check(sequence[9], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L))); + + check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + + // trailing data + check(sequence[19], buffer.getNextNonBlocked()); + check(sequence[20], buffer.getNextNonBlocked()); + check(sequence[21], buffer.getNextNonBlocked()); + + // only checkpoint 4 was successfully completed, not checkpoint 3 + verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L))); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static BufferOrEvent createBuffer(int channel, int size) { + byte[] bytes = new byte[size]; + RND.nextBytes(bytes); + + MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); + memory.put(0, bytes); + + Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE); + buf.setSize(size); + + // retain an additional time so it does not get disposed after being read by the input gate + buf.retain(); + + return new BufferOrEvent(buf, channel); + } + + private static BufferOrEvent createBarrier(long id, int channel) { + return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); + } + + private static void check(BufferOrEvent expected, BufferOrEvent present) { + assertNotNull(expected); + assertNotNull(present); + assertEquals(expected.isBuffer(), present.isBuffer()); + + if (expected.isBuffer()) { + 
assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize()); + MemorySegment expectedMem = expected.getBuffer().getMemorySegment(); + MemorySegment presentMem = present.getBuffer().getMemorySegment(); + assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0); + } + else { + assertEquals(expected.getEvent(), present.getEvent()); + } + } + + private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) { + final long elapsed = System.nanoTime() - startTimestamp; + assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed); + } + + private static void checkNoTempFilesRemain() { + // validate that all temp files have been removed + for (File dir : IO_MANAGER.getSpillingDirectories()) { + for (String file : dir.list()) { + if (file != null && !(file.equals(".") || file.equals(".."))) { + fail("barrier buffer did not clean up temp files. remaining file: " + file); + } + } + } + } + + /** + * A validation matcher for checkpoint metadata against checkpoint IDs + */ + private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> { + + private final long checkpointId; + + CheckpointMatcher(long checkpointId) { + this.checkpointId = checkpointId; + } + + @Override + public boolean matches(Object o) { + return o != null && + o.getClass() == CheckpointMetaData.class && + ((CheckpointMetaData) o).getCheckpointId() == checkpointId; + } + + @Override + public void describeTo(Description description) { + description.appendText("CheckpointMetaData - id = " + checkpointId); + } + } +}
