[
https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648167#comment-15648167
]
ASF GitHub Bot commented on FLINK-4975:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2754#discussion_r87039138
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---
@@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
}
@Test
- public void testEndOfStreamWhileCheckpoint() {
+ public void testEndOfStreamWhileCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ // one checkpoint
+ createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
+
+ // some buffers
+ createBuffer(0), createBuffer(0), createBuffer(2),
+
+ // start the checkpoint that will be incomplete
+ createBarrier(2, 2), createBarrier(2, 0),
+ createBuffer(0), createBuffer(2), createBuffer(1),
+
+ // close one after the barrier one before the barrier
+ createEndOfPartition(2), createEndOfPartition(1),
+ createBuffer(0),
+
+ // final end of stream
+ createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ // data after first checkpoint
+ check(sequence[3], buffer.getNextNonBlocked());
+ check(sequence[4], buffer.getNextNonBlocked());
+ check(sequence[5], buffer.getNextNonBlocked());
+ assertEquals(1L, buffer.getCurrentCheckpointId());
+
+ // alignment of second checkpoint
+ check(sequence[10], buffer.getNextNonBlocked());
+ assertEquals(2L, buffer.getCurrentCheckpointId());
+
+ // first end-of-partition encountered: checkpoint will not be completed
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+
+ checkNoTempFilesRemain();
+ }
+
+ @Test
+ public void testSingleChannelAbortCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0),
+ createBarrier(2, 0),
+ createCancellationBarrier(4, 0),
+ createBarrier(5, 0),
+ createBuffer(0),
+ createCancellationBarrier(6, 0),
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ check(sequence[6], buffer.getNextNonBlocked());
+ assertEquals(5L, buffer.getCurrentCheckpointId());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ check(sequence[8], buffer.getNextNonBlocked());
+ assertEquals(6L, buffer.getCurrentCheckpointId());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ @Test
+ public void testMultiChannelAbortCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ // some buffers and a successful checkpoint
+ /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
+ /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
+ /* 5 */ createBuffer(2), createBuffer(1),
+ /* 7 */ createBarrier(1, 0),
+ /* 8 */ createBuffer(0), createBuffer(2),
+
+ // aborted on last barrier
+ /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
+ /* 12 */ createBuffer(0), createBuffer(2),
+ /* 14 */ createCancellationBarrier(2, 1),
+
+ // successful checkpoint
+ /* 15 */ createBuffer(2), createBuffer(1),
+ /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+ // abort on first barrier
+ /* 20 */ createBuffer(0), createBuffer(1),
+ /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
+ /* 24 */ createBuffer(0),
+ /* 25 */ createBarrier(4, 0),
+
+ // another successful checkpoint
+ /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
+ /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+ /* 32 */ createBuffer(0), createBuffer(1),
+
+ // abort multiple cancellations and a barrier after the cancellations
+ /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+ /* 36 */ createBarrier(6, 0),
+
+ /* 37 */ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ long startTs;
+
+ // successful first checkpoint, with some aligned buffers
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ startTs = System.nanoTime();
+ check(sequence[5], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+ validateAlignmentTime(startTs, buffer);
+
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+
+ // canceled checkpoint on last barrier
+ startTs = System.nanoTime();
+ check(sequence[12], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ validateAlignmentTime(startTs, buffer);
+ check(sequence[13], buffer.getNextNonBlocked());
+
+ // one more successful checkpoint
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+ startTs = System.nanoTime();
+ check(sequence[20], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+ validateAlignmentTime(startTs, buffer);
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // this checkpoint gets immediately canceled
+ check(sequence[24], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ // some buffers
+ check(sequence[26], buffer.getNextNonBlocked());
+ check(sequence[27], buffer.getNextNonBlocked());
+ check(sequence[28], buffer.getNextNonBlocked());
+
+ // a simple successful checkpoint
+ startTs = System.nanoTime();
+ check(sequence[32], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+ validateAlignmentTime(startTs, buffer);
+ check(sequence[33], buffer.getNextNonBlocked());
+
+ check(sequence[37], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ @Test
+ public void testAbortViaQueuedBarriers() throws Exception {
+ BufferOrEvent[] sequence = {
+ // starting a checkpoint
+ /* 0 */ createBuffer(1),
+ /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
+ /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+ // queued barrier and cancellation barrier
+ /* 6 */ createCancellationBarrier(2, 2),
+ /* 7 */ createBarrier(2, 1),
+
+ // some intermediate buffers (some queued)
+ /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+ // complete initial checkpoint
+ /* 11 */ createBarrier(1, 0),
+
+ // some buffers (none queued, since checkpoint is aborted)
+ /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+ // final barrier of aborted checkpoint
+ /* 15 */ createBarrier(1, 2),
--- End diff --
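This barrier appears to be mislabeled. Checkpoint 1 is already completed at index 11 (`createBarrier(1, 0)`), so `createBarrier(1, 2)` here is a late barrier for checkpoint 1. It is not the final barrier of the aborted checkpoint 2, which would presumably be `createBarrier(2, 0)`. Both late and aborted barriers are handled in the same code path, so the test passed despite this mistake.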
> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
> Key: FLINK-4975
> URL: https://issues.apache.org/jira/browse/FLINK-4975
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After
> exceeding that limit, the checkpoint alignment should abort and the
> checkpoint be canceled.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)