Repository: flink
Updated Branches:
  refs/heads/master 41d5875bf -> 0c42d258e


[FLINK-5285] Abort checkpoint only once in BarrierTracker

Prevent an interleaved sequence of cancellation markers for two consecutive
checkpoints from triggering a flood of cancellation markers for downstream
operators. This is done by aborting each checkpoint only once and by not
re-creating checkpoint barrier counts for already aborted checkpoints.

Add test case

This closes #2963.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3f19a5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3f19a5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3f19a5b

Branch: refs/heads/master
Commit: d3f19a5bead1d0709da733b75d729afa9341c250
Parents: 41d5875
Author: Till Rohrmann <[email protected]>
Authored: Wed Dec 7 19:05:47 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Dec 9 14:41:15 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     |  5 ++-
 .../streaming/runtime/io/BarrierTracker.java    | 25 ++++++++-----
 .../runtime/io/BarrierTrackerTest.java          | 37 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0940133..0baf126 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -313,6 +313,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                currentCheckpointId = barrierId;
                                startOfAlignmentTimestamp = 0L;
                                latestAlignmentDurationNanos = 0L;
+
+                               notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
+
                                notifyAbortOnCancellationBarrier(barrierId);
                        }
 
@@ -422,7 +425,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                startOfAlignmentTimestamp = System.nanoTime();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId);
+                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId + '.');
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index f497f4b..9351f1b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -211,6 +211,11 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                CheckpointBarrierCount cbc;
                while ((cbc = pendingCheckpoints.peekFirst()) != null && 
cbc.checkpointId() < checkpointId) {
                        pendingCheckpoints.removeFirst();
+
+                       if (cbc.markAborted()) {
+                               // abort the subsumed checkpoints if not 
already done
+                               notifyAbort(cbc.checkpointId());
+                       }
                }
 
                if (cbc != null && cbc.checkpointId() == checkpointId) {
@@ -226,17 +231,19 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                                pendingCheckpoints.removeFirst();
                        }
                }
-               else {
+               else if (checkpointId > latestPendingCheckpointID) {
                        notifyAbort(checkpointId);
 
-                       // first barrier for this checkpoint - remember it as 
aborted
-                       // since we polled away all entries with lower 
checkpoint IDs
-                       // this entry will become the new first entry
-                       if (pendingCheckpoints.size() < 
MAX_CHECKPOINTS_TO_TRACK) {
-                               CheckpointBarrierCount abortedMarker = new 
CheckpointBarrierCount(checkpointId);
-                               abortedMarker.markAborted();
-                               pendingCheckpoints.addFirst(abortedMarker);
-                       }
+                       latestPendingCheckpointID = checkpointId;
+
+                       CheckpointBarrierCount abortedMarker = new 
CheckpointBarrierCount(checkpointId);
+                       abortedMarker.markAborted();
+                       pendingCheckpoints.addFirst(abortedMarker);
+
+                       // we have removed all other pending checkpoint barrier 
counts --> no need to check that
+                       // we don't exceed the maximum checkpoints to track
+               } else {
+                       // trailing cancellation barrier which was already 
cancelled
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 7ae144d..0d9e6ac 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -36,6 +36,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the behavior of the barrier tracker.
@@ -426,6 +431,38 @@ public class BarrierTrackerTest {
 
                assertTrue(tracker.isEmpty());
        }
+
+       /**
+        * Tests that each checkpoint is only aborted once in case of an 
interleaved cancellation
+        * barrier arrival of two consecutive checkpoints.
+        */
+       @Test
+       public void testInterleavedCancellationBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                       createBarrier(1L, 0),
+                       createCancellationBarrier(2L, 0),
+                       createCancellationBarrier(1L, 1),
+                       createCancellationBarrier(2L, 1),
+                       createCancellationBarrier(1L, 2),
+                       createCancellationBarrier(2L, 2),
+                       createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierTracker tracker = new BarrierTracker(gate);
+               StatefulTask statefulTask = mock(StatefulTask.class);
+
+               tracker.registerCheckpointEventHandler(statefulTask);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer() || (boe.getEvent().getClass() != 
CheckpointBarrier.class && boe.getEvent().getClass() != 
CancelCheckpointMarker.class)) {
+                               assertEquals(boe, tracker.getNextNonBlocked());
+                       }
+               }
+
+               verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), 
any(Throwable.class));
+               verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(Throwable.class));
+       }
        
        // 
------------------------------------------------------------------------
        //  Utils

Reply via email to