wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005242107


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
         super.startSchedulingInternal();
     }
 
+    public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        updateResultPartitionBytesMetrics(taskExecutionState.getIOMetrics());

Review Comment:
   1, 2: Fixed.
   3: This is indeed a problem. We can't clear the metric of a partition when
   it is reset, because for `ALL_TO_ALL` results we only store the aggregated
   value. Given that, I think we should record the metric only on the first
   finish and treat it as the final value. @zhuzhurk WDYT?
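
   A minimal sketch of the "record only on the first finish" idea from point 3.
   The class `FirstFinishBytesRecorder` and its methods are hypothetical names
   used for illustration only; they are not code from this PR, and the real
   change would live wherever the result partition bytes are tracked.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of first-finish-only recording; not the class from the PR. */
class FirstFinishBytesRecorder {

    // Indices of partitions whose bytes were already folded into the aggregate.
    private final Set<Integer> recordedPartitions = new HashSet<>();

    // Only the sum is kept (as for ALL_TO_ALL results), so the contribution
    // of a single partition cannot be subtracted again after a reset.
    private long aggregatedBytes;

    /**
     * Records the bytes of a partition only the first time it finishes.
     *
     * @return true if the value was recorded, false if it was ignored
     */
    boolean recordOnFirstFinish(int partitionIndex, long numBytes) {
        if (!recordedPartitions.add(partitionIndex)) {
            // The partition finished before (e.g. it was reset and re-executed):
            // keep the first value as the final one.
            return false;
        }
        aggregatedBytes += numBytes;
        return true;
    }

    long getAggregatedBytes() {
        return aggregatedBytes;
    }

    public static void main(String[] args) {
        FirstFinishBytesRecorder recorder = new FirstFinishBytesRecorder();
        recorder.recordOnFirstFinish(0, 100L);
        recorder.recordOnFirstFinish(1, 250L);
        recorder.recordOnFirstFinish(0, 999L); // second finish of partition 0: ignored
        System.out.println(recorder.getAggregatedBytes());
    }
}
```

   The trade-off in this sketch is that a re-executed producer that writes a
   different number of bytes is not reflected in the aggregate; that is the
   price of never having to "un-add" a partition from a summed value.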



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java:
##########
@@ -18,63 +18,53 @@
 
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** The blocking result info, which will be used to calculate the vertex parallelism. */
-public class BlockingResultInfo {
-
-    private final List<Long> blockingPartitionSizes;
-
-    private final boolean isBroadcast;
-
-    private BlockingResultInfo(List<Long> blockingPartitionSizes, boolean isBroadcast) {
-        this.blockingPartitionSizes = blockingPartitionSizes;
-        this.isBroadcast = isBroadcast;
-    }
-
-    public List<Long> getBlockingPartitionSizes() {
-        return blockingPartitionSizes;
-    }
-
-    public boolean isBroadcast() {
-        return isBroadcast;
-    }
-
-    @VisibleForTesting
-    static BlockingResultInfo createFromBroadcastResult(List<Long> blockingPartitionSizes) {
-        return new BlockingResultInfo(blockingPartitionSizes, true);
-    }
-
-    @VisibleForTesting
-    static BlockingResultInfo createFromNonBroadcastResult(List<Long> blockingPartitionSizes) {
-        return new BlockingResultInfo(blockingPartitionSizes, false);
-    }
-
-    public static BlockingResultInfo createFromIntermediateResult(
-            IntermediateResult intermediateResult) {
-        checkArgument(intermediateResult != null);
-
-        List<Long> blockingPartitionSizes = new ArrayList<>();
-        for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
-            checkState(partition.isConsumable());
-
-            IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics();
-            checkNotNull(ioMetrics, "IOMetrics should not be null.");
-
-            blockingPartitionSizes.add(
-                    ioMetrics.getNumBytesProducedOfPartitions().get(partition.getPartitionId()));
-        }
-
-        return new BlockingResultInfo(blockingPartitionSizes, intermediateResult.isBroadcast());
-    }
+/**
+ * The blocking result info, which will be used to calculate the vertex parallelism and input infos.
+ */
+public interface BlockingResultInfo {
+
+    /**
+     * Get the intermediate result id.
+     *
+     * @return the intermediate result id
+     */
+    IntermediateDataSetID getResultId();
+
+    /**
+     * Whether it is a broadcast result.
+     *
+     * @return whether it is a broadcast result
+     */
+    boolean isBroadcast();
+
+    /**
+     * Whether it is a Pointwise result.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
