Repository: flink Updated Branches: refs/heads/release-1.1 4dd3efea4 -> 0962cb6f4
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index d4fdc59..cf1f98e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -18,16 +18,18 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.core.memory.HeapMemorySegment; +import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; +import org.apache.flink.runtime.state.StateHandle; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -35,6 +37,7 @@ import org.junit.Test; import java.io.File; import java.util.Arrays; +import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -42,15 +45,23 @@ import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + /** * Tests for the behavior of the {@link BarrierBuffer}. */ public class BarrierBufferTest { + private static final Random RND = new Random(); + private static final int PAGE_SIZE = 512; - + private static int SIZE_COUNTER = 0; - + private static IOManager IO_MANAGER; @BeforeClass @@ -86,7 +97,9 @@ public class BarrierBufferTest { for (BufferOrEvent boe : sequence) { assertEquals(boe, buffer.getNextNonBlocked()); } - + + assertEquals(0L, buffer.getAlignmentDurationNanos()); + assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); @@ -120,6 +133,8 @@ public class BarrierBufferTest { assertEquals(boe, buffer.getNextNonBlocked()); } + assertEquals(0L, buffer.getAlignmentDurationNanos()); + assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); @@ -222,13 +237,15 @@ public class BarrierBufferTest { ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer.registerCheckpointEventHandler(handler); handler.setNextExpectedCheckpointId(1L); - + // pre checkpoint 1 check(sequence[0], buffer.getNextNonBlocked()); check(sequence[1], buffer.getNextNonBlocked()); check(sequence[2], buffer.getNextNonBlocked()); assertEquals(1L, handler.getNextExpectedCheckpointId()); + long startTs = System.nanoTime(); + // blocking while aligning for checkpoint 1 check(sequence[7], buffer.getNextNonBlocked()); assertEquals(1L, handler.getNextExpectedCheckpointId()); @@ -236,6 +253,8 @@ public class BarrierBufferTest { // checkpoint 1 done, returning buffered data check(sequence[5], buffer.getNextNonBlocked()); assertEquals(2L, handler.getNextExpectedCheckpointId()); + validateAlignmentTime(startTs, buffer); + 
check(sequence[6], buffer.getNextNonBlocked()); // pre checkpoint 2 @@ -245,10 +264,13 @@ public class BarrierBufferTest { check(sequence[12], buffer.getNextNonBlocked()); check(sequence[13], buffer.getNextNonBlocked()); assertEquals(2L, handler.getNextExpectedCheckpointId()); - + // checkpoint 2 barriers come together + startTs = System.nanoTime(); check(sequence[17], buffer.getNextNonBlocked()); assertEquals(3L, handler.getNextExpectedCheckpointId()); + validateAlignmentTime(startTs, buffer); + check(sequence[18], buffer.getNextNonBlocked()); // checkpoint 3 starts, data buffered @@ -257,7 +279,7 @@ public class BarrierBufferTest { check(sequence[21], buffer.getNextNonBlocked()); // checkpoint 4 happens without extra data - + // pre checkpoint 5 check(sequence[27], buffer.getNextNonBlocked()); assertEquals(5L, handler.getNextExpectedCheckpointId()); @@ -301,7 +323,7 @@ public class BarrierBufferTest { BufferOrEvent[] sequence = { createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0), - + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(2, 1), createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2), @@ -327,12 +349,14 @@ public class BarrierBufferTest { assertEquals(2L, handler.getNextExpectedCheckpointId()); check(sequence[7], buffer.getNextNonBlocked()); check(sequence[8], buffer.getNextNonBlocked()); - + // checkpoint 2 alignment + long startTs = System.nanoTime(); check(sequence[13], buffer.getNextNonBlocked()); check(sequence[14], buffer.getNextNonBlocked()); check(sequence[18], buffer.getNextNonBlocked()); check(sequence[19], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); // end of stream: remaining buffered contents check(sequence[10], buffer.getNextNonBlocked()); @@ -343,7 +367,7 @@ public class BarrierBufferTest { assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); - + buffer.cleanup(); 
checkNoTempFilesRemain(); @@ -389,7 +413,7 @@ public class BarrierBufferTest { createBarrier(3, 2), createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(6, 1), - + // complete checkpoint 4, checkpoint 5 remains not fully triggered createBarrier(4, 2), createBuffer(2), @@ -419,12 +443,14 @@ public class BarrierBufferTest { // alignment of checkpoint 2 - buffering also some barriers for // checkpoints 3 and 4 + long startTs = System.nanoTime(); check(sequence[13], buffer.getNextNonBlocked()); check(sequence[20], buffer.getNextNonBlocked()); check(sequence[23], buffer.getNextNonBlocked()); - + // checkpoint 2 completed check(sequence[12], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); check(sequence[25], buffer.getNextNonBlocked()); check(sequence[27], buffer.getNextNonBlocked()); check(sequence[30], buffer.getNextNonBlocked()); @@ -507,36 +533,53 @@ public class BarrierBufferTest { MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); - handler.setNextExpectedCheckpointId(1L); + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); - // checkpoint 1 + long startTs; + + // initial data check(sequence[0], buffer.getNextNonBlocked()); check(sequence[1], buffer.getNextNonBlocked()); check(sequence[2], buffer.getNextNonBlocked()); + + // align checkpoint 1 + startTs = System.nanoTime(); check(sequence[7], buffer.getNextNonBlocked()); assertEquals(1L, buffer.getCurrentCheckpointId()); - + + // checkpoint done - replay buffered check(sequence[5], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify).triggerCheckpointOnBarrier(eq(1L), anyLong()); check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[9], 
buffer.getNextNonBlocked()); check(sequence[10], buffer.getNextNonBlocked()); // alignment of checkpoint 2 + startTs = System.nanoTime(); check(sequence[13], buffer.getNextNonBlocked()); - assertEquals(2L, buffer.getCurrentCheckpointId()); check(sequence[15], buffer.getNextNonBlocked()); // checkpoint 2 aborted, checkpoint 3 started check(sequence[12], buffer.getNextNonBlocked()); assertEquals(3L, buffer.getCurrentCheckpointId()); + validateAlignmentTime(startTs, buffer); + verify(toNotify).abortCheckpointOnBarrier(2L); check(sequence[16], buffer.getNextNonBlocked()); + + // checkpoint 3 alignment in progress check(sequence[19], buffer.getNextNonBlocked()); - check(sequence[20], buffer.getNextNonBlocked()); - + // checkpoint 3 aborted (end of partition) + check(sequence[20], buffer.getNextNonBlocked()); + verify(toNotify).abortCheckpointOnBarrier(3L); + + // replay buffered data from checkpoint 3 check(sequence[18], buffer.getNextNonBlocked()); + + // all the remaining messages check(sequence[21], buffer.getNextNonBlocked()); check(sequence[22], buffer.getNextNonBlocked()); check(sequence[23], buffer.getNextNonBlocked()); @@ -613,17 +656,21 @@ public class BarrierBufferTest { check(sequence[19], buffer.getNextNonBlocked()); check(sequence[21], buffer.getNextNonBlocked()); + long startTs = System.nanoTime(); + // checkpoint 2 aborted, checkpoint 4 started. 
replay buffered check(sequence[12], buffer.getNextNonBlocked()); assertEquals(4L, buffer.getCurrentCheckpointId()); check(sequence[16], buffer.getNextNonBlocked()); check(sequence[18], buffer.getNextNonBlocked()); check(sequence[22], buffer.getNextNonBlocked()); - + // align checkpoint 4 remainder check(sequence[25], buffer.getNextNonBlocked()); check(sequence[26], buffer.getNextNonBlocked()); - + + validateAlignmentTime(startTs, buffer); + // checkpoint 4 aborted (due to end of partition) check(sequence[24], buffer.getNextNonBlocked()); check(sequence[27], buffer.getNextNonBlocked()); @@ -862,9 +909,9 @@ public class BarrierBufferTest { assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); - + buffer.cleanup(); - + checkNoTempFilesRemain(); } catch (Exception e) { @@ -874,26 +921,480 @@ public class BarrierBufferTest { } @Test - public void testEndOfStreamWhileCheckpoint() { + public void testEndOfStreamWhileCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // one checkpoint + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2), + + // some buffers + createBuffer(0), createBuffer(0), createBuffer(2), + + // start the checkpoint that will be incomplete + createBarrier(2, 2), createBarrier(2, 0), + createBuffer(0), createBuffer(2), createBuffer(1), + + // close one after the barrier one before the barrier + createEndOfPartition(2), createEndOfPartition(1), + createBuffer(0), + + // final end of stream + createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + // data after first checkpoint + check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[4], buffer.getNextNonBlocked()); + check(sequence[5], buffer.getNextNonBlocked()); + assertEquals(1L, buffer.getCurrentCheckpointId()); + + // alignment of second checkpoint + check(sequence[10], buffer.getNextNonBlocked()); + assertEquals(2L, 
buffer.getCurrentCheckpointId()); + + // first end-of-partition encountered: checkpoint will not be completed + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + + checkNoTempFilesRemain(); + } + + @Test + public void testSingleChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0), + createBarrier(1, 0), + createBuffer(0), + createBarrier(2, 0), + createCancellationBarrier(4, 0), + createBarrier(5, 0), + createBuffer(0), + createCancellationBarrier(6, 0), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong()); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[6], buffer.getNextNonBlocked()); + assertEquals(5L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong()); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[8], buffer.getNextNonBlocked()); + assertEquals(6L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + buffer.cleanup(); + 
checkNoTempFilesRemain(); + } + + @Test + public void testMultiChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // some buffers and a successful checkpoint + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0), + /* 3 */ createBarrier(1, 1), createBarrier(1, 2), + /* 5 */ createBuffer(2), createBuffer(1), + /* 7 */ createBarrier(1, 0), + /* 8 */ createBuffer(0), createBuffer(2), + + // aborted on last barrier + /* 10 */ createBarrier(2, 0), createBarrier(2, 2), + /* 12 */ createBuffer(0), createBuffer(2), + /* 14 */ createCancellationBarrier(2, 1), + + // successful checkpoint + /* 15 */ createBuffer(2), createBuffer(1), + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), + + // abort on first barrier + /* 20 */ createBuffer(0), createBuffer(1), + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2), + /* 24 */ createBuffer(0), + /* 25 */ createBarrier(4, 0), + + // another successful checkpoint + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2), + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), + /* 32 */ createBuffer(0), createBuffer(1), + + // abort multiple cancellations and a barrier after the cancellations + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), + /* 36 */ createBarrier(6, 0), + + /* 37 */ createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // successful first checkpoint, with some aligned buffers + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[5], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), 
anyLong()); + validateAlignmentTime(startTs, buffer); + + check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + + // canceled checkpoint on last barrier + startTs = System.nanoTime(); + check(sequence[12], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + validateAlignmentTime(startTs, buffer); + check(sequence[13], buffer.getNextNonBlocked()); + + // one more successful checkpoint + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[20], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(3L), anyLong()); + validateAlignmentTime(startTs, buffer); + check(sequence[21], buffer.getNextNonBlocked()); + + // this checkpoint gets immediately canceled + check(sequence[24], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // some buffers + check(sequence[26], buffer.getNextNonBlocked()); + check(sequence[27], buffer.getNextNonBlocked()); + check(sequence[28], buffer.getNextNonBlocked()); + + // a simple successful checkpoint + startTs = System.nanoTime(); + check(sequence[32], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong()); + validateAlignmentTime(startTs, buffer); + check(sequence[33], buffer.getNextNonBlocked()); + + check(sequence[37], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + @Test + public void testAbortViaQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a 
checkpoint + /* 0 */ createBuffer(1), + /* 1 */ createBarrier(1, 1), createBarrier(1, 2), + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // queued barrier and cancellation barrier + /* 6 */ createCancellationBarrier(2, 2), + /* 7 */ createBarrier(2, 1), + + // some intermediate buffers (some queued) + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // complete initial checkpoint + /* 11 */ createBarrier(1, 0), + + // some buffers (none queued, since checkpoint is aborted) + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // final barrier of aborted checkpoint + /* 15 */ createBarrier(2, 0), + + // some more buffers + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence[0], buffer.getNextNonBlocked()); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence[4], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + + // finished first checkpoint + check(sequence[3], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong()); + validateAlignmentTime(startTs, buffer); + + check(sequence[5], buffer.getNextNonBlocked()); + + // re-read the queued cancellation barriers + check(sequence[9], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[17], 
buffer.getNextNonBlocked()); + check(sequence[18], buffer.getNextNonBlocked()); + + // no further alignment should have happened + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // no further checkpoint (abort) notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + /** + * This tests the case where a replay of queued checkpoint barriers meets + * a canceled checkpoint. + * + * The replayed newer checkpoint barrier must not try to cancel the + * already canceled checkpoint. + */ + @Test + public void testAbortWhileHavingQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1), + /* 1 */ createBarrier(1, 1), + /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // queued barrier of the next checkpoint + /* 5 */ createBarrier(2, 1), + + // some queued buffers + /* 6 */ createBuffer(2), createBuffer(1), + + // cancel the initial checkpoint + /* 8 */ createCancellationBarrier(1, 0), + + // some more buffers + /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // ignored barrier - already canceled and moved to next checkpoint + /* 12 */ createBarrier(1, 2), + + // some more buffers + /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // complete next checkpoint regularly + /* 16 */ createBarrier(2, 0), createBarrier(2, 2), + + // some more buffers + /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence[0], 
buffer.getNextNonBlocked()); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence[2], buffer.getNextNonBlocked()); + check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[6], buffer.getNextNonBlocked()); + + // cancelled by cancellation barrier + check(sequence[4], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(1L); + + // the next checkpoint alignment starts now + startTs = System.nanoTime(); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[15], buffer.getNextNonBlocked()); + + // checkpoint done + check(sequence[7], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong()); + + // queued data + check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // trailing data + check(sequence[18], buffer.getNextNonBlocked()); + check(sequence[19], buffer.getNextNonBlocked()); + check(sequence[20], buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + } + + /** + * This tests the case where a cancellation barrier is received for a checkpoint already + * canceled due to receiving a newer checkpoint barrier. 
+ */ + @Test + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(2), + /* 1 */ createBarrier(3, 1), createBarrier(3, 0), + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // newer checkpoint barrier cancels/subsumes pending checkpoint + /* 6 */ createBarrier(5, 2), + + // some queued buffers + /* 7 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // cancel barrier for the initial checkpoint (it is already canceled) + /* 10 */ createCancellationBarrier(3, 2), + + // some more buffers + /* 11 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // complete next checkpoint regularly + /* 14 */ createBarrier(5, 0), createBarrier(5, 1), + + // some more buffers + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask<?> toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // validate the sequence + + check(sequence[0], buffer.getNextNonBlocked()); + + // beginning of first checkpoint + check(sequence[5], buffer.getNextNonBlocked()); + + // future barrier aborts checkpoint + startTs = System.nanoTime(); + check(sequence[3], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(3L); + check(sequence[4], buffer.getNextNonBlocked()); + + // alignment of next checkpoint + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + + // checkpoint finished + check(sequence[7], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong()); + check(sequence[11], 
buffer.getNextNonBlocked()); + + // remaining data + check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[17], buffer.getNextNonBlocked()); + check(sequence[18], buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); } // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ - private static BufferOrEvent createBarrier(long id, int channel) { - return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); + private static BufferOrEvent createBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel); + } + + private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel); } private static BufferOrEvent createBuffer(int channel) { - // since we have no access to the contents, we need to use the size as an - // identifier to validate correctness here - Buffer buf = new Buffer( - MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE), - FreeingBufferRecycler.INSTANCE); - - buf.setSize(SIZE_COUNTER++); + final int size = SIZE_COUNTER++; + byte[] bytes = new byte[size]; + RND.nextBytes(bytes); + + MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); + memory.put(0, bytes); + + Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE); + buf.setSize(size); + + // retain an additional time so it does not get disposed after being read by the input gate + buf.retain(); + return new BufferOrEvent(buf, channel); } @@ 
-907,15 +1408,16 @@ public class BarrierBufferTest { assertEquals(expected.isBuffer(), present.isBuffer()); if (expected.isBuffer()) { - // since we have no access to the contents, we need to use the size as an - // identifier to validate correctness here assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize()); + MemorySegment expectedMem = expected.getBuffer().getMemorySegment(); + MemorySegment presentMem = present.getBuffer().getMemorySegment(); + assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0); } else { assertEquals(expected.getEvent(), present.getEvent()); } } - + private static void checkNoTempFilesRemain() { // validate that all temp files have been removed for (File dir : IO_MANAGER.getSpillingDirectories()) { @@ -926,12 +1428,17 @@ public class BarrierBufferTest { } } } - + + private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) { + final long elapsed = System.nanoTime() - startTimestamp; + assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed); + } + // ------------------------------------------------------------------------ // Testing Mocks // ------------------------------------------------------------------------ - private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> { + private static class ValidatingCheckpointHandler implements StatefulTask<StateHandle<Object>> { private long nextExpectedCheckpointId = -1L; @@ -944,11 +1451,31 @@ public class BarrierBufferTest { } @Override - public void onEvent(CheckpointBarrier barrier) { - assertNotNull(barrier); - assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId()); - assertTrue(barrier.getTimestamp() > 0); + public void setInitialState(StateHandle<Object> stateHandle) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public boolean 
triggerCheckpoint(long checkpointId, long timestamp) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception { + assertTrue("wrong checkpoint id", + nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId); + + assertTrue(timestamp > 0); + nextExpectedCheckpointId++; } + + @Override + public void abortCheckpointOnBarrier(long checkpointId) {} + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index b9b6e5f..903f585 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -19,25 +19,30 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import 
org.apache.flink.runtime.jobgraph.tasks.StatefulTask; +import org.apache.flink.runtime.state.StateHandle; import org.junit.Test; import java.util.Arrays; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the behavior of the barrier tracker. */ public class BarrierTrackerTest { - + private static final int PAGE_SIZE = 512; - + @Test public void testSingleChannelNoBarriers() { try { @@ -329,6 +334,98 @@ public class BarrierTrackerTest { } } + @Test + public void testSingleChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0), + createBarrier(1, 0), + createBuffer(0), + createBarrier(2, 0), + createCancellationBarrier(4, 0), + createBarrier(5, 0), + createBuffer(0), + createCancellationBarrier(6, 0), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierTracker tracker = new BarrierTracker(gate); + + // negative values mean an expected cancellation call! 
+ CheckpointSequenceValidator validator = + new CheckpointSequenceValidator(1, 2, -4, 5, -6); + tracker.registerCheckpointEventHandler(validator); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer()) { + assertEquals(boe, tracker.getNextNonBlocked()); + } + assertTrue(tracker.isEmpty()); + } + + assertNull(tracker.getNextNonBlocked()); + assertNull(tracker.getNextNonBlocked()); + } + + @Test + public void testMultiChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // some buffers and a successful checkpoint + createBuffer(0), createBuffer(2), createBuffer(0), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2), createBuffer(1), + createBarrier(1, 0), + + // aborted on last barrier + createBuffer(0), createBuffer(2), + createBarrier(2, 0), createBarrier(2, 2), + createBuffer(0), createBuffer(2), + createCancellationBarrier(2, 1), + + // successful checkpoint + createBuffer(2), createBuffer(1), + createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), + + // abort on first barrier + createBuffer(0), createBuffer(1), + createCancellationBarrier(4, 1), createBarrier(4, 2), + createBuffer(0), + createBarrier(4, 0), + + // another successful checkpoint + createBuffer(0), createBuffer(1), createBuffer(2), + createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), + + // abort multiple cancellations and a barrier after the cancellations + createBuffer(0), createBuffer(1), + createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), + createBarrier(6, 0), + + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierTracker tracker = new BarrierTracker(gate); + + // negative values mean an expected cancellation call! 
+ CheckpointSequenceValidator validator = + new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6); + tracker.registerCheckpointEventHandler(validator); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer()) { + assertEquals(boe, tracker.getNextNonBlocked()); + } + } + + assertTrue(tracker.isEmpty()); + + assertNull(tracker.getNextNonBlocked()); + assertNull(tracker.getNextNonBlocked()); + + assertTrue(tracker.isEmpty()); + } + // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ @@ -337,6 +434,10 @@ public class BarrierTrackerTest { return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); } + private static BufferOrEvent createCancellationBarrier(long id, int channel) { + return new BufferOrEvent(new CancelCheckpointMarker(id), channel); + } + private static BufferOrEvent createBuffer(int channel) { return new BufferOrEvent( new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel); @@ -346,22 +447,54 @@ public class BarrierTrackerTest { // Testing Mocks // ------------------------------------------------------------------------ - private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> { + private static class CheckpointSequenceValidator implements StatefulTask<StateHandle<Object>> { private final long[] checkpointIDs; - + private int i = 0; private CheckpointSequenceValidator(long... 
checkpointIDs) { this.checkpointIDs = checkpointIDs; } - + @Override - public void onEvent(CheckpointBarrier barrier) { + public void setInitialState(StateHandle<Object> state) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception { assertTrue("More checkpoints than expected", i < checkpointIDs.length); - assertNotNull(barrier); - assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId()); - assertTrue(barrier.getTimestamp() > 0); + + final long expectedId = checkpointIDs[i++]; + if (expectedId >= 0) { + assertEquals("wrong checkpoint id", expectedId, checkpointId); + assertTrue(timestamp > 0); + } else { + fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'"); + } + } + + @Override + public void abortCheckpointOnBarrier(long checkpointId) { + assertTrue("More checkpoints than expected", i < checkpointIDs.length); + + final long expectedId = checkpointIDs[i++]; + if (expectedId < 0) { + assertEquals("wrong checkpoint id for checkoint abort", -expectedId, checkpointId); + } else { + fail("got 'abortCheckpointOnBarrier()' when expecting an 'triggerCheckpointOnBarrier()'"); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + throw new UnsupportedOperationException("should never be called"); } } }
