pnowojski commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1112208685
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -949,7 +952,9 @@ public final void cleanUp(Throwable throwable) throws
Exception {
// disabled the interruptions or not.
getCompletionFuture().exceptionally(unused -> null).join();
// clean up everything we initialized
- isRunning = false;
+ if (!isCanceled() && !isFailing()) {
Review Comment:
I would be cautious about any changes to the semantic here. Have you
considered that in this PR you are flattening the matrix of valid combinations
into the state machine that @akalash pointed out? For example before
`cancelled` could be set `true` while `isRunning` was also `true`. Now that's
no longer possible, and wouldn't such change in semantic cause some
side-effects?
Maybe it would be good to actually define the state machine that @akalash
proposed, but then also to go through all relevant getters
(`isFailing()`/`isRunning()`/`isXXX()`), and check how are they being used and
if they still make sense/are valid.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -772,15 +773,15 @@ private CompletableFuture<Void> restoreGates() throws
Exception {
}
private void ensureNotCanceled() {
- if (canceled) {
+ if (isCanceled()) {
throw new CancelTaskException();
}
}
@Override
public final void invoke() throws Exception {
// Allow invoking method 'invoke' without having to call 'restore'
before it.
- if (!isRunning) {
+ if (!isRunning()) {
Review Comment:
wouldn't `if (taskState == INITIALIZED)` be the more correct equivalent here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]