zhuzhurk commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1003036792
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -693,41 +664,28 @@ private ExecutionGraph createExecutionGraph(Configuration
configuration) throws
.setJobGraph(jobGraph)
.setJobMasterConfig(configuration)
.setBlobWriter(blobWriter)
- .build(EXECUTOR_RESOURCE.getExecutor());
+ .build(EXECUTOR_EXTENSION.getExecutor());
}
- private static final class ExecutionStageMatcher
- extends TypeSafeMatcher<List<ExecutionAttemptID>> {
- private final List<Collection<ExecutionAttemptID>> executionStages;
-
- private ExecutionStageMatcher(List<Collection<ExecutionAttemptID>>
executionStages) {
- this.executionStages = executionStages;
- }
-
- @Override
- protected boolean matchesSafely(List<ExecutionAttemptID>
submissionOrder) {
- final Iterator<ExecutionAttemptID> submissionIterator =
submissionOrder.iterator();
+ private boolean isDeployedInTopologicalOrder(
Review Comment:
can be static
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionBytesCounter.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** This counter will count the data size of a partition. */
+public class ResultPartitionBytesCounter {
+
+ /** The data size of each subpartition. */
+ private final List<Counter> subpartitionBytes;
+
+ public ResultPartitionBytesCounter(int numSubpartitions) {
+ this.subpartitionBytes = new ArrayList<>();
+ for (int i = 0; i < numSubpartitions; ++i) {
+ subpartitionBytes.add(new SimpleCounter());
+ }
+ }
+
+ public void inc(int targetSubpartition, long bytes) {
+ subpartitionBytes.get(targetSubpartition).inc(bytes);
+ }
+
+ public void broadcastInc(long bytes) {
Review Comment:
maybe "incAll()"?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionBytesCounter.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** This counter will count the data size of a partition. */
+public class ResultPartitionBytesCounter {
+
+ /** The data size of each subpartition. */
+ private final List<Counter> subpartitionBytes;
+
+ public ResultPartitionBytesCounter(int numSubpartitions) {
+ this.subpartitionBytes = new ArrayList<>();
+ for (int i = 0; i < numSubpartitions; ++i) {
+ subpartitionBytes.add(new SimpleCounter());
+ }
+ }
+
+ public void inc(int targetSubpartition, long bytes) {
+ subpartitionBytes.get(targetSubpartition).inc(bytes);
+ }
+
+ public void broadcastInc(long bytes) {
+ subpartitionBytes.forEach(counter -> counter.inc(bytes));
+ }
+
+ // public List<Counter> getSubpartitionBytes() {
Review Comment:
should be removed.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -1548,7 +1548,18 @@ private void updateAccumulatorsAndMetrics(
}
}
if (metrics != null) {
- this.ioMetrics = metrics;
+ // The IOMetrics#resultPartitionBytes will not be used anymore, so
we clear it here to
+ // reduce the space usage.
+ this.ioMetrics =
Review Comment:
Here we can use the existing ctor of `IOMetrics` that has no `null` parameter.
I would suggest improving the comment to: "Drop
IOMetrics#resultPartitionBytes because it will not be used anymore. It can
result in very high memory usage when there are many executions and
sub-partitions.".
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java:
##########
@@ -74,7 +101,7 @@ public IOMetrics(
long numRecordsIn,
long numRecordsOut,
long accumulateIdleTime,
- long accumulateBusyTime,
+ double accumulateBusyTime,
Review Comment:
This is an unrelated change and should be in a hotfix. Still, I'm not sure
why the double type is used only for `accumulateBusyTime`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java:
##########
@@ -42,30 +47,52 @@ public class IOMetrics implements Serializable {
protected double accumulateBusyTime;
protected long accumulateIdleTime;
- protected final Map<IntermediateResultPartitionID, Long>
numBytesProducedOfPartitions =
- new HashMap<>();
+ @Nullable
+ protected Map<IntermediateResultPartitionID, ResultPartitionBytes>
resultPartitionBytes;
public IOMetrics(
Meter recordsIn,
Meter recordsOut,
Meter bytesIn,
Meter bytesOut,
- Map<IntermediateResultPartitionID, Counter>
numBytesProducedCounters,
Gauge<Long> accumulatedBackPressuredTime,
Gauge<Long> accumulatedIdleTime,
- Gauge<Double> accumulatedBusyTime) {
+ Gauge<Double> accumulatedBusyTime,
+ Map<IntermediateResultPartitionID, ResultPartitionBytesCounter>
+ resultPartitionBytesCounters) {
this.numRecordsIn = recordsIn.getCount();
this.numRecordsOut = recordsOut.getCount();
this.numBytesIn = bytesIn.getCount();
this.numBytesOut = bytesOut.getCount();
this.accumulateBackPressuredTime =
accumulatedBackPressuredTime.getValue();
this.accumulateBusyTime = accumulatedBusyTime.getValue();
this.accumulateIdleTime = accumulatedIdleTime.getValue();
+ this.resultPartitionBytes =
+ resultPartitionBytesCounters.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().createSnapshot()));
+ }
- for (Map.Entry<IntermediateResultPartitionID, Counter> counter :
- numBytesProducedCounters.entrySet()) {
- numBytesProducedOfPartitions.put(counter.getKey(),
counter.getValue().getCount());
- }
+ public IOMetrics(
+ long numBytesIn,
+ long numBytesOut,
+ long numRecordsIn,
+ long numRecordsOut,
+ long accumulateIdleTime,
+ double accumulateBusyTime,
+ long accumulateBackPressuredTime,
+ @Nullable
+ Map<IntermediateResultPartitionID, ResultPartitionBytes>
resultPartitionBytes) {
+ this.numBytesIn = numBytesIn;
+ this.numBytesOut = numBytesOut;
+ this.numRecordsIn = numRecordsIn;
+ this.numRecordsOut = numRecordsOut;
+ this.accumulateIdleTime = accumulateIdleTime;
+ this.accumulateBusyTime = accumulateBusyTime;
+ this.accumulateBackPressuredTime = accumulateBackPressuredTime;
+ this.resultPartitionBytes = resultPartitionBytes;
}
public IOMetrics(
Review Comment:
This ctor can be used in cases where `resultPartitionBytes` is null, and it
should reuse the above ctor for its implementation.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
super.startSchedulingInternal();
}
+ public boolean updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ updateResultPartitionBytesMetrics(taskExecutionState.getIOMetrics());
Review Comment:
1. I prefer not to rely on the implicit assumption that `ioMetrics` is not
null iff the task finishes, which might be broken later without anyone noticing.
2. Also, maybe we should do this only if the task **truly** transitions to
FINISHED. That means, if a task finishes on the TM side but the transition is
not accepted on the JM side, we should not record its size.
3. Another question, related to point `#2`: should we clear the
size if the result is reset?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java:
##########
@@ -42,30 +47,52 @@ public class IOMetrics implements Serializable {
protected double accumulateBusyTime;
protected long accumulateIdleTime;
- protected final Map<IntermediateResultPartitionID, Long>
numBytesProducedOfPartitions =
- new HashMap<>();
+ @Nullable
+ protected Map<IntermediateResultPartitionID, ResultPartitionBytes>
resultPartitionBytes;
public IOMetrics(
Meter recordsIn,
Meter recordsOut,
Meter bytesIn,
Meter bytesOut,
- Map<IntermediateResultPartitionID, Counter>
numBytesProducedCounters,
Gauge<Long> accumulatedBackPressuredTime,
Gauge<Long> accumulatedIdleTime,
- Gauge<Double> accumulatedBusyTime) {
+ Gauge<Double> accumulatedBusyTime,
+ Map<IntermediateResultPartitionID, ResultPartitionBytesCounter>
+ resultPartitionBytesCounters) {
this.numRecordsIn = recordsIn.getCount();
this.numRecordsOut = recordsOut.getCount();
this.numBytesIn = bytesIn.getCount();
this.numBytesOut = bytesOut.getCount();
this.accumulateBackPressuredTime =
accumulatedBackPressuredTime.getValue();
this.accumulateBusyTime = accumulatedBusyTime.getValue();
this.accumulateIdleTime = accumulatedIdleTime.getValue();
+ this.resultPartitionBytes =
+ resultPartitionBytesCounters.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().createSnapshot()));
+ }
- for (Map.Entry<IntermediateResultPartitionID, Counter> counter :
- numBytesProducedCounters.entrySet()) {
- numBytesProducedOfPartitions.put(counter.getKey(),
counter.getValue().getCount());
- }
+ public IOMetrics(
Review Comment:
I think this ctor is for test only and should be marked with
`@VisibleForTesting`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
super.startSchedulingInternal();
}
+ public boolean updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
Review Comment:
`@Override` is missing.
I would also suggest overriding
`updateTaskExecutionState(TaskExecutionStateTransition)` instead of the legacy
one.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java:
##########
@@ -18,63 +18,53 @@
package org.apache.flink.runtime.scheduler.adaptivebatch;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** The blocking result info, which will be used to calculate the vertex
parallelism. */
-public class BlockingResultInfo {
-
- private final List<Long> blockingPartitionSizes;
-
- private final boolean isBroadcast;
-
- private BlockingResultInfo(List<Long> blockingPartitionSizes, boolean
isBroadcast) {
- this.blockingPartitionSizes = blockingPartitionSizes;
- this.isBroadcast = isBroadcast;
- }
-
- public List<Long> getBlockingPartitionSizes() {
- return blockingPartitionSizes;
- }
-
- public boolean isBroadcast() {
- return isBroadcast;
- }
-
- @VisibleForTesting
- static BlockingResultInfo createFromBroadcastResult(List<Long>
blockingPartitionSizes) {
- return new BlockingResultInfo(blockingPartitionSizes, true);
- }
-
- @VisibleForTesting
- static BlockingResultInfo createFromNonBroadcastResult(List<Long>
blockingPartitionSizes) {
- return new BlockingResultInfo(blockingPartitionSizes, false);
- }
-
- public static BlockingResultInfo createFromIntermediateResult(
- IntermediateResult intermediateResult) {
- checkArgument(intermediateResult != null);
-
- List<Long> blockingPartitionSizes = new ArrayList<>();
- for (IntermediateResultPartition partition :
intermediateResult.getPartitions()) {
- checkState(partition.isConsumable());
-
- IOMetrics ioMetrics =
partition.getProducer().getPartitionProducer().getIOMetrics();
- checkNotNull(ioMetrics, "IOMetrics should not be null.");
-
- blockingPartitionSizes.add(
-
ioMetrics.getNumBytesProducedOfPartitions().get(partition.getPartitionId()));
- }
-
- return new BlockingResultInfo(blockingPartitionSizes,
intermediateResult.isBroadcast());
- }
+/**
+ * The blocking result info, which will be used to calculate the vertex
parallelism and input infos.
+ */
+public interface BlockingResultInfo {
+
+ /**
+ * Get the intermediate result id.
+ *
+ * @return the intermediate result id
+ */
+ IntermediateDataSetID getResultId();
+
+ /**
+ * Whether it is a broadcast result.
+ *
+ * @return whether it is a broadcast result
+ */
+ boolean isBroadcast();
+
+ /**
+ * Whether it is a Pointwise result.
Review Comment:
Pointwise -> pointwise
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Information of All-To-All result. */
+public class AllToAllBlockingResultInfo implements BlockingResultInfo {
+
+ private final IntermediateDataSetID resultId;
+
+ private final int numOfSubpartitions;
+
+ private final boolean isBroadcast;
+
+ /**
+ * Aggregated subpartition bytes, which aggregates the subpartition bytes
with the same
+ * subpartition index in different partitions. Note that We can aggregate
them because they will
+ * be consumed by the same downstream task.
+ */
+ private final long[] aggregatedSubpartitionBytes;
+
+ AllToAllBlockingResultInfo(
+ IntermediateDataSetID resultId, int numOfSubpartitions, boolean
isBroadcast) {
+ this.resultId = checkNotNull(resultId);
+ this.numOfSubpartitions = numOfSubpartitions;
+ this.isBroadcast = isBroadcast;
+ this.aggregatedSubpartitionBytes = new long[numOfSubpartitions];
+ Arrays.fill(this.aggregatedSubpartitionBytes, 0L);
+ }
+
+ @Override
+ public IntermediateDataSetID getResultId() {
+ return resultId;
+ }
+
+ @Override
+ public boolean isBroadcast() {
+ return isBroadcast;
+ }
+
+ @Override
+ public boolean isPointwise() {
+ return false;
+ }
+
+ @Override
+ public long getNumBytesProduced() {
+ if (isBroadcast) {
+ return aggregatedSubpartitionBytes[0];
+ } else {
+ return Arrays.stream(aggregatedSubpartitionBytes).sum();
+ }
+ }
+
+ @Override
+ public void partitionFinished(int partitionIndex, ResultPartitionBytes
partitionBytes) {
Review Comment:
What if a partition gets finished, and is later reset and finished again?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]