Repository: flink Updated Branches: refs/heads/master 41d5875bf -> 0c42d258e
[FLINK-5285] Abort checkpoint only once in BarrierTracker. Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints from triggering a flood of cancellation markers for downstream operators. This is done by aborting each checkpoint only once and by not re-creating checkpoint barrier counts for already aborted checkpoints. Add test case. This closes #2963. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3f19a5b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3f19a5b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3f19a5b Branch: refs/heads/master Commit: d3f19a5bead1d0709da733b75d729afa9341c250 Parents: 41d5875 Author: Till Rohrmann <[email protected]> Authored: Wed Dec 7 19:05:47 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Dec 9 14:41:15 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/io/BarrierBuffer.java | 5 ++- .../streaming/runtime/io/BarrierTracker.java | 25 ++++++++----- .../runtime/io/BarrierTrackerTest.java | 37 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 0940133..0baf126 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -313,6 +313,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler { currentCheckpointId = barrierId; startOfAlignmentTimestamp = 0L; latestAlignmentDurationNanos = 0L; + + notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); + notifyAbortOnCancellationBarrier(barrierId); } @@ -422,7 +425,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { startOfAlignmentTimestamp = System.nanoTime(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting stream alignment for checkpoint " + checkpointId); + LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.'); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index f497f4b..9351f1b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -211,6 +211,11 @@ public class BarrierTracker implements CheckpointBarrierHandler { CheckpointBarrierCount cbc; while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) { pendingCheckpoints.removeFirst(); + + if (cbc.markAborted()) { + // abort the subsumed checkpoints if not already done + notifyAbort(cbc.checkpointId()); + } } if (cbc != null && cbc.checkpointId() == checkpointId) { @@ -226,17 +231,19 @@ public class BarrierTracker implements CheckpointBarrierHandler { pendingCheckpoints.removeFirst(); } } - else { + else if (checkpointId > latestPendingCheckpointID) { notifyAbort(checkpointId); - // first barrier for this checkpoint - remember it as aborted - // since we polled away all entries with lower checkpoint 
IDs - // this entry will become the new first entry - if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) { - CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId); - abortedMarker.markAborted(); - pendingCheckpoints.addFirst(abortedMarker); - } + latestPendingCheckpointID = checkpointId; + + CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId); + abortedMarker.markAborted(); + pendingCheckpoints.addFirst(abortedMarker); + + // we have removed all other pending checkpoint barrier counts --> no need to check that + // we don't exceed the maximum checkpoints to track + } else { + // trailing cancellation barrier which was already cancelled } } http://git-wip-us.apache.org/repos/asf/flink/blob/d3f19a5b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 7ae144d..0d9e6ac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -36,6 +36,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Tests for the behavior of the barrier tracker. 
@@ -426,6 +431,38 @@ public class BarrierTrackerTest { assertTrue(tracker.isEmpty()); } + + /** + * Tests that each checkpoint is only aborted once in case of an interleaved cancellation + * barrier arrival of two consecutive checkpoints. + */ + @Test + public void testInterleavedCancellationBarriers() throws Exception { + BufferOrEvent[] sequence = { + createBarrier(1L, 0), + createCancellationBarrier(2L, 0), + createCancellationBarrier(1L, 1), + createCancellationBarrier(2L, 1), + createCancellationBarrier(1L, 2), + createCancellationBarrier(2L, 2), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierTracker tracker = new BarrierTracker(gate); + StatefulTask statefulTask = mock(StatefulTask.class); + + tracker.registerCheckpointEventHandler(statefulTask); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) { + assertEquals(boe, tracker.getNextNonBlocked()); + } + } + + verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), any(Throwable.class)); + verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), any(Throwable.class)); + } // ------------------------------------------------------------------------ // Utils
