1996fanrui commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1114369247
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -256,19 +259,27 @@
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
- * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
- * that early cancel() before invoke() behaves correctly.
+ * INITIALIZED: task constructor was called or on init state. RUNNING: task is in operation.
+ * FAILING: task is failing e.g., if an exception has occurred inside {@link #invoke()}.
+ * CANCELED: when this task is canceled. FINISHED: task successfully terminated.
*/
- private volatile boolean isRunning;
+ private enum TaskState {
+ FINISHED(Arrays.asList(new TaskState[] {})),
+ CANCELED(Arrays.asList(new TaskState[] {})),
+ RUNNING(Arrays.asList(new TaskState[] {CANCELED, FINISHED})),
+ INITIALIZED(Arrays.asList(new TaskState[] {RUNNING, CANCELED, FINISHED}));
- /** Flag to mark this task as canceled. */
- private volatile boolean canceled;
+ private List<TaskState> validFromStates;
+ private boolean failing = false;
Review Comment:
Why is the `failing` flag added inside the `TaskState` enum?
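
One possible concern behind this question (an assumption; the comment does not spell it out): enum constants are singletons within a JVM, so a mutable field declared on the enum is shared by every task that holds that constant. A minimal sketch with hypothetical names (`SharedFlagState`, `DemoTask`), not the PR's code:

```java
// Hypothetical illustration; these types do not exist in Flink.
enum SharedFlagState {
    RUNNING;

    // Mutable field on an enum: there is exactly one copy per constant.
    boolean failing = false;
}

class DemoTask {
    // Every DemoTask points at the same singleton constant.
    private final SharedFlagState state = SharedFlagState.RUNNING;

    void markFailing() {
        state.failing = true;
    }

    boolean isFailing() {
        return state.failing;
    }
}
```

In this sketch, calling `markFailing()` on one `DemoTask` also flips `isFailing()` for every other instance, which is why such a flag is usually kept as a field of the task itself, as the removed `private volatile boolean failing` was.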
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -256,19 +259,27 @@
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
- * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
- * that early cancel() before invoke() behaves correctly.
+ * INITIALIZED: task constructor was called or on init state. RUNNING: task is in operation.
+ * FAILING: task is failing e.g., if an exception has occurred inside {@link #invoke()}.
+ * CANCELED: when this task is canceled. FINISHED: task successfully terminated.
Review Comment:
How about drawing a state transition diagram in the Javadoc, similar to the
one in `ExecutionState`'s class comment [1]? It would be clearer.
[1]
https://github.com/apache/flink/blob/2e91543836d667a0b367688bb5ce290c3164479c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java#L26
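
A possible shape for such a diagram, read off the transition lists in the enum under review. It assumes each list names the states reachable from that constant (the field is called `validFromStates`, so the intended direction is a guess), and it leaves out FAILING because the enum in this diff has no such constant. The name `TaskStateSketch` is hypothetical:

```java
/**
 * Possible state transitions (sketch, direction of the edges is assumed):
 *
 * <pre>{@code
 * INITIALIZED -> RUNNING -> FINISHED
 *      |            |
 *      |            +----> CANCELED
 *      |
 *      +-----------------> CANCELED
 *      +-----------------> FINISHED
 * }</pre>
 *
 * CANCELED and FINISHED have no outgoing edges, so they are terminal.
 */
enum TaskStateSketch {
    INITIALIZED,
    RUNNING,
    CANCELED,
    FINISHED
}
```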
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -256,19 +259,27 @@
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
- * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
- * that early cancel() before invoke() behaves correctly.
+ * INITIALIZED: task constructor was called or on init state. RUNNING: task is in operation.
+ * FAILING: task is failing e.g., if an exception has occurred inside {@link #invoke()}.
+ * CANCELED: when this task is canceled. FINISHED: task successfully terminated.
*/
- private volatile boolean isRunning;
+ private enum TaskState {
+ FINISHED(Arrays.asList(new TaskState[] {})),
+ CANCELED(Arrays.asList(new TaskState[] {})),
+ RUNNING(Arrays.asList(new TaskState[] {CANCELED, FINISHED})),
+ INITIALIZED(Arrays.asList(new TaskState[] {RUNNING, CANCELED, FINISHED}));
- /** Flag to mark this task as canceled. */
- private volatile boolean canceled;
+ private List<TaskState> validFromStates;
+ private boolean failing = false;
- /**
- * Flag to mark this task as failing, i.e. if an exception has occurred inside {@link
- * #invoke()}.
- */
- private volatile boolean failing;
+ TaskState(List<TaskState> validFromStates) {
+ this.validFromStates = validFromStates;
+ }
+
+ public boolean isValidTransition(TaskState fromState, TaskState toState) {
Review Comment:
This method isn't called anywhere, right?
As I understand it, it should be called whenever the state transitions.
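
A minimal sketch of what calling the check on every transition could look like. All names (`CheckedState`, `StateHolder`, `canTransitionTo`, `transitionTo`) are hypothetical, and the transition rules are an assumed reading of the lists in the enum under review:

```java
// Hypothetical illustration; not the code from the PR.
enum CheckedState {
    INITIALIZED,
    RUNNING,
    CANCELED,
    FINISHED;

    // Assumed rules: INITIALIZED and RUNNING may move forward,
    // CANCELED and FINISHED are terminal.
    boolean canTransitionTo(CheckedState next) {
        switch (this) {
            case INITIALIZED:
                return next == RUNNING || next == CANCELED || next == FINISHED;
            case RUNNING:
                return next == CANCELED || next == FINISHED;
            default:
                return false;
        }
    }
}

class StateHolder {
    private volatile CheckedState state = CheckedState.INITIALIZED;

    // Single entry point for state changes, so the check cannot be skipped.
    synchronized void transitionTo(CheckedState next) {
        if (!state.canTransitionTo(next)) {
            throw new IllegalStateException("Invalid transition: " + state + " -> " + next);
        }
        state = next;
    }

    CheckedState current() {
        return state;
    }
}
```

Whether an invalid transition should throw, be logged, or be silently ignored (for example a late `cancel()` after the task finished) is a design choice this sketch does not settle.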
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -256,19 +259,27 @@
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
- * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
- * that early cancel() before invoke() behaves correctly.
+ * INITIALIZED: task constructor was called or on init state. RUNNING: task is in operation.
Review Comment:
How about moving each state's description to the corresponding enum
constant? That would be similar to `JobStatus`[1] or `ExecutionState`[2].
[1]
https://github.com/apache/flink/blob/2e91543836d667a0b367688bb5ce290c3164479c/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java#L26
[2]
https://github.com/apache/flink/blob/2e91543836d667a0b367688bb5ce290c3164479c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java#L57
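
A sketch of that layout, with the wording taken from the Javadoc in the diff and attached to each constant. The enum name `DocumentedTaskState` is hypothetical, and FAILING is omitted because the enum in this diff does not declare it:

```java
// Hypothetical illustration of per-constant Javadoc; not the code from the PR.
enum DocumentedTaskState {
    /** The task constructor was called, or the task is still initializing. */
    INITIALIZED,

    /** The task is in operation. */
    RUNNING,

    /** The task was canceled. */
    CANCELED,

    /** The task terminated successfully. */
    FINISHED
}
```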
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -256,19 +259,27 @@
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
/**
- * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
- * that early cancel() before invoke() behaves correctly.
+ * INITIALIZED: task constructor was called or on init state. RUNNING: task is in operation.
+ * FAILING: task is failing e.g., if an exception has occurred inside {@link #invoke()}.
+ * CANCELED: when this task is canceled. FINISHED: task successfully terminated.
*/
- private volatile boolean isRunning;
+ private enum TaskState {
+ FINISHED(Arrays.asList(new TaskState[] {})),
+ CANCELED(Arrays.asList(new TaskState[] {})),
+ RUNNING(Arrays.asList(new TaskState[] {CANCELED, FINISHED})),
+ INITIALIZED(Arrays.asList(new TaskState[] {RUNNING, CANCELED, FINISHED}));
- /** Flag to mark this task as canceled. */
- private volatile boolean canceled;
+ private List<TaskState> validFromStates;
Review Comment:
How about using a `Set` instead of a `List`, since we only ever call
`validFromStates.contains()`?
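
A minimal sketch of the `Set`-based variant. The names `SetBackedState`, `validNextStates`, and `canTransitionTo` are hypothetical, and the sketch assumes the lists hold the states reachable from each constant. It uses a plain `HashSet` built in the constructor; `EnumSet` may be a better fit if it can be constructed at that point, which this sketch does not rely on:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration; not the code from the PR.
enum SetBackedState {
    // Terminal states come first so later constants can refer to them.
    FINISHED(),
    CANCELED(),
    RUNNING(CANCELED, FINISHED),
    INITIALIZED(RUNNING, CANCELED, FINISHED);

    // Immutable set, so contains() is a hash lookup and the table cannot change.
    private final Set<SetBackedState> validNextStates;

    SetBackedState(SetBackedState... validNextStates) {
        this.validNextStates =
                Collections.unmodifiableSet(new HashSet<>(Arrays.asList(validNextStates)));
    }

    boolean canTransitionTo(SetBackedState next) {
        return validNextStates.contains(next);
    }
}
```

With at most three elements per collection the lookup cost is negligible either way; the main gain is that a `Set` states the intent (membership test, no ordering, no duplicates).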