Repository: flink
Updated Branches:
  refs/heads/release-1.1 2b612f2d8 -> afaa27e9f


[FLINK-5285] Abort checkpoint only once in BarrierTracker

Prevent an interleaved sequence of cancellation markers for two consecutive
checkpoints from triggering a flood of cancellation markers for downstream
operators. This is done by aborting each checkpoint only once and by not
re-creating checkpoint barrier counts for already aborted checkpoints.

Add test case (BarrierTrackerTest#testInterleavedCancellationBarriers).

This closes #2964.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afaa27e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afaa27e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afaa27e9

Branch: refs/heads/release-1.1
Commit: afaa27e9faeb0352a49f30de90e719572caa97c5
Parents: 2b612f2
Author: Till Rohrmann <[email protected]>
Authored: Wed Dec 7 19:05:47 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Dec 9 14:37:31 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     |  5 ++-
 .../streaming/runtime/io/BarrierTracker.java    | 25 ++++++++-----
 .../runtime/io/BarrierTrackerTest.java          | 37 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 5a60439..b71b564 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -312,6 +312,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                currentCheckpointId = barrierId;
                                startOfAlignmentTimestamp = 0L;
                                latestAlignmentDurationNanos = 0L;
+
+                               notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
+
                                notifyAbortOnCancellationBarrier(barrierId);
                        }
 
@@ -415,7 +418,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                startOfAlignmentTimestamp = System.nanoTime();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId);
+                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId + '.');
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 59b408d..fbe3042 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -210,6 +210,11 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                CheckpointBarrierCount cbc;
                while ((cbc = pendingCheckpoints.peekFirst()) != null && 
cbc.checkpointId() < checkpointId) {
                        pendingCheckpoints.removeFirst();
+
+                       if (cbc.markAborted()) {
+                               // abort the subsumed checkpoints if not 
already done
+                               notifyAbort(cbc.checkpointId());
+                       }
                }
 
                if (cbc != null && cbc.checkpointId() == checkpointId) {
@@ -225,17 +230,19 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                                pendingCheckpoints.removeFirst();
                        }
                }
-               else {
+               else if (checkpointId > latestPendingCheckpointID) {
                        notifyAbort(checkpointId);
 
-                       // first barrier for this checkpoint - remember it as 
aborted
-                       // since we polled away all entries with lower 
checkpoint IDs
-                       // this entry will become the new first entry
-                       if (pendingCheckpoints.size() < 
MAX_CHECKPOINTS_TO_TRACK) {
-                               CheckpointBarrierCount abortedMarker = new 
CheckpointBarrierCount(checkpointId);
-                               abortedMarker.markAborted();
-                               pendingCheckpoints.addFirst(abortedMarker);
-                       }
+                       latestPendingCheckpointID = checkpointId;
+
+                       CheckpointBarrierCount abortedMarker = new 
CheckpointBarrierCount(checkpointId);
+                       abortedMarker.markAborted();
+                       pendingCheckpoints.addFirst(abortedMarker);
+
+                       // we have removed all other pending checkpoint barrier 
counts --> no need to check that
+                       // we don't exceed the maximum checkpoints to track
+               } else {
+                       // trailing cancellation barrier which was already 
cancelled
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 978c212..729afe8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -35,6 +35,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the behavior of the barrier tracker.
@@ -425,6 +430,38 @@ public class BarrierTrackerTest {
 
                assertTrue(tracker.isEmpty());
        }
+
+       /**
+        * Tests that each checkpoint is only aborted once in case of an 
interleaved cancellation
+        * barrier arrival of two consecutive checkpoints.
+        */
+       @Test
+       public void testInterleavedCancellationBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                       createBarrier(1L, 0),
+                       createCancellationBarrier(2L, 0),
+                       createCancellationBarrier(1L, 1),
+                       createCancellationBarrier(2L, 1),
+                       createCancellationBarrier(1L, 2),
+                       createCancellationBarrier(2L, 2),
+                       createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierTracker tracker = new BarrierTracker(gate);
+               StatefulTask statefulTask = mock(StatefulTask.class);
+
+               tracker.registerCheckpointEventHandler(statefulTask);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer() || (boe.getEvent().getClass() != 
CheckpointBarrier.class && boe.getEvent().getClass() != 
CancelCheckpointMarker.class)) {
+                               assertEquals(boe, tracker.getNextNonBlocked());
+                       }
+               }
+
+               verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), 
any(Throwable.class));
+               verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(Throwable.class));
+       }
        
        // 
------------------------------------------------------------------------
        //  Utils

Reply via email to