This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62927c90c5a8aa79dfc68b6bb4f05f4aeebcd2b4
Author: Piotr Nowojski <[email protected]>
AuthorDate: Sat Jun 15 10:50:24 2019 +0200

    [hotfix][network] Do not abort the same checkpoint barrier twice when cancellation marker was lost
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 12 +--
 .../runtime/io/BarrierBufferTestBase.java          | 19 +++++
 .../streaming/runtime/io/BarrierTrackerTest.java   | 55 -------------
 .../runtime/io/CheckpointSequenceValidator.java    | 90 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 64 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 23717f5..0f8fa40 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -330,12 +330,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                startOfAlignmentTimestamp = 0L;
                                latestAlignmentDurationNanos = 0L;
 
-                               notifyAbort(currentCheckpointId,
-                                       new CheckpointException(
-                                               "Barrier id: " + barrierId,
-                                               
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED
-                                       ));
-
                                notifyAbortOnCancellationBarrier(barrierId);
                        }
 
@@ -380,11 +374,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) 
throws Exception {
                if (toNotifyOnCheckpoint != null) {
                        CheckpointMetaData checkpointMetaData =
-                                       new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+                               new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
 
                        CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
-                                       
.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
-                                       
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
+                               
.setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
+                               
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
                        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
                                checkpointMetaData,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 3b4f65f..e6e48a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -689,6 +689,25 @@ public abstract class BarrierBufferTestBase {
        }
 
        @Test
+       public void testMissingCancellationBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                       createBarrier(1L, 0),
+                       createCancellationBarrier(2L, 0),
+                       createCancellationBarrier(3L, 0),
+                       createCancellationBarrier(3L, 1),
+                       createBuffer(0)
+               };
+               AbstractInvokable validator = new 
CheckpointSequenceValidator(-3);
+               buffer = createBarrierBuffer(2, sequence, validator);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer() || (boe.getEvent().getClass() != 
CheckpointBarrier.class && boe.getEvent().getClass() != 
CancelCheckpointMarker.class)) {
+                               assertEquals(boe, buffer.pollNext().get());
+                       }
+               }
+       }
+
+       @Test
        public void testEarlyCleanup() throws Exception {
                BufferOrEvent[] sequence = {
                        createBuffer(0), createBuffer(1), createBuffer(2),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index cb58837..1be2aab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,8 +19,6 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -28,7 +26,6 @@ import 
org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 
 import org.junit.After;
 import org.junit.Test;
@@ -40,7 +37,6 @@ import java.util.Arrays;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -404,55 +400,4 @@ public class BarrierTrackerTest {
        //  Testing Mocks
        // 
------------------------------------------------------------------------
 
-       private static class CheckpointSequenceValidator extends 
AbstractInvokable {
-
-               private final long[] checkpointIDs;
-
-               private int i = 0;
-
-               private CheckpointSequenceValidator(long... checkpointIDs) {
-                       super(new DummyEnvironment("test", 1, 0));
-                       this.checkpointIDs = checkpointIDs;
-               }
-
-               @Override
-               public void invoke() {
-                       throw new UnsupportedOperationException("should never 
be called");
-               }
-
-               @Override
-               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, boolean 
advanceToEndOfEventTime) throws Exception {
-                       throw new UnsupportedOperationException("should never 
be called");
-               }
-
-               @Override
-               public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-                       assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
-
-                       final long expectedId = checkpointIDs[i++];
-                       if (expectedId >= 0) {
-                               assertEquals("wrong checkpoint id", expectedId, 
checkpointMetaData.getCheckpointId());
-                               assertTrue(checkpointMetaData.getTimestamp() > 
0);
-                       } else {
-                               fail("got 'triggerCheckpointOnBarrier()' when 
expecting an 'abortCheckpointOnBarrier()'");
-                       }
-               }
-
-               @Override
-               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-                       assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
-
-                       final long expectedId = checkpointIDs[i++];
-                       if (expectedId < 0) {
-                               assertEquals("wrong checkpoint id for 
checkpoint abort", -expectedId, checkpointId);
-                       } else {
-                               fail("got 'abortCheckpointOnBarrier()' when 
expecting an 'triggerCheckpointOnBarrier()'");
-                       }
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-                       throw new UnsupportedOperationException("should never 
be called");
-               }
-       }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
new file mode 100644
index 0000000..83b5364
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointSequenceValidator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@link AbstractInvokable} that validates expected order of completed and 
aborted checkpoints.
+ */
+class CheckpointSequenceValidator extends AbstractInvokable {
+
+       private final long[] checkpointIDs;
+
+       private int i = 0;
+
+       CheckpointSequenceValidator(long... checkpointIDs) {
+               super(new DummyEnvironment("test", 1, 0));
+               this.checkpointIDs = checkpointIDs;
+       }
+
+       @Override
+       public void invoke() {
+               throw new UnsupportedOperationException("should never be 
called");
+       }
+
+       @Override
+       public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, 
CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws 
Exception {
+               throw new UnsupportedOperationException("should never be 
called");
+       }
+
+       @Override
+       public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
+               assertTrue("Unexpected triggerCheckpointOnBarrier(" + 
checkpointMetaData.getCheckpointId() + ")", i < checkpointIDs.length);
+
+               final long expectedId = checkpointIDs[i++];
+               if (expectedId >= 0) {
+                       assertEquals("wrong checkpoint id", expectedId, 
checkpointMetaData.getCheckpointId());
+                       assertTrue(checkpointMetaData.getTimestamp() > 0);
+               } else {
+                       fail(String.format(
+                               "got 'triggerCheckpointOnBarrier(%d)' when 
expecting an 'abortCheckpointOnBarrier(%d)'",
+                               checkpointMetaData.getCheckpointId(),
+                               expectedId));
+               }
+       }
+
+       @Override
+       public void abortCheckpointOnBarrier(long checkpointId, Throwable 
cause) {
+               assertTrue("Unexpected abortCheckpointOnBarrier(" + 
checkpointId + ")", i < checkpointIDs.length);
+
+               final long expectedId = checkpointIDs[i++];
+               if (expectedId < 0) {
+                       assertEquals("wrong checkpoint id for checkpoint 
abort", -expectedId, checkpointId);
+               } else {
+                       fail(String.format(
+                               "got 'abortCheckpointOnBarrier(%d)' when 
expecting an 'triggerCheckpointOnBarrier(%d)'",
+                               checkpointId,
+                               expectedId));
+               }
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               throw new UnsupportedOperationException("should never be 
called");
+       }
+}

Reply via email to