AHeise commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435861937
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
}
}
+ /**
+ * Tests {@link
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+ * abort the current pending checkpoint triggered by
+ * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier,
InputChannelInfo)}.
+ */
+ @Test
+ public void testProcessCancellationBarrierWitchPendingCheckpoint()
throws Exception {
Review comment:
How about naming it
`testProcessCancellationBarrierAfterNotifyBarrierReceived` and the other test
`testProcessCancellationBarrierAfterProcessBarrier`?
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
}
}
+ /**
+ * Tests {@link
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+ * abort the current pending checkpoint triggered by
+ * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier,
InputChannelInfo)}.
+ */
+ @Test
+ public void testProcessCancellationBarrierWitchPendingCheckpoint()
throws Exception {
+ final long checkpointId = 0L;
Review comment:
nit: extract `DEFAULT_CHECKPOINT_ID`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -379,12 +376,38 @@ synchronized void resetReceivedBarriers(long
checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized boolean onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (numBarriersReceived > 0) {
+ resetReceivedBarriers();
+ notifyAbort(new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ return true;
+ }
+ return false;
+ }
+
+ synchronized boolean setCancelledCheckpointId(long
canceledCheckpointId) {
+ boolean shouldAbort = false;
+ if (canceledCheckpointId > currentReceivedCheckpointId)
{
+ currentReceivedCheckpointId =
canceledCheckpointId;
+ shouldAbort = true;
+
Review comment:
nit: weird newline
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -379,12 +376,38 @@ synchronized void resetReceivedBarriers(long
checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized boolean onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (numBarriersReceived > 0) {
+ resetReceivedBarriers();
+ notifyAbort(new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ return true;
+ }
+ return false;
+ }
+
+ synchronized boolean setCancelledCheckpointId(long
canceledCheckpointId) {
+ boolean shouldAbort = false;
Review comment:
nit: shouldAbort can be inlined for improved readability (in my eyes).
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
}
}
+ /**
+ * Tests {@link
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+ * abort the current pending checkpoint triggered by
+ * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier,
InputChannelInfo)}.
+ */
+ @Test
+ public void testProcessCancellationBarrierWitchPendingCheckpoint()
throws Exception {
+ final long checkpointId = 0L;
+ final ValidatingCheckpointInvokable invokable = new
ValidatingCheckpointInvokable();
+ final CheckpointBarrierUnaligner handler = new
CheckpointBarrierUnaligner(
+ new int[] { 1 }, ChannelStateWriter.NO_OP, "test",
invokable);
+
+ ThreadSafeUnaligner unaligner =
handler.getThreadSafeUnaligner();
+ // should trigger respective checkpoint
+
unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new
InputChannelInfo(0, 0));
+
+ assertFalse(handler.isCheckpointPending());
+ assertTrue(unaligner.isCheckpointPending());
+ assertEquals(-1L, handler.getLatestCheckpointId());
+ assertEquals(checkpointId, unaligner.getCurrentCheckpointId());
+
+ testProcessCancellationBarrier(handler, invokable,
checkpointId);
+ }
+
+ /**
+ * Tests {@link
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+ * abort the current pending checkpoint triggered by
+ * {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier,
int)}.
+ */
+ @Test
+ public void testProcessCancellationBarrierWitchPendingCheckpoint2()
throws Exception {
+ final long checkpointId = 0L;
+ final ValidatingCheckpointInvokable invokable = new
ValidatingCheckpointInvokable();
+ final CheckpointBarrierUnaligner handler = new
CheckpointBarrierUnaligner(
+ new int[] { 1 }, ChannelStateWriter.NO_OP, "test",
invokable);
+
+ // should trigger respective checkpoint
+ handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+
+ assertTrue(handler.isCheckpointPending());
+
assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending());
+ assertEquals(checkpointId, handler.getLatestCheckpointId());
+ assertEquals(checkpointId,
handler.getThreadSafeUnaligner().getCurrentCheckpointId());
+
+ testProcessCancellationBarrier(handler, invokable,
checkpointId);
+ }
+
+ @Test
+ public void
testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception
{
+ final long checkpointId = 0L;
+ final ValidatingCheckpointInvokable invokable = new
ValidatingCheckpointInvokable();
+ final CheckpointBarrierUnaligner handler = new
CheckpointBarrierUnaligner(
+ new int[] { 1 }, ChannelStateWriter.NO_OP, "test",
invokable);
+
+ handler.processCancellationBarrier(new
CancelCheckpointMarker(checkpointId));
+
+ verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+
+ // it would not trigger checkpoint since the respective
cancellation barrier already happened before
+ handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId),
new InputChannelInfo(0, 0));
+
+ verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+ }
+
+ private void testProcessCancellationBarrier(
+ CheckpointBarrierUnaligner handler,
+ ValidatingCheckpointInvokable invokable,
+ long currentCheckpointId) throws Exception {
+
+ // should abort current checkpoint while processing
CancelCheckpointMarker
+ handler.processCancellationBarrier(new
CancelCheckpointMarker(currentCheckpointId));
+ verifyTriggeredCheckpoint(handler, invokable,
currentCheckpointId);
+
+ final long canceledCheckpointId = 1L;
+ // should update current checkpoint id and abort notification
while processing CancelCheckpointMarker
+ handler.processCancellationBarrier(new
CancelCheckpointMarker(canceledCheckpointId));
+ verifyTriggeredCheckpoint(handler, invokable,
canceledCheckpointId);
Review comment:
Would it make sense to extract these assertions in separate test cases?
There might be a bias in the implementation, when the checkpoint has already
been canceled. It would only add two test cases afaik.
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
}
}
+ /**
+ * Tests {@link
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+ * abort the current pending checkpoint triggered by
+ * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier,
InputChannelInfo)}.
+ */
+ @Test
+ public void testProcessCancellationBarrierWitchPendingCheckpoint()
throws Exception {
Review comment:
typo: witch -> with
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]