pnowojski commented on a change in pull request #12186:
URL: https://github.com/apache/flink/pull/12186#discussion_r426133280
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -269,16 +284,18 @@ private void takeSnapshotSync(
try {
for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
- operatorSnapshotsInProgress.put(
- operatorWrapper.getStreamOperator().getOperatorID(),
- buildOperatorSnapshotFutures(
- checkpointMetaData,
- checkpointOptions,
- operatorChain,
- operatorWrapper.getStreamOperator(),
- isCanceled,
- channelStateWriteResult,
- storage));
+ if (!operatorWrapper.isClosed()) {
Review comment:
Isn't this a duplicated call now, given the same check above?
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -934,6 +936,62 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Tests that {@link StreamTask#notifyCheckpointCompleteAsync(long)} is not relayed to closed operators.
+ *
+ * <p>See FLINK-16383.
+ */
+ @Test
+ public void testNotifyCheckpointOnClosedOperator() throws Throwable {
+ ClosingOperator operator = new ClosingOperator();
+ MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+ new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarness<Integer> harness = builder
+ .setupOutputForSingletonOperatorChain(operator)
+ .build();
+ // keeps the mailbox from suspending
+ harness.setAutoProcess(false);
+ harness.processElement(new StreamRecord<>(1));
+
+ harness.streamTask.notifyCheckpointCompleteAsync(1);
+ harness.streamTask.runMailboxStep();
+ assertEquals(1, operator.notified.get());
+ assertEquals(false, operator.closed.get());
+
+ // close operators directly, so that task is still fully running
+ harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
+ harness.streamTask.notifyCheckpointCompleteAsync(2);
+ harness.streamTask.runMailboxStep();
+ assertEquals(1, operator.notified.get());
+ assertEquals(true, operator.closed.get());
+ }
+
+ /**
+ * Tests that checkpoints are declined if operators are (partially) closed.
+ *
+ * <p>See FLINK-16383.
+ */
+ @Test
+ public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
+ ClosingOperator operator = new ClosingOperator();
+ MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+ new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
Review comment:
oO I haven't tested it with `OneInputStreamTask::new` :) Good to know
that it's working.
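The two tests above pin down a small contract: a checkpoint-complete notification is not relayed to an operator once it is closed, and a checkpoint is declined while the chain is (partially) closed. A minimal, self-contained sketch of that contract follows. `SketchOperator` and `SketchChain` are hypothetical stand-ins, not Flink classes, and the decline rule is assumed from the second test's Javadoc rather than taken from the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a chained operator; NOT Flink's StreamOperator.
class SketchOperator {
    int notified = 0;       // number of checkpoint-complete notifications received
    boolean closed = false; // set once the operator has been closed

    void notifyCheckpointComplete(long checkpointId) {
        notified++;
    }
}

// Hypothetical stand-in for an operator chain; NOT Flink's OperatorChain.
class SketchChain {
    final List<SketchOperator> operators = new ArrayList<>();

    // Relays the notification only to operators that are still open.
    void notifyCheckpointComplete(long checkpointId) {
        for (SketchOperator op : operators) {
            if (!op.closed) {
                op.notifyCheckpointComplete(checkpointId);
            }
        }
    }

    // Assumed rule: decline the checkpoint as soon as any operator is closed,
    // i.e. the chain is (partially) closed.
    boolean shouldDeclineCheckpoint() {
        return operators.stream().anyMatch(op -> op.closed);
    }
}
```

Driving it the way `testNotifyCheckpointOnClosedOperator` does (notify checkpoint 1, close the operator, notify checkpoint 2) leaves `notified` at 1 and flips `shouldDeclineCheckpoint()` to `true`.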
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -262,7 +262,9 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// go forward through the operator chain and tell each operator
// to prepare the checkpoint
for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
- operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
+ if (!operatorWrapper.isClosed()) {
+ operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
+ }
Review comment:
Maybe it would be better to push the `isClosed()` checks into the wrapper
class? That is, instead of:
```
if (!operatorWrapper.isClosed()) {
operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
}
```
have here (and in other places):
```
operatorWrapper.prepareSnapshotPreBarrier(checkpointId);
```
?
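A minimal sketch of what this suggestion could look like, assuming a heavily simplified operator interface. `SketchStreamOperator` and `SketchOperatorWrapper` are hypothetical names, not Flink's `StreamOperator`/`StreamOperatorWrapper`, which have many more methods. The wrapper owns the closed flag and guards each delegated call, so call sites no longer repeat the `isClosed()` check.

```java
// Hypothetical, minimal operator interface; NOT Flink's StreamOperator.
interface SketchStreamOperator {
    void prepareSnapshotPreBarrier(long checkpointId) throws Exception;

    void notifyCheckpointComplete(long checkpointId) throws Exception;

    void close() throws Exception;
}

// Hypothetical wrapper that owns the "closed" state and turns every
// delegated checkpoint call into a no-op once the operator is closed.
class SketchOperatorWrapper {
    private final SketchStreamOperator operator;
    private boolean closed;

    SketchOperatorWrapper(SketchStreamOperator operator) {
        this.operator = operator;
    }

    boolean isClosed() {
        return closed;
    }

    // Closes the wrapped operator at most once.
    void close() throws Exception {
        if (!closed) {
            closed = true;
            operator.close();
        }
    }

    // Guarded delegate: callers no longer need their own isClosed() check.
    void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!closed) {
            operator.prepareSnapshotPreBarrier(checkpointId);
        }
    }

    // Same guard for checkpoint-complete notifications.
    void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!closed) {
            operator.notifyCheckpointComplete(checkpointId);
        }
    }
}
```

With such a wrapper, the loop body in `OperatorChain#prepareSnapshotPreBarrier` reduces to `operatorWrapper.prepareSnapshotPreBarrier(checkpointId);`, and other call sites could follow the same pattern.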
----------------------------------------------------------------
This is an automated message from the Apache Git Service.