Repository: flink
Updated Branches:
  refs/heads/release-1.1 4dd3efea4 -> 0962cb6f4


http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..cf1f98e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,16 +18,18 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateHandle;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -42,15 +45,23 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 /**
  * Tests for the behavior of the {@link BarrierBuffer}.
  */
 public class BarrierBufferTest {
 
+       private static final Random RND = new Random();
+
        private static final int PAGE_SIZE = 512;
-       
+
        private static int SIZE_COUNTER = 0;
-       
+
        private static IOManager IO_MANAGER;
 
        @BeforeClass
@@ -86,7 +97,9 @@ public class BarrierBufferTest {
                        for (BufferOrEvent boe : sequence) {
                                assertEquals(boe, buffer.getNextNonBlocked());
                        }
-                       
+
+                       assertEquals(0L, buffer.getAlignmentDurationNanos());
+
                        assertNull(buffer.getNextNonBlocked());
                        assertNull(buffer.getNextNonBlocked());
                        
@@ -120,6 +133,8 @@ public class BarrierBufferTest {
                                assertEquals(boe, buffer.getNextNonBlocked());
                        }
 
+                       assertEquals(0L, buffer.getAlignmentDurationNanos());
+
                        assertNull(buffer.getNextNonBlocked());
                        assertNull(buffer.getNextNonBlocked());
 
@@ -222,13 +237,15 @@ public class BarrierBufferTest {
                        ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
                        buffer.registerCheckpointEventHandler(handler);
                        handler.setNextExpectedCheckpointId(1L);
-                       
+
                        // pre checkpoint 1
                        check(sequence[0], buffer.getNextNonBlocked());
                        check(sequence[1], buffer.getNextNonBlocked());
                        check(sequence[2], buffer.getNextNonBlocked());
                        assertEquals(1L, handler.getNextExpectedCheckpointId());
 
+                       long startTs = System.nanoTime();
+
                        // blocking while aligning for checkpoint 1
                        check(sequence[7], buffer.getNextNonBlocked());
                        assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +253,8 @@ public class BarrierBufferTest {
                        // checkpoint 1 done, returning buffered data
                        check(sequence[5], buffer.getNextNonBlocked());
                        assertEquals(2L, handler.getNextExpectedCheckpointId());
+                       validateAlignmentTime(startTs, buffer);
+
                        check(sequence[6], buffer.getNextNonBlocked());
 
                        // pre checkpoint 2
@@ -245,10 +264,13 @@ public class BarrierBufferTest {
                        check(sequence[12], buffer.getNextNonBlocked());
                        check(sequence[13], buffer.getNextNonBlocked());
                        assertEquals(2L, handler.getNextExpectedCheckpointId());
-                       
+
                        // checkpoint 2 barriers come together
+                       startTs = System.nanoTime();
                        check(sequence[17], buffer.getNextNonBlocked());
                        assertEquals(3L, handler.getNextExpectedCheckpointId());
+                       validateAlignmentTime(startTs, buffer);
+
                        check(sequence[18], buffer.getNextNonBlocked());
 
                        // checkpoint 3 starts, data buffered
@@ -257,7 +279,7 @@ public class BarrierBufferTest {
                        check(sequence[21], buffer.getNextNonBlocked());
 
                        // checkpoint 4 happens without extra data
-                       
+
                        // pre checkpoint 5
                        check(sequence[27], buffer.getNextNonBlocked());
                        assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +323,7 @@ public class BarrierBufferTest {
                        BufferOrEvent[] sequence = {
                                        createBuffer(0), createBuffer(1), 
createBuffer(2),
                                        createBarrier(1, 1), createBarrier(1, 
2), createBarrier(1, 0),
-                                       
+
                                        createBuffer(2), createBuffer(1), 
createBuffer(0),
                                        createBarrier(2, 1),
                                        createBuffer(1), createBuffer(1), 
createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +349,14 @@ public class BarrierBufferTest {
                        assertEquals(2L, handler.getNextExpectedCheckpointId());
                        check(sequence[7], buffer.getNextNonBlocked());
                        check(sequence[8], buffer.getNextNonBlocked());
-                       
+
                        // checkpoint 2 alignment
+                       long startTs = System.nanoTime();
                        check(sequence[13], buffer.getNextNonBlocked());
                        check(sequence[14], buffer.getNextNonBlocked());
                        check(sequence[18], buffer.getNextNonBlocked());
                        check(sequence[19], buffer.getNextNonBlocked());
+                       validateAlignmentTime(startTs, buffer);
 
                        // end of stream: remaining buffered contents
                        check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +367,7 @@ public class BarrierBufferTest {
 
                        assertNull(buffer.getNextNonBlocked());
                        assertNull(buffer.getNextNonBlocked());
-                       
+
                        buffer.cleanup();
 
                        checkNoTempFilesRemain();
@@ -389,7 +413,7 @@ public class BarrierBufferTest {
                                        createBarrier(3, 2),
                                        createBuffer(2), createBuffer(1), 
createBuffer(2), createBuffer(0),
                                        createBarrier(6, 1),
-                                       
+
                                        // complete checkpoint 4, checkpoint 5 
remains not fully triggered
                                        createBarrier(4, 2),
                                        createBuffer(2),
@@ -419,12 +443,14 @@ public class BarrierBufferTest {
 
                        // alignment of checkpoint 2 - buffering also some 
barriers for
                        // checkpoints 3 and 4
+                       long startTs = System.nanoTime();
                        check(sequence[13], buffer.getNextNonBlocked());
                        check(sequence[20], buffer.getNextNonBlocked());
                        check(sequence[23], buffer.getNextNonBlocked());
-                       
+
                        // checkpoint 2 completed
                        check(sequence[12], buffer.getNextNonBlocked());
+                       validateAlignmentTime(startTs, buffer);
                        check(sequence[25], buffer.getNextNonBlocked());
                        check(sequence[27], buffer.getNextNonBlocked());
                        check(sequence[30], buffer.getNextNonBlocked());
@@ -507,36 +533,53 @@ public class BarrierBufferTest {
                        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
                        BarrierBuffer buffer = new BarrierBuffer(gate, 
IO_MANAGER);
 
-                       ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-                       buffer.registerCheckpointEventHandler(handler);
-                       handler.setNextExpectedCheckpointId(1L);
+                       StatefulTask<?> toNotify = mock(StatefulTask.class);
+                       buffer.registerCheckpointEventHandler(toNotify);
 
-                       // checkpoint 1
+                       long startTs;
+
+                       // initial data
                        check(sequence[0], buffer.getNextNonBlocked());
                        check(sequence[1], buffer.getNextNonBlocked());
                        check(sequence[2], buffer.getNextNonBlocked());
+
+                       // align checkpoint 1
+                       startTs = System.nanoTime();
                        check(sequence[7], buffer.getNextNonBlocked());
                        assertEquals(1L, buffer.getCurrentCheckpointId());
-                       
+
+                       // checkpoint done - replay buffered
                        check(sequence[5], buffer.getNextNonBlocked());
+                       validateAlignmentTime(startTs, buffer);
+                       verify(toNotify).triggerCheckpointOnBarrier(eq(1L), 
anyLong());
                        check(sequence[6], buffer.getNextNonBlocked());
+
                        check(sequence[9], buffer.getNextNonBlocked());
                        check(sequence[10], buffer.getNextNonBlocked());
 
                        // alignment of checkpoint 2
+                       startTs = System.nanoTime();
                        check(sequence[13], buffer.getNextNonBlocked());
-                       assertEquals(2L, buffer.getCurrentCheckpointId());
                        check(sequence[15], buffer.getNextNonBlocked());
 
                        // checkpoint 2 aborted, checkpoint 3 started
                        check(sequence[12], buffer.getNextNonBlocked());
                        assertEquals(3L, buffer.getCurrentCheckpointId());
+                       validateAlignmentTime(startTs, buffer);
+                       verify(toNotify).abortCheckpointOnBarrier(2L);
                        check(sequence[16], buffer.getNextNonBlocked());
+
+                       // checkpoint 3 alignment in progress
                        check(sequence[19], buffer.getNextNonBlocked());
-                       check(sequence[20], buffer.getNextNonBlocked());
-                       
+
                        // checkpoint 3 aborted (end of partition)
+                       check(sequence[20], buffer.getNextNonBlocked());
+                       verify(toNotify).abortCheckpointOnBarrier(3L);
+
+                       // replay buffered data from checkpoint 3
                        check(sequence[18], buffer.getNextNonBlocked());
+
+                       // all the remaining messages
                        check(sequence[21], buffer.getNextNonBlocked());
                        check(sequence[22], buffer.getNextNonBlocked());
                        check(sequence[23], buffer.getNextNonBlocked());
@@ -613,17 +656,21 @@ public class BarrierBufferTest {
                        check(sequence[19], buffer.getNextNonBlocked());
                        check(sequence[21], buffer.getNextNonBlocked());
 
+                       long startTs = System.nanoTime();
+
                        // checkpoint 2 aborted, checkpoint 4 started. replay 
buffered
                        check(sequence[12], buffer.getNextNonBlocked());
                        assertEquals(4L, buffer.getCurrentCheckpointId());
                        check(sequence[16], buffer.getNextNonBlocked());
                        check(sequence[18], buffer.getNextNonBlocked());
                        check(sequence[22], buffer.getNextNonBlocked());
-                       
+
                        // align checkpoint 4 remainder
                        check(sequence[25], buffer.getNextNonBlocked());
                        check(sequence[26], buffer.getNextNonBlocked());
-                       
+
+                       validateAlignmentTime(startTs, buffer);
+
                        // checkpoint 4 aborted (due to end of partition)
                        check(sequence[24], buffer.getNextNonBlocked());
                        check(sequence[27], buffer.getNextNonBlocked());
@@ -862,9 +909,9 @@ public class BarrierBufferTest {
 
                        assertNull(buffer.getNextNonBlocked());
                        assertNull(buffer.getNextNonBlocked());
-                       
+
                        buffer.cleanup();
-                       
+
                        checkNoTempFilesRemain();
                }
                catch (Exception e) {
@@ -874,26 +921,480 @@ public class BarrierBufferTest {
        }
 
        @Test
-       public void testEndOfStreamWhileCheckpoint() {
+       public void testEndOfStreamWhileCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // one checkpoint
+                               createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
+
+                               // some buffers
+                               createBuffer(0), createBuffer(0), 
createBuffer(2),
+
+                               // start the checkpoint that will be incomplete
+                               createBarrier(2, 2), createBarrier(2, 0),
+                               createBuffer(0), createBuffer(2), 
createBuffer(1),
+
+                               // close one channel after the barrier, one before the 
barrier
+                               createEndOfPartition(2), 
createEndOfPartition(1),
+                               createBuffer(0),
+
+                               // final end of stream
+                               createEndOfPartition(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               // data after first checkpoint
+               check(sequence[3], buffer.getNextNonBlocked());
+               check(sequence[4], buffer.getNextNonBlocked());
+               check(sequence[5], buffer.getNextNonBlocked());
+               assertEquals(1L, buffer.getCurrentCheckpointId());
+
+               // alignment of second checkpoint
+               check(sequence[10], buffer.getNextNonBlocked());
+               assertEquals(2L, buffer.getCurrentCheckpointId());
+
+               // first end-of-partition encountered: checkpoint will not be 
completed
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[9], buffer.getNextNonBlocked());
+               check(sequence[11], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+
+               checkNoTempFilesRemain();
+       }
+
+       @Test
+       public void testSingleChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               createBuffer(0),
+                               createBarrier(1, 0),
+                               createBuffer(0),
+                               createBarrier(2, 0),
+                               createCancellationBarrier(4, 0),
+                               createBarrier(5, 0),
+                               createBuffer(0),
+                               createCancellationBarrier(6, 0),
+                               createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[2], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), 
anyLong());
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               check(sequence[6], buffer.getNextNonBlocked());
+               assertEquals(5L, buffer.getCurrentCheckpointId());
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), 
anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), 
anyLong());
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               check(sequence[8], buffer.getNextNonBlocked());
+               assertEquals(6L, buffer.getCurrentCheckpointId());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
                
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       @Test
+       public void testMultiChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some buffers and a successful checkpoint
+                               /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
+                               /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
+                               /* 5 */ createBuffer(2), createBuffer(1),
+                               /* 7 */ createBarrier(1, 0),
+                               /* 8 */ createBuffer(0), createBuffer(2),
+
+                               // aborted on last barrier
+                               /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
+                               /* 12 */ createBuffer(0), createBuffer(2),
+                               /* 14 */ createCancellationBarrier(2, 1),
+
+                               // successful checkpoint
+                               /* 15 */ createBuffer(2), createBuffer(1),
+                               /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
+
+                               // abort on first barrier
+                               /* 20 */ createBuffer(0), createBuffer(1),
+                               /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
+                               /* 24 */ createBuffer(0),
+                               /* 25 */ createBarrier(4, 0),
+
+                               // another successful checkpoint
+                               /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+                               /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
+                               /* 32 */ createBuffer(0), createBuffer(1),
+
+                               // abort multiple cancellations and a barrier 
after the cancellations
+                               /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
+                               /* 36 */ createBarrier(6, 0),
+
+                               /* 37 */ createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               // successful first checkpoint, with some aligned buffers
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[1], buffer.getNextNonBlocked());
+               check(sequence[2], buffer.getNextNonBlocked());
+               startTs = System.nanoTime();
+               check(sequence[5], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), 
anyLong());
+               validateAlignmentTime(startTs, buffer);
+
+               check(sequence[6], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[9], buffer.getNextNonBlocked());
+
+               // canceled checkpoint on last barrier
+               startTs = System.nanoTime();
+               check(sequence[12], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               validateAlignmentTime(startTs, buffer);
+               check(sequence[13], buffer.getNextNonBlocked());
+
+               // one more successful checkpoint
+               check(sequence[15], buffer.getNextNonBlocked());
+               check(sequence[16], buffer.getNextNonBlocked());
+               startTs = System.nanoTime();
+               check(sequence[20], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(3L), 
anyLong());
+               validateAlignmentTime(startTs, buffer);
+               check(sequence[21], buffer.getNextNonBlocked());
+
+               // this checkpoint gets immediately canceled
+               check(sequence[24], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // some buffers
+               check(sequence[26], buffer.getNextNonBlocked());
+               check(sequence[27], buffer.getNextNonBlocked());
+               check(sequence[28], buffer.getNextNonBlocked());
+
+               // a simple successful checkpoint
+               startTs = System.nanoTime();
+               check(sequence[32], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), 
anyLong());
+               validateAlignmentTime(startTs, buffer);
+               check(sequence[33], buffer.getNextNonBlocked());
+
+               check(sequence[37], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       @Test
+       public void testAbortViaQueuedBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // starting a checkpoint
+                               /* 0 */ createBuffer(1),
+                               /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
+                               /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
+
+                               // queued barrier and cancellation barrier
+                               /* 6 */ createCancellationBarrier(2, 2),
+                               /* 7 */ createBarrier(2, 1),
+
+                               // some intermediate buffers (some queued)
+                               /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+
+                               // complete initial checkpoint
+                               /* 11 */ createBarrier(1, 0),
+
+                               // some buffers (none queued, since checkpoint 
is aborted)
+                               /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
+
+                               // final barrier of aborted checkpoint
+                               /* 15 */ createBarrier(2, 0),
+
+                               // some more buffers
+                               /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               check(sequence[0], buffer.getNextNonBlocked());
+
+               // starting first checkpoint
+               startTs = System.nanoTime();
+               check(sequence[4], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+
+               // finished first checkpoint
+               check(sequence[3], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), 
anyLong());
+               validateAlignmentTime(startTs, buffer);
+
+               check(sequence[5], buffer.getNextNonBlocked());
+
+               // re-read the queued cancellation barriers
+               check(sequence[9], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               check(sequence[10], buffer.getNextNonBlocked());
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               check(sequence[16], buffer.getNextNonBlocked());
+               check(sequence[17], buffer.getNextNonBlocked());
+               check(sequence[18], buffer.getNextNonBlocked());
+
+               // no further alignment should have happened
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // no further checkpoint (abort) notifications
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * This tests the case where a replay of queued checkpoint barriers meets
+        * a canceled checkpoint.
+        *
+        * The replayed newer checkpoint barrier must not try to cancel the
+        * already canceled checkpoint.
+        */
+       @Test
+       public void testAbortWhileHavingQueuedBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // starting a checkpoint
+                               /*  0 */ createBuffer(1),
+                               /*  1 */ createBarrier(1, 1),
+                               /*  2 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
+
+                               // queued barrier of the next checkpoint
+                               /*  5 */ createBarrier(2, 1),
+
+                               // some queued buffers
+                               /*  6 */ createBuffer(2), createBuffer(1),
+
+                               // cancel the initial checkpoint
+                               /*  8 */ createCancellationBarrier(1, 0),
+
+                               // some more buffers
+                               /*  9 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
+
+                               // ignored barrier - already canceled and moved 
to next checkpoint
+                               /* 12 */ createBarrier(1, 2),
+
+                               // some more buffers
+                               /* 13 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+
+                               // complete next checkpoint regularly
+                               /* 16 */ createBarrier(2, 0), createBarrier(2, 
2),
+
+                               // some more buffers
+                               /* 18 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               check(sequence[0], buffer.getNextNonBlocked());
+
+               // starting first checkpoint
+               startTs = System.nanoTime();
+               check(sequence[2], buffer.getNextNonBlocked());
+               check(sequence[3], buffer.getNextNonBlocked());
+               check(sequence[6], buffer.getNextNonBlocked());
+
+               // cancelled by cancellation barrier
+               check(sequence[4], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+
+               // the next checkpoint alignment starts now
+               startTs = System.nanoTime();
+               check(sequence[9], buffer.getNextNonBlocked());
+               check(sequence[11], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[15], buffer.getNextNonBlocked());
+
+               // checkpoint done
+               check(sequence[7], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), 
anyLong());
+
+               // queued data
+               check(sequence[10], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               // trailing data
+               check(sequence[18], buffer.getNextNonBlocked());
+               check(sequence[19], buffer.getNextNonBlocked());
+               check(sequence[20], buffer.getNextNonBlocked());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+
+               // check overall notifications
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+       }
+
+       /**
+        * This tests the case where a cancellation barrier is received for a 
checkpoint already
+        * canceled due to receiving a newer checkpoint barrier.
+        */
+       @Test
+       public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws 
Exception {
+               BufferOrEvent[] sequence = {
+                               // starting a checkpoint
+                               /*  0 */ createBuffer(2),
+                               /*  1 */ createBarrier(3, 1), createBarrier(3, 
0),
+                               /*  3 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+
+                               // newer checkpoint barrier cancels/subsumes 
pending checkpoint
+                               /*  6 */ createBarrier(5, 2),
+
+                               // some queued buffers
+                               /*  7 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
+
+                               // cancel barrier for the initial checkpoint (it is 
already canceled)
+                               /* 10 */ createCancellationBarrier(3, 2),
+
+                               // some more buffers
+                               /* 11 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
+
+                               // complete next checkpoint regularly
+                               /* 14 */ createBarrier(5, 0), createBarrier(5, 
1),
+
+                               // some more buffers
+                               /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask<?> toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               // validate the sequence
+
+               check(sequence[0], buffer.getNextNonBlocked());
+
+               // beginning of first checkpoint
+               check(sequence[5], buffer.getNextNonBlocked());
+
+               // future barrier aborts checkpoint
+               startTs = System.nanoTime();
+               check(sequence[3], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+               check(sequence[4], buffer.getNextNonBlocked());
+
+               // alignment of next checkpoint
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[9], buffer.getNextNonBlocked());
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+
+               // checkpoint finished
+               check(sequence[7], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), 
anyLong());
+               check(sequence[11], buffer.getNextNonBlocked());
+
+               // remaining data
+               check(sequence[16], buffer.getNextNonBlocked());
+               check(sequence[17], buffer.getNextNonBlocked());
+               check(sequence[18], buffer.getNextNonBlocked());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+
+               // check overall notifications
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
        }
 
        // 
------------------------------------------------------------------------
        //  Utils
        // 
------------------------------------------------------------------------
 
-       private static BufferOrEvent createBarrier(long id, int channel) {
-               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
+       private static BufferOrEvent createBarrier(long checkpointId, int 
channel) {
+               return new BufferOrEvent(new CheckpointBarrier(checkpointId, 
System.currentTimeMillis()), channel);
+       }
+
+       private static BufferOrEvent createCancellationBarrier(long 
checkpointId, int channel) {
+               return new BufferOrEvent(new 
CancelCheckpointMarker(checkpointId), channel);
        }
 
        private static BufferOrEvent createBuffer(int channel) {
-               // since we have no access to the contents, we need to use the 
size as an
-               // identifier to validate correctness here
-               Buffer buf = new Buffer(
-                               
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
-                               FreeingBufferRecycler.INSTANCE);
-               
-               buf.setSize(SIZE_COUNTER++);
+               final int size = SIZE_COUNTER++;
+               byte[] bytes = new byte[size];
+               RND.nextBytes(bytes);
+
+               MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+               memory.put(0, bytes);
+
+               Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+               buf.setSize(size);
+
+               // retain an additional time so it does not get disposed after 
being read by the input gate
+               buf.retain();
+
                return new BufferOrEvent(buf, channel);
        }
 
@@ -907,15 +1408,16 @@ public class BarrierBufferTest {
                assertEquals(expected.isBuffer(), present.isBuffer());
                
                if (expected.isBuffer()) {
-                       // since we have no access to the contents, we need to 
use the size as an
-                       // identifier to validate correctness here
                        assertEquals(expected.getBuffer().getSize(), 
present.getBuffer().getSize());
+                       MemorySegment expectedMem = 
expected.getBuffer().getMemorySegment();
+                       MemorySegment presentMem = 
present.getBuffer().getMemorySegment();
+                       assertTrue("memory contents differs", 
expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
                }
                else {
                        assertEquals(expected.getEvent(), present.getEvent());
                }
        }
-       
+
        private static void checkNoTempFilesRemain() {
                // validate that all temp files have been removed
                for (File dir : IO_MANAGER.getSpillingDirectories()) {
@@ -926,12 +1428,17 @@ public class BarrierBufferTest {
                        }
                }
        }
-       
+
+       private static void validateAlignmentTime(long startTimestamp, 
BarrierBuffer buffer) {
+               final long elapsed = System.nanoTime() - startTimestamp;
+               assertTrue("wrong alignment time", 
buffer.getAlignmentDurationNanos() <= elapsed);
+       }
+
        // 
------------------------------------------------------------------------
        //  Testing Mocks
        // 
------------------------------------------------------------------------
 
-       private static class ValidatingCheckpointHandler implements 
EventListener<CheckpointBarrier> {
+       private static class ValidatingCheckpointHandler implements 
StatefulTask<StateHandle<Object>> {
                
                private long nextExpectedCheckpointId = -1L;
 
@@ -944,11 +1451,31 @@ public class BarrierBufferTest {
                }
 
                @Override
-               public void onEvent(CheckpointBarrier barrier) {
-                       assertNotNull(barrier);
-                       assertTrue("wrong checkpoint id", 
nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
-                       assertTrue(barrier.getTimestamp() > 0);
+               public void setInitialState(StateHandle<Object> stateHandle) 
throws Exception {
+                       throw new UnsupportedOperationException("should never 
be called");
+               }
+
+               @Override
+               public boolean triggerCheckpoint(long checkpointId, long 
timestamp) throws Exception {
+                       throw new UnsupportedOperationException("should never 
be called");
+               }
+
+               @Override
+               public void triggerCheckpointOnBarrier(long checkpointId, long 
timestamp) throws Exception {
+                       assertTrue("wrong checkpoint id",
+                                       nextExpectedCheckpointId == -1L || 
nextExpectedCheckpointId == checkpointId);
+
+                       assertTrue(timestamp > 0);
+
                        nextExpectedCheckpointId++;
                }
+
+               @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {}
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       throw new UnsupportedOperationException("should never 
be called");
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..903f585 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,25 +19,30 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
+import org.apache.flink.runtime.state.StateHandle;
 import org.junit.Test;
 
 import java.util.Arrays;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the behavior of the barrier tracker.
  */
 public class BarrierTrackerTest {
-       
+
        private static final int PAGE_SIZE = 512;
-       
+
        @Test
        public void testSingleChannelNoBarriers() {
                try {
@@ -329,6 +334,98 @@ public class BarrierTrackerTest {
                }
        }
 
+       @Test
+       public void testSingleChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               createBuffer(0),
+                               createBarrier(1, 0),
+                               createBuffer(0),
+                               createBarrier(2, 0),
+                               createCancellationBarrier(4, 0),
+                               createBarrier(5, 0),
+                               createBuffer(0),
+                               createCancellationBarrier(6, 0),
+                               createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
+               BarrierTracker tracker = new BarrierTracker(gate);
+
+               // negative values mean an expected cancellation call!
+               CheckpointSequenceValidator validator =
+                               new CheckpointSequenceValidator(1, 2, -4, 5, 
-6);
+               tracker.registerCheckpointEventHandler(validator);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer()) {
+                               assertEquals(boe, tracker.getNextNonBlocked());
+                       }
+                       assertTrue(tracker.isEmpty());
+               }
+
+               assertNull(tracker.getNextNonBlocked());
+               assertNull(tracker.getNextNonBlocked());
+       }
+
+       @Test
+       public void testMultiChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some buffers and a successful checkpoint
+                               createBuffer(0), createBuffer(2), 
createBuffer(0),
+                               createBarrier(1, 1), createBarrier(1, 2),
+                               createBuffer(2), createBuffer(1),
+                               createBarrier(1, 0),
+
+                               // aborted on last barrier
+                               createBuffer(0), createBuffer(2),
+                               createBarrier(2, 0), createBarrier(2, 2),
+                               createBuffer(0), createBuffer(2),
+                               createCancellationBarrier(2, 1),
+
+                               // successful checkpoint
+                               createBuffer(2), createBuffer(1),
+                               createBarrier(3, 1), createBarrier(3, 2), 
createBarrier(3, 0),
+
+                               // abort on first barrier
+                               createBuffer(0), createBuffer(1),
+                               createCancellationBarrier(4, 1), 
createBarrier(4, 2),
+                               createBuffer(0),
+                               createBarrier(4, 0),
+
+                               // another successful checkpoint
+                               createBuffer(0), createBuffer(1), 
createBuffer(2),
+                               createBarrier(5, 2), createBarrier(5, 1), 
createBarrier(5, 0),
+
+                               // aborted by multiple cancellations, with a barrier 
after the cancellations 
+                               createBuffer(0), createBuffer(1),
+                               createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
+                               createBarrier(6, 0),
+
+                               createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierTracker tracker = new BarrierTracker(gate);
+
+               // negative values mean an expected cancellation call!
+               CheckpointSequenceValidator validator =
+                               new CheckpointSequenceValidator(1, -2, 3, -4, 
5, -6);
+               tracker.registerCheckpointEventHandler(validator);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer()) {
+                               assertEquals(boe, tracker.getNextNonBlocked());
+                       }
+               }
+
+               assertTrue(tracker.isEmpty());
+
+               assertNull(tracker.getNextNonBlocked());
+               assertNull(tracker.getNextNonBlocked());
+
+               assertTrue(tracker.isEmpty());
+       }
+       
        // 
------------------------------------------------------------------------
        //  Utils
        // 
------------------------------------------------------------------------
@@ -337,6 +434,10 @@ public class BarrierTrackerTest {
                return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
        }
 
+       private static BufferOrEvent createCancellationBarrier(long id, int 
channel) {
+               return new BufferOrEvent(new CancelCheckpointMarker(id), 
channel);
+       }
+
        private static BufferOrEvent createBuffer(int channel) {
                return new BufferOrEvent(
                                new Buffer(MemorySegmentFactory.wrap(new 
byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
@@ -346,22 +447,54 @@ public class BarrierTrackerTest {
        //  Testing Mocks
        // 
------------------------------------------------------------------------
        
-       private static class CheckpointSequenceValidator implements 
EventListener<CheckpointBarrier> {
+       private static class CheckpointSequenceValidator implements 
StatefulTask<StateHandle<Object>> {
 
                private final long[] checkpointIDs;
-               
+
                private int i = 0;
 
                private CheckpointSequenceValidator(long... checkpointIDs) {
                        this.checkpointIDs = checkpointIDs;
                }
-               
+
                @Override
-               public void onEvent(CheckpointBarrier barrier) {
+               public void setInitialState(StateHandle<Object> state) throws 
Exception {
+                       throw new UnsupportedOperationException("should never 
be called");
+               }
+
+               @Override
+               public boolean triggerCheckpoint(long checkpointId, long 
timestamp) throws Exception {
+                       throw new UnsupportedOperationException("should never 
be called");
+               }
+
+               @Override
+               public void triggerCheckpointOnBarrier(long checkpointId, long 
timestamp) throws Exception {
                        assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
-                       assertNotNull(barrier);
-                       assertEquals("wrong checkpoint id", checkpointIDs[i++], 
barrier.getId());
-                       assertTrue(barrier.getTimestamp() > 0);
+
+                       final long expectedId = checkpointIDs[i++];
+                       if (expectedId >= 0) {
+                               assertEquals("wrong checkpoint id", expectedId, 
checkpointId);
+                               assertTrue(timestamp > 0);
+                       } else {
+                               fail("got 'triggerCheckpointOnBarrier()' when 
expecting an 'abortCheckpointOnBarrier()'");
+                       }
+               }
+
+               @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {
+                       assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
+
+                       final long expectedId = checkpointIDs[i++];
+                       if (expectedId < 0) {
+                               assertEquals("wrong checkpoint id for checkoint 
abort", -expectedId, checkpointId);
+                       } else {
+                               fail("got 'abortCheckpointOnBarrier()' when 
expecting an 'triggerCheckpointOnBarrier()'");
+                       }
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       throw new UnsupportedOperationException("should never 
be called");
                }
        }
 }

Reply via email to