zhuzhurk commented on a change in pull request #16173:
URL: https://github.com/apache/flink/pull/16173#discussion_r659701218
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
##########
@@ -61,4 +63,16 @@
* @param shuffleDescriptor shuffle descriptor of the result partition to release externally.
*/
void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
+
+ /**
+ * Get shuffle memory size for a task with the given {@code taskInputsOutputsDescriptor}.
+ *
+ * @param taskInputsOutputsDescriptor describes task inputs and outputs information for shuffle
+ * memory calculation
+ * @return shuffle memory size for a task with the given {@code taskInputsOutputsDescriptor}
+ */
+ default MemorySize getShuffleMemoryForTask(
Review comment:
I feel that `computeShuffleMemorySizeForTask` sounds more accurate and is easier to understand.
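For illustration, a sketch of what the renamed default method might look like (the parameter name is taken from the javadoc above; returning `MemorySize.ZERO` as the default is just a placeholder assumption, not the actual implementation):

```java
/** Computes shuffle memory size for a task with the given descriptor. */
default MemorySize computeShuffleMemorySizeForTask(
        TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
    // Placeholder default: a shuffle implementation would override this
    // with its actual memory calculation.
    return MemorySize.ZERO;
}
```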
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
##########
@@ -59,6 +59,7 @@ public void testNToN() throws Exception {
ev.getParallelSubtaskIndex(),
consumedPartitionGroup.getFirst().getPartitionNumber());
}
+ EdgeManagerBuildUtil.getMaxNumEdgesToTarget(N, N, DistributionPattern.POINTWISE);
Review comment:
What's this line for?
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SSGNetworkMemoryCalculator.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Util to analyze inputs & outputs of {@link ExecutionJobVertex} and calculate network memory
+ * requirement for slot sharing group (SSG).
+ */
+public class SSGNetworkMemoryCalculator {
Review comment:
Given that it is a static util class, I think a name like `SSGNetworkMemoryCalculationUtils` is better, and a private constructor should be added to it.
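For example, the skeleton could look like this (just a sketch of the naming/constructor suggestion, not the full implementation):

```java
/**
 * Util to analyze inputs & outputs of {@link ExecutionJobVertex} and calculate network memory
 * requirement for slot sharing group (SSG).
 */
public class SSGNetworkMemoryCalculationUtils {

    // Static util class: prevent instantiation.
    private SSGNetworkMemoryCalculationUtils() {}
}
```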
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
##########
@@ -180,9 +181,6 @@ private void testLowToHigh(int lowDop, int highDop) throws Exception {
throw new IllegalArgumentException();
}
- final int factor = highDop / lowDop;
Review comment:
I would suggest adding new tests for `getMaxNumEdgesToTarget` instead of changing the existing tests, which cover other code.
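A sketch of what such a dedicated test could look like (the method name and expected values are my own; it assumes JUnit's `assertEquals` is statically imported, as in the existing tests):

```java
@Test
public void testGetMaxNumEdgesToTargetPointwise() {
    // Upstream parallelism 3, downstream parallelism 7:
    // each upstream execution connects to at most ceil(7 / 3) = 3 targets.
    assertEquals(
            3, EdgeManagerBuildUtil.getMaxNumEdgesToTarget(3, 7, DistributionPattern.POINTWISE));

    // N-to-N pointwise connection: exactly one edge per execution.
    assertEquals(
            1, EdgeManagerBuildUtil.getMaxNumEdgesToTarget(5, 5, DistributionPattern.POINTWISE));
}
```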
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/NettyShuffleUtilsTest.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED_BOUNDED;
+import static org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link NettyShuffleUtils}. */
+public class NettyShuffleUtilsTest extends TestLogger {
+
+ /**
+ * This test verifies that the {@link NettyShuffleEnvironment} requires buffers as expected, so
+ * that the required shuffle memory size returned by {@link
+ * ShuffleMaster#getShuffleMemoryForTask(TaskInputsOutputsDescriptor)} is correct.
+ */
+ @Test
+ public void testComputeRequiredNetworkBuffers() throws Exception {
+ int numBuffersPerChannel = 5;
+ int numBuffersPerGate = 8;
+ int sortShuffleMinParallelism = 8;
+ int numSortShuffleMinBuffers = 12;
+
+ int numChannels1 = 3;
+ int numChannels2 = 4;
+ int numSubs1 = 5; // pipelined shuffle
+ int numSubs2 = 6; // hash blocking shuffle
+ int numSubs3 = 10; // sort blocking shuffle
+
+ int numTotalBuffers =
+ NettyShuffleUtils.computeRequiredNetworkBuffers(
+ numBuffersPerChannel,
+ numBuffersPerGate,
+ sortShuffleMinParallelism,
+ numSortShuffleMinBuffers,
+ Arrays.asList(numChannels1, numChannels2),
+ Arrays.asList(
+ Pair.of(numSubs1, PIPELINED_BOUNDED),
+ Pair.of(numSubs2, BLOCKING),
+ Pair.of(numSubs3, BLOCKING)));
+
+ NettyShuffleEnvironment sEnv =
+ new NettyShuffleEnvironmentBuilder()
+ .setNumNetworkBuffers(numTotalBuffers)
+ .setNetworkBuffersPerChannel(numBuffersPerChannel)
+ .setSortShuffleMinBuffers(numSortShuffleMinBuffers)
+ .setSortShuffleMinParallelism(sortShuffleMinParallelism)
+ .build();
+
+ SingleInputGate inputGate1 = createInputGate(sEnv, PIPELINED_BOUNDED, numChannels1);
+ inputGate1.setup();
+
+ SingleInputGate inputGate2 = createInputGate(sEnv, BLOCKING, numChannels2);
+ inputGate2.setup();
+
+ ResultPartition resultPartition1 = createResultPartition(sEnv, PIPELINED_BOUNDED, numSubs1);
+ resultPartition1.setup();
+
+ ResultPartition resultPartition2 = createResultPartition(sEnv, BLOCKING, numSubs2);
+ resultPartition2.setup();
+
+ ResultPartition resultPartition3 = createResultPartition(sEnv, BLOCKING, numSubs3);
+ resultPartition3.setup();
+
+ int expected =
+ calculateBuffersConsumption(inputGate1)
+ + calculateBuffersConsumption(inputGate2)
+ + calculateBuffersConsumption(resultPartition1)
+ + calculateBuffersConsumption(resultPartition2)
+ + calculateBuffersConsumption(resultPartition3);
+ assertEquals(expected, numTotalBuffers);
+
+ inputGate1.close();
+ inputGate2.close();
+ resultPartition1.close();
+ resultPartition2.close();
+ resultPartition3.close();
+ }
+
+ private SingleInputGate createInputGate(
+ NettyShuffleEnvironment network,
+ ResultPartitionType resultPartitionType,
+ int numInputChannels) {
+
+ ShuffleDescriptor[] shuffleDescriptors = new NettyShuffleDescriptor[numInputChannels];
+ for (int i = 0; i < numInputChannels; i++) {
+ shuffleDescriptors[i] =
+ createRemoteWithIdAndLocation(
+ new IntermediateResultPartitionID(), ResourceID.generate());
+ }
+
+ InputGateDeploymentDescriptor inputGateDeploymentDescriptor =
+ new InputGateDeploymentDescriptor(
+ new IntermediateDataSetID(), resultPartitionType, 0, shuffleDescriptors);
+
+ ExecutionAttemptID consumerID = new ExecutionAttemptID();
+ Collection<SingleInputGate> inputGates =
+ network.createInputGates(
+ network.createShuffleIOOwnerContext(
+ "", consumerID, new
UnregisteredMetricsGroup()),
+ SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
+ Collections.singletonList(inputGateDeploymentDescriptor));
+
+ return inputGates.iterator().next();
+ }
+
+ private ResultPartition createResultPartition(
+ NettyShuffleEnvironment network,
+ ResultPartitionType resultPartitionType,
+ int numSubpartitions) {
+
+ ShuffleDescriptor shuffleDescriptor =
+ createRemoteWithIdAndLocation(
+ new IntermediateResultPartitionID(), ResourceID.generate());
+
+ PartitionDescriptor partitionDescriptor =
+ new PartitionDescriptor(
+ new IntermediateDataSetID(),
+ 2,
+ shuffleDescriptor.getResultPartitionID().getPartitionId(),
+ resultPartitionType,
+ numSubpartitions,
+ 0);
+ ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor =
+ new ResultPartitionDeploymentDescriptor(
+ partitionDescriptor, shuffleDescriptor, 1, true);
+
+ ExecutionAttemptID consumerID = new ExecutionAttemptID();
+ Collection<ResultPartition> resultPartitions =
+ network.createResultPartitionWriters(
+ network.createShuffleIOOwnerContext(
+ "", consumerID, new
UnregisteredMetricsGroup()),
+ Collections.singletonList(resultPartitionDeploymentDescriptor));
+
+ return resultPartitions.iterator().next();
+ }
+
+ private int calculateBuffersConsumption(SingleInputGate inputGate) throws Exception {
+ inputGate.setChannelStateWriter(ChannelStateWriter.NO_OP);
+ inputGate.finishReadRecoveredState();
+ while (!inputGate.getStateConsumedFuture().isDone()) {
+ inputGate.pollNext();
+ }
+ inputGate.convertRecoveredInputChannels();
+
+ int ret = 0;
+ for (InputChannel ch : inputGate.getInputChannels().values()) {
+ RemoteInputChannel rChannel = (RemoteInputChannel) ch;
+ ret += rChannel.getNumberOfAvailableBuffers();
+ }
+ ret += inputGate.getBufferPool().getMaxNumberOfMemorySegments();
+ return ret;
+ }
+
+ private int calculateBuffersConsumption(ResultPartition partition) {
+ if (partition.getPartitionType().isBlocking()) {
Review comment:
According to the design, I think the result should always be the max required buffers?
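If so, the branch could go away and the overload would simply count the pool's maximum capacity, along these lines (a sketch, assuming `ResultPartition#getBufferPool()` is accessible here):

```java
private int calculateBuffersConsumption(ResultPartition partition) {
    // Both blocking and pipelined partitions: count the maximum number
    // of buffers the partition's local pool may ever hold.
    return partition.getBufferPool().getMaxNumberOfMemorySegments();
}
```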
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -58,6 +58,26 @@ static void connectVertexToResult(
}
}
+ /**
+ * Given parallelisms of two job vertices, calculate the max number of input or output edges of
+ * each execution in the first job vertex.
+ *
+ * @param thisParallelism parallelism of the first job vertex.
+ * @param targetParallelism parallelism of the second job vertex.
+ * @param distributionPattern the {@link DistributionPattern} of the connecting edge.
+ */
+ public static int getMaxNumEdgesToTarget(
+ int thisParallelism, int targetParallelism, DistributionPattern distributionPattern) {
+ switch (distributionPattern) {
+ case POINTWISE:
+ return (int) Math.ceil((double) targetParallelism / thisParallelism);
Review comment:
It can be `(targetParallelism + thisParallelism - 1) / thisParallelism`. Floating-point computation has caused several problems due to precision issues, e.g. FLINK-22863, and I hope to avoid it if possible.
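Roughly like this (integer ceiling division, no floating point involved):

```java
case POINTWISE:
    // ceil(targetParallelism / thisParallelism) in pure integer
    // arithmetic, avoiding double precision issues (cf. FLINK-22863).
    return (targetParallelism + thisParallelism - 1) / thisParallelism;
```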
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
##########
@@ -58,6 +58,26 @@ static void connectVertexToResult(
}
}
+ /**
+ * Given parallelisms of two job vertices, calculate the max number of input or output edges of
+ * each execution in the first job vertex.
+ *
+ * @param thisParallelism parallelism of the first job vertex.
+ * @param targetParallelism parallelism of the second job vertex.
+ * @param distributionPattern the {@link DistributionPattern} of the connecting edge.
+ */
+ public static int getMaxNumEdgesToTarget(
Review comment:
I find the name a bit confusing, although the doc is clear. But I do not have a better idea yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]