[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.
If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0962cb6f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0962cb6f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0962cb6f Branch: refs/heads/release-1.1 Commit: 0962cb6f45607fb21d50030e325e99fc2c37164a Parents: 1a4fdff Author: Stephan Ewen <[email protected]> Authored: Thu Nov 3 15:28:15 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 19:07:16 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 15 + .../streaming/runtime/io/BarrierBuffer.java | 48 ++- .../streaming/runtime/io/BufferSpiller.java | 39 ++- .../runtime/io/StreamInputProcessor.java | 17 +- .../runtime/io/StreamTwoInputProcessor.java | 17 +- .../runtime/tasks/OneInputStreamTask.java | 3 +- .../runtime/tasks/TwoInputStreamTask.java | 3 +- .../io/BarrierBufferAlignmentLimitTest.java | 315 +++++++++++++++++++ 8 files changed, 441 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d1ad1c4..d9ccb35 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -277,6 +277,14 @@ public final class ConfigConstants { @PublicEvolving public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout"; + /** + * The 
maximum number of bytes that a checkpoint alignment may buffer. + * If the checkpoint alignment buffers more than the configured amount of + * data, the checkpoint is aborted (skipped). + */ + @PublicEvolving + public static final String TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = "task.checkpoint.alignment.max-size"; + // --------------------------- Runtime Algorithms ------------------------------- /** @@ -873,6 +881,13 @@ public final class ConfigConstants { */ public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated + /** + * The default for the maximum number of bytes that a checkpoint alignment may buffer. + * {@code -1} = infinite. + */ + @PublicEvolving + public static final long DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = -1L; + // ------------------------ Runtime Algorithms ------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 7a8e7d3..c4cf98e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; @@ -36,6 +37,8 @@ import 
org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayDeque; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until * all inputs have received the barrier for a given checkpoint. @@ -65,6 +68,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * further data from the input gate. */ private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered; + /** The maximum number of bytes that may be buffered before an alignment is broken. -1 means unlimited */ + private final long maxBufferedBytes; + /** The sequence of buffers/events that has been unblocked and must now be consumed * before requesting further data from the input gate */ private BufferSpiller.SpilledBufferOrEventSequence currentBuffered; @@ -82,6 +88,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler { /** The number of already closed channels */ private int numClosedChannels; + /** The number of bytes in the queued spilled sequences */ + private long numQueuedBytes; + /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */ private long startOfAlignmentTimestamp; @@ -92,14 +101,37 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private boolean endOfStream; /** + * Creates a new checkpoint stream aligner. + * + * <p>There is no limit to how much data may be buffered during an alignment. * * @param inputGate The input gate to draw the buffers and events from. * @param ioManager The I/O manager that gives access to the temp directories. - * + * * @throws IOException Thrown, when the spilling to temp files cannot be initialized. */ public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException { + this (inputGate, ioManager, -1); + } + + /** + * Creates a new checkpoint stream aligner. 
+ * + * <p>The aligner will allow only alignments that buffer up to the given number of bytes. + * When that number is exceeded, it will stop the alignment and notify the task that the + * checkpoint has been cancelled. + * + * @param inputGate The input gate to draw the buffers and events from. + * @param ioManager The I/O manager that gives access to the temp directories. + * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts. + * + * @throws IOException Thrown, when the spilling to temp files cannot be initialized. + */ + public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException { + checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0); + this.inputGate = inputGate; + this.maxBufferedBytes = maxBufferedBytes; this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; @@ -131,6 +163,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (isBlocked(next.getChannelIndex())) { // if the channel is blocked we, we just store the BufferOrEvent bufferSpiller.add(next); + checkSizeLimit(); } else if (next.isBuffer()) { return next; @@ -169,6 +202,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentBuffered = queuedBuffered.pollFirst(); if (currentBuffered != null) { currentBuffered.open(); + numQueuedBytes -= currentBuffered.size(); } } @@ -333,6 +367,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } + private void checkSizeLimit() throws Exception { + if (maxBufferedBytes > 0 && (numQueuedBytes + bufferSpiller.getBytesWritten()) > maxBufferedBytes) { + // exceeded our limit - abort this checkpoint + LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", + currentCheckpointId, maxBufferedBytes); + + releaseBlocksAndResetBarriers(); + notifyAbort(currentCheckpointId, new 
AlignmentLimitExceededException(maxBufferedBytes)); + } + } @Override public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) { @@ -359,6 +403,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { seq.cleanup(); } queuedBuffered.clear(); + numQueuedBytes = 0L; } private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException { @@ -429,6 +474,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (bufferedNow != null) { bufferedNow.open(); queuedBuffered.addFirst(currentBuffered); + numQueuedBytes += currentBuffered.size(); currentBuffered = bufferedNow; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index dc8d245..8060d02 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -84,9 +84,9 @@ public class BufferSpiller { /** A counter, to created numbered spill files */ private int fileCounter; - - /** A flag to check whether the spiller has written since the last roll over */ - private boolean hasWritten; + + /** The number of bytes written since the last roll over */ + private long bytesWritten; /** * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories. @@ -124,7 +124,6 @@ public class BufferSpiller { * @throws IOException Thrown, if the buffer of event could not be spilled. 
*/ public void add(BufferOrEvent boe) throws IOException { - hasWritten = true; try { ByteBuffer contents; if (boe.isBuffer()) { @@ -140,7 +139,9 @@ public class BufferSpiller { headBuffer.putInt(contents.remaining()); headBuffer.put((byte) (boe.isBuffer() ? 0 : 1)); headBuffer.flip(); - + + bytesWritten += (headBuffer.remaining() + contents.remaining()); + sources[1] = contents; currentChannel.write(sources); } @@ -186,7 +187,7 @@ public class BufferSpiller { } private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException { - if (!hasWritten) { + if (bytesWritten == 0) { return null; } @@ -205,8 +206,8 @@ public class BufferSpiller { // create ourselves a new spill file createSpillingChannel(); - - hasWritten = false; + + bytesWritten = 0L; return seq; } @@ -225,6 +226,14 @@ public class BufferSpiller { } } + /** + * Gets the number of bytes written in the current spill file. + * @return the number of bytes written in the current spill file + */ + public long getBytesWritten() { + return bytesWritten; + } + // ------------------------------------------------------------------------ // For testing // ------------------------------------------------------------------------ @@ -268,6 +277,9 @@ public class BufferSpiller { /** The byte buffer for bulk reading */ private final ByteBuffer buffer; + /** We store this size as a constant because it is crucial it never changes */ + private final long size; + /** The page size to instantiate properly sized memory segments */ private final int pageSize; @@ -282,11 +294,13 @@ public class BufferSpiller { * @param buffer The buffer used for bulk reading. * @param pageSize The page size to use for the created memory segments. 
*/ - SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) { + SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) + throws IOException { this.file = file; this.fileChannel = fileChannel; this.buffer = buffer; this.pageSize = pageSize; + this.size = fileChannel.size(); } /** @@ -408,5 +422,12 @@ public class BufferSpiller { throw new IOException("Cannot remove temp file for stream alignment writer"); } } + + /** + * Gets the size of this spilled sequence. + */ + public long size() throws IOException { + return size; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 7d9e4d2..bcca2bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -22,6 +22,9 @@ import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; @@ -87,12 +90,22 @@ public class StreamInputProcessor<IN> { StatefulTask<?> checkpointListener, CheckpointingMode checkpointMode, IOManager ioManager, - boolean enableWatermarkMultiplexing) throws IOException { + boolean 
enableWatermarkMultiplexing, + Configuration taskManagerConfig) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - this.barrierHandler = new BarrierBuffer(inputGate, ioManager); + long maxAlign = taskManagerConfig.getLong( + ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT, + ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); + + if (!(maxAlign == -1 || maxAlign > 0)) { + throw new IllegalConfigurationException( + ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT + + " must be positive or -1 (infinite)"); + } + this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign); } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler = new BarrierTracker(inputGate); http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index a3ae077..f116aff 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import 
org.apache.flink.runtime.metrics.groups.IOMetricGroup; @@ -97,12 +100,22 @@ public class StreamTwoInputProcessor<IN1, IN2> { StatefulTask<?> checkpointListener, CheckpointingMode checkpointMode, IOManager ioManager, - boolean enableWatermarkMultiplexing) throws IOException { + boolean enableWatermarkMultiplexing, + Configuration taskManagerConfig) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - this.barrierHandler = new BarrierBuffer(inputGate, ioManager); + long maxAlign = taskManagerConfig.getLong( + ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT, + ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); + + if (!(maxAlign == -1 || maxAlign > 0)) { + throw new IllegalConfigurationException( + ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT + + " must be positive or -1 (infinite)"); + } + this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign); } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler = new BarrierTracker(inputGate); http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index d18ca16..8470c7c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -46,7 +46,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO this, configuration.getCheckpointMode(), getEnvironment().getIOManager(), - 
isSerializingTimestamps()); + isSerializingTimestamps(), + getEnvironment().getTaskManagerInfo().getConfiguration()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 9252063..8718b88 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -71,7 +71,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS this, configuration.getCheckpointMode(), getEnvironment().getIOManager(), - isSerializingTimestamps()); + isSerializingTimestamps(), + getEnvironment().getTaskManagerInfo().getConfiguration()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java new file mode 100644 index 0000000..529f809 --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the barrier buffer's maximum limit of buffered/spilled bytes + */ +public class BarrierBufferAlignmentLimitTest { + + private static final int PAGE_SIZE = 512; + + private static final Random RND = new Random(); + + private static IOManager IO_MANAGER; + + // ------------------------------------------------------------------------ + // Setup + // ------------------------------------------------------------------------ + + @BeforeClass + public static void setup() { + IO_MANAGER = new IOManagerAsync(); + } + + @AfterClass + public static void shutdownIOManager() { + IO_MANAGER.shutdown(); + } + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + /** + * This tests that a single alignment that buffers too much data cancels + */ + @Test + public void testBreakCheckpointAtAlignmentLimit() throws Exception { + BufferOrEvent[] sequence = { + // some initial buffers + /* 0 */ createBuffer(1, 100), createBuffer(2, 70), + /* 2 */ createBuffer(0, 42), createBuffer(2, 111), + + // starting a checkpoint + /* 4 */ createBarrier(7, 1), + /* 5 */ createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50), + /* 9 */ createBarrier(7, 0), + /* 10 */ createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200), + + // this buffer makes the alignment spill too large + /* 14 */ createBuffer(0, 101), + + // additional data + /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), + + // checkpoint completes - this should not result in a "completion notification" + /* 
18 */ createBarrier(7, 2), + + // trailing buffers + /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100) + }; + + // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + // validating the sequence of buffers + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + check(sequence[3], buffer.getNextNonBlocked()); + + // start of checkpoint + long startTs = System.nanoTime(); + check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[10], buffer.getNextNonBlocked()); + + // trying to pull the next makes the alignment overflow - so buffered buffers are replayed + check(sequence[5], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class)); + + // playing back buffered events + check(sequence[7], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // the additional data + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[17], buffer.getNextNonBlocked()); + + check(sequence[19], buffer.getNextNonBlocked()); + check(sequence[20], buffer.getNextNonBlocked()); + check(sequence[21], buffer.getNextNonBlocked()); + + // no call for a completed checkpoint must have happened + verify(toNotify, times(0)).triggerCheckpointOnBarrier(anyLong(), anyLong()); + + 
+ assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + /** + * This tests the following case: + * - an alignment starts + * - barriers from a second checkpoint queue before the first completes + * - together they are larger than the threshold + * - after the first checkpoint (with second checkpoint data queued) aborts, the second completes + */ + @Test + public void testAlignmentLimitWithQueuedAlignments() throws Exception { + BufferOrEvent[] sequence = { + // some initial buffers + /* 0 */ createBuffer(1, 100), createBuffer(2, 70), + + // starting a checkpoint + /* 2 */ createBarrier(3, 2), + /* 3 */ createBuffer(1, 100), createBuffer(2, 100), + /* 5 */ createBarrier(3, 0), + /* 6 */ createBuffer(0, 100), createBuffer(1, 100), + + // queue some data from the next checkpoint + /* 8 */ createBarrier(4, 0), + /* 9 */ createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100), + + // this one makes the alignment overflow + /* 12 */ createBuffer(2, 100), + + // checkpoint completed + /* 13 */ createBarrier(3, 1), + + // more for the next checkpoint + /* 14 */ createBarrier(4, 1), + /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), + + // next checkpoint completes + /* 18 */ createBarrier(4, 2), + + // trailing data + /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100) + }; + + // the barrier buffer has a limit that only 500 bytes may be spilled in alignment + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + // validating the sequence of buffers + long startTs; + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + + // start of checkpoint + startTs = System.nanoTime(); +
check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[7], buffer.getNextNonBlocked()); + + // next checkpoint also in progress + check(sequence[11], buffer.getNextNonBlocked()); + + // checkpoint alignment aborted due to too much data + check(sequence[4], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class)); + + // replay buffered data - in the middle, the alignment for checkpoint 4 starts + check(sequence[6], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[12], buffer.getNextNonBlocked()); + + // only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success + check(sequence[17], buffer.getNextNonBlocked()); + + // checkpoint 4 completed - check and validate buffered replay + check(sequence[9], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(4L), anyLong()); + + check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + + // trailing data + check(sequence[19], buffer.getNextNonBlocked()); + check(sequence[20], buffer.getNextNonBlocked()); + check(sequence[21], buffer.getNextNonBlocked()); + + // only checkpoint 4 was successfully completed, not checkpoint 3 + verify(toNotify, times(0)).triggerCheckpointOnBarrier(eq(3L), anyLong()); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static BufferOrEvent createBuffer(int channel, int size) { + byte[] bytes = new byte[size]; + RND.nextBytes(bytes); + + MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); + memory.put(0, bytes); + + Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE); + buf.setSize(size); + + // retain an additional time so it does not get disposed after being read by the input gate + buf.retain(); + + return new BufferOrEvent(buf, channel); + } + + private static BufferOrEvent createBarrier(long id, int channel) { + return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); + } + + private static void check(BufferOrEvent expected, BufferOrEvent present) { + assertNotNull(expected); + assertNotNull(present); + assertEquals(expected.isBuffer(), present.isBuffer()); + + if (expected.isBuffer()) { + assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize()); + MemorySegment expectedMem = expected.getBuffer().getMemorySegment(); + MemorySegment presentMem = present.getBuffer().getMemorySegment(); + assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0); + } + else { + assertEquals(expected.getEvent(), present.getEvent()); + } + } + + private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) { + final long elapsed = System.nanoTime() - startTimestamp; + assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed); + } + + private static void checkNoTempFilesRemain() { + // validate that all temp files have been removed + for (File dir : IO_MANAGER.getSpillingDirectories()) { + for (String file : dir.list()) { + if (file != null && !(file.equals(".") || file.equals(".."))) { + fail("barrier buffer did not clean up temp files. remaining file: " + file); + } + } + } + } +}
