This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62927c90c5a8aa79dfc68b6bb4f05f4aeebcd2b4 Author: Piotr Nowojski <[email protected]> AuthorDate: Sat Jun 15 10:50:24 2019 +0200 [hotfix][network] Do not abort the same checkpoint barrier twice when cancellation marker was lost --- .../flink/streaming/runtime/io/BarrierBuffer.java | 12 +-- .../runtime/io/BarrierBufferTestBase.java | 19 +++++ .../streaming/runtime/io/BarrierTrackerTest.java | 55 ------------- .../runtime/io/CheckpointSequenceValidator.java | 90 ++++++++++++++++++++++ 4 files changed, 112 insertions(+), 64 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 23717f5..0f8fa40 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -330,12 +330,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { startOfAlignmentTimestamp = 0L; latestAlignmentDurationNanos = 0L; - notifyAbort(currentCheckpointId, - new CheckpointException( - "Barrier id: " + barrierId, - CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED - )); - notifyAbortOnCancellationBarrier(barrierId); } @@ -380,11 +374,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception { if (toNotifyOnCheckpoint != null) { CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); + new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); CheckpointMetrics checkpointMetrics = new CheckpointMetrics() - .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize()) - .setAlignmentDurationNanos(latestAlignmentDurationNanos); + .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize()) + .setAlignmentDurationNanos(latestAlignmentDurationNanos); toNotifyOnCheckpoint.triggerCheckpointOnBarrier( checkpointMetaData, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index 3b4f65f..e6e48a8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -689,6 +689,25 @@ public abstract class BarrierBufferTestBase { } @Test + public void testMissingCancellationBarriers() throws Exception { + BufferOrEvent[] sequence = { + createBarrier(1L, 0), + createCancellationBarrier(2L, 0), + createCancellationBarrier(3L, 0), + createCancellationBarrier(3L, 1), + createBuffer(0) + }; + AbstractInvokable validator = new CheckpointSequenceValidator(-3); + buffer = createBarrierBuffer(2, sequence, validator); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) { + assertEquals(boe, buffer.pollNext().get()); + } + } + } + + @Test public void testEarlyCleanup() throws Exception { BufferOrEvent[] sequence = { createBuffer(0), createBuffer(1), createBuffer(2), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index cb58837..1be2aab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -19,8 +19,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -28,7 +26,6 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.junit.After; import org.junit.Test; @@ -40,7 +37,6 @@ import java.util.Arrays; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -404,55 +400,4 @@ public class BarrierTrackerTest { // Testing Mocks // ------------------------------------------------------------------------ - private static class CheckpointSequenceValidator extends AbstractInvokable { - - private final long[] checkpointIDs; - - private int i = 0; - - private CheckpointSequenceValidator(long... checkpointIDs) { - super(new DummyEnvironment("test", 1, 0)); - this.checkpointIDs = checkpointIDs; - } - - @Override - public void invoke() { - throw new UnsupportedOperationException("should never be called"); - } - - @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception { - throw new UnsupportedOperationException("should never be called"); - } - - @Override - public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { - assertTrue("More checkpoints than expected", i < checkpointIDs.length); - - final long expectedId = checkpointIDs[i++]; - if (expectedId >= 0) { - assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId()); - assertTrue(checkpointMetaData.getTimestamp() > 0); - } else { - fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'"); - } - } - - @Override - public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { - assertTrue("More checkpoints than expected", i < checkpointIDs.length); - - final long expectedId = checkpointIDs[i++]; - if (expectedId < 0) { - assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId); - } else { - fail("got 'abortCheckpointOnBarrier()' when expecting an 'triggerCheckpointOnBarrier()'"); - } - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - throw new UnsupportedOperationException("should never be called"); - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java new file mode 100644 index 0000000..83b5364 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * {@link AbstractInvokable} that validates expected order of completed and aborted checkpoints. + */ +class CheckpointSequenceValidator extends AbstractInvokable { + + private final long[] checkpointIDs; + + private int i = 0; + + CheckpointSequenceValidator(long... checkpointIDs) { + super(new DummyEnvironment("test", 1, 0)); + this.checkpointIDs = checkpointIDs; + } + + @Override + public void invoke() { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { + assertTrue("Unexpected triggerCheckpointOnBarrier(" + checkpointMetaData.getCheckpointId() + ")", i < checkpointIDs.length); + + final long expectedId = checkpointIDs[i++]; + if (expectedId >= 0) { + assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId()); + assertTrue(checkpointMetaData.getTimestamp() > 0); + } else { + fail(String.format( + "got 'triggerCheckpointOnBarrier(%d)' when expecting an 'abortCheckpointOnBarrier(%d)'", + checkpointMetaData.getCheckpointId(), + expectedId)); + } + } + + @Override + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { + assertTrue("Unexpected abortCheckpointOnBarrier(" + checkpointId + ")", i < checkpointIDs.length); + + final long expectedId = checkpointIDs[i++]; + if (expectedId < 0) { + assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId); + } else { + fail(String.format( + "got 'abortCheckpointOnBarrier(%d)' when expecting an 'triggerCheckpointOnBarrier(%d)'", + checkpointId, + expectedId)); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } +}
