Sxnan commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1495348233
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -780,6 +780,9 @@ private void onTaskExecutionStateUpdate(
// only notifies FINISHED and FAILED states which are needed at the moment.
// can be refined in FLINK-14233 after the actions are factored out from ExecutionGraph.
switch (taskExecutionState.getExecutionState()) {
+ case RUNNING:
Review Comment:
We should update the comment above, since it still says that only the FINISHED and FAILED states are notified.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -2333,6 +2334,12 @@ void testOutputOnlyAfterEndOfStream() {
assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue();
assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+ assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights())
Review Comment:
We may want to test `TwoInputTransformation` and
`AbstractInputTransformation` as well.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -516,7 +516,7 @@ public void failJobDueToTaskFailure(
if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
- registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+ registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator(tasks));
Review Comment:
We should update the comment above so that it is accurate after the change.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -2309,6 +2309,7 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
void testOutputOnlyAfterEndOfStream() {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+ env.disableOperatorChaining();
Review Comment:
It would be better to also test the case where operator chaining is enabled.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java:
##########
@@ -38,9 +38,12 @@
*/
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
Review Comment:
IIUC, we don't need to update the `serialVersionUID`, because adding a field is a compatible change according to the
[doc](https://docs.oracle.com/javase/7/docs/platform/serialization/spec/version.html#6678).
We just need to return `NeverTrigger` from the `getDefaultTrigger` method when
`defaultTrigger` is null, so that the behavior stays backward compatible.
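A minimal sketch of the null-fallback idea, not the actual Flink code: `SerialCompatSketch`, `Assigner`, and the `String` stand-in for the trigger type are hypothetical names chosen for illustration. When a stream written before the field existed is deserialized, Java leaves the new field at its default value (null for object types). The sketch approximates that case by serializing an instance whose field is null, then checks that the getter falls back to a default.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerialCompatSketch {

    /** Hypothetical stand-in for an assigner class that gained a new field. */
    static class Assigner implements Serializable {
        // Left unchanged on purpose: adding a field is a compatible change.
        private static final long serialVersionUID = 1L;

        // Newly added field (a String here, for illustration only).
        // It is null when read from a stream written before the field existed.
        private final String defaultTrigger;

        Assigner(String defaultTrigger) {
            this.defaultTrigger = defaultTrigger;
        }

        // Fall back to the previous behavior when the field is absent.
        String getDefaultTrigger() {
            return defaultTrigger == null ? "NeverTrigger" : defaultTrigger;
        }
    }

    /** Serializes and deserializes the given instance in memory. */
    static Assigner roundTrip(Assigner in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Assigner) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Null field approximates an instance restored from an older stream.
        System.out.println(roundTrip(new Assigner(null)).getDefaultTrigger());
        System.out.println(roundTrip(new Assigner("CustomTrigger")).getDefaultTrigger());
    }
}
```

The fallback lives in the getter rather than in a custom `readObject`, which keeps the serialized form untouched and matches the suggestion above.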