zhuzhurk commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1011461724


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
         super.startSchedulingInternal();
     }
 
+    public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        updateResultPartitionBytesMetrics(taskExecutionState.getIOMetrics());

Review Comment:
   This doesn't look right to me, because the data produced by an execution vertex 
can change if the vertex is restarted. For example:
   1. A restarted source may return all of its previous source splits and then 
consume different splits.
   2. If a result is re-produced differently, its restarted consumers may 
produce different results.
   
   Besides that, I think there's a problem with the fix for my question `#2` above: 
`taskExecutionState.getExecutionState() == ExecutionState.FINISHED` 
does not mean the task has actually transitioned to `FINISHED` on the JM side.
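
   The two concerns above can be sketched roughly as follows. This is only an 
illustration under stated assumptions, not Flink code: `PartitionBytesTracker`, 
its methods, and the string vertex ids are all hypothetical. The idea is to 
record produced bytes only when the reported `FINISHED` state was also accepted 
on the JM side, and to drop the recorded value whenever the vertex is restarted, 
since the re-produced data may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink. */
final class PartitionBytesTracker {

    // Bytes produced per execution vertex (hypothetical string id), covering
    // only the attempt that the JobManager accepted as FINISHED.
    private final Map<String, Long> bytesByVertex = new HashMap<>();

    /**
     * @param reportedFinished the task reported FINISHED in its state update
     * @param acceptedByJobManager the JM-side state transition actually succeeded
     *     (assumed here to be known to the caller)
     */
    void onTaskStateUpdate(
            String vertexId,
            boolean reportedFinished,
            boolean acceptedByJobManager,
            long producedBytes) {
        // A reported FINISHED alone is not enough; the JM must have accepted it.
        if (reportedFinished && acceptedByJobManager) {
            // Overwrite rather than accumulate, so a re-run replaces the old value.
            bytesByVertex.put(vertexId, producedBytes);
        }
    }

    /** Drops the recorded bytes, because a restarted vertex may produce different data. */
    void onVertexRestarted(String vertexId) {
        bytesByVertex.remove(vertexId);
    }

    long totalBytes() {
        long sum = 0L;
        for (long bytes : bytesByVertex.values()) {
            sum += bytes;
        }
        return sum;
    }
}
```

   With this shape, a `FINISHED` report that the JM rejects leaves the totals 
untouched, and a restart followed by a new accepted `FINISHED` replaces the old 
value instead of adding to it.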



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
