Repository: flink Updated Branches: refs/heads/release-1.1 2b612f2d8 -> afaa27e9f
[FLINK-5285] Abort checkpoint only once in BarrierTracker Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints from triggering a flood of cancellation markers for downstream operators. This is done by aborting each checkpoint only once and by not re-creating checkpoint barrier counts for already aborted checkpoints. Add test case. This closes #2964. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afaa27e9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afaa27e9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afaa27e9 Branch: refs/heads/release-1.1 Commit: afaa27e9faeb0352a49f30de90e719572caa97c5 Parents: 2b612f2 Author: Till Rohrmann <[email protected]> Authored: Wed Dec 7 19:05:47 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Dec 9 14:37:31 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/io/BarrierBuffer.java | 5 ++- .../streaming/runtime/io/BarrierTracker.java | 25 ++++++++----- .../runtime/io/BarrierTrackerTest.java | 37 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 5a60439..b71b564 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -312,6 +312,9 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler { currentCheckpointId = barrierId; startOfAlignmentTimestamp = 0L; latestAlignmentDurationNanos = 0L; + + notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); + notifyAbortOnCancellationBarrier(barrierId); } @@ -415,7 +418,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { startOfAlignmentTimestamp = System.nanoTime(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting stream alignment for checkpoint " + checkpointId); + LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.'); } } http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 59b408d..fbe3042 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -210,6 +210,11 @@ public class BarrierTracker implements CheckpointBarrierHandler { CheckpointBarrierCount cbc; while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) { pendingCheckpoints.removeFirst(); + + if (cbc.markAborted()) { + // abort the subsumed checkpoints if not already done + notifyAbort(cbc.checkpointId()); + } } if (cbc != null && cbc.checkpointId() == checkpointId) { @@ -225,17 +230,19 @@ public class BarrierTracker implements CheckpointBarrierHandler { pendingCheckpoints.removeFirst(); } } - else { + else if (checkpointId > latestPendingCheckpointID) { notifyAbort(checkpointId); - // first barrier for this checkpoint - remember it as aborted - // since we polled away all entries with lower checkpoint 
IDs - // this entry will become the new first entry - if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) { - CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId); - abortedMarker.markAborted(); - pendingCheckpoints.addFirst(abortedMarker); - } + latestPendingCheckpointID = checkpointId; + + CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId); + abortedMarker.markAborted(); + pendingCheckpoints.addFirst(abortedMarker); + + // we have removed all other pending checkpoint barrier counts --> no need to check that + // we don't exceed the maximum checkpoints to track + } else { + // trailing cancellation barrier which was already cancelled } } http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 978c212..729afe8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -35,6 +35,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Tests for the behavior of the barrier tracker. 
@@ -425,6 +430,38 @@ public class BarrierTrackerTest { assertTrue(tracker.isEmpty()); } + + /** + * Tests that each checkpoint is only aborted once in case of an interleaved cancellation + * barrier arrival of two consecutive checkpoints. + */ + @Test + public void testInterleavedCancellationBarriers() throws Exception { + BufferOrEvent[] sequence = { + createBarrier(1L, 0), + createCancellationBarrier(2L, 0), + createCancellationBarrier(1L, 1), + createCancellationBarrier(2L, 1), + createCancellationBarrier(1L, 2), + createCancellationBarrier(2L, 2), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierTracker tracker = new BarrierTracker(gate); + StatefulTask statefulTask = mock(StatefulTask.class); + + tracker.registerCheckpointEventHandler(statefulTask); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) { + assertEquals(boe, tracker.getNextNonBlocked()); + } + } + + verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), any(Throwable.class)); + verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), any(Throwable.class)); + } // ------------------------------------------------------------------------ // Utils
