zhuzhurk commented on a change in pull request #16173:
URL: https://github.com/apache/flink/pull/16173#discussion_r662800195



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utils to calculate network memory requirement of a vertex from network 
configuration and details
+ * of input and output. The methods help to decide the volume of buffer pools 
when initializing
+ * shuffle environment and also guide network memory announcing in 
fine-grained resource management.
+ */
+public class NettyShuffleUtils {
+
+    public static Pair<Integer, Integer> getMinMaxFloatingBuffersPerInputGate(
+            final int numFloatingBuffersPerGate) {
+        // We should guarantee at-least one floating buffer for local channel 
state recovery.
+        return Pair.of(1, numFloatingBuffersPerGate);
+    }
+
+    public static Pair<Integer, Integer> 
getMinMaxNetworkBuffersPerResultPartition(
+            final int numBuffersPerChannel,
+            final int numFloatingBuffersPerGate,
+            final int sortShuffleMinParallelism,
+            final int sortShuffleMinBuffers,
+            final int numSubpartitions,
+            final ResultPartitionType type) {
+        int min =
+                type.isBlocking() && numSubpartitions >= 
sortShuffleMinParallelism
+                        ? sortShuffleMinBuffers
+                        : numSubpartitions + 1;
+        int max =
+                type.isBounded()
+                        ? numSubpartitions * numBuffersPerChannel + 
numFloatingBuffersPerGate
+                        : Integer.MAX_VALUE;
+        return Pair.of(min, max);
+    }
+
+    public static int computeNetworkBuffersForAnnouncing(
+            final int numBuffersPerChannel,
+            final int numFloatingBuffersPerGate,
+            final int sortShuffleMinParallelism,
+            final int sortShuffleMinBuffers,
+            final int numTotalInputChannels,
+            final int numTotalInputGates,
+            final Map<IntermediateDataSetID, Integer> subpartitionNums,
+            final Map<IntermediateDataSetID, ResultPartitionType> 
partitionTypes) {
+
+        // Each input channel will retain N exclusive network buffers, N = 
numBuffersPerChannel.
+        // Each input gate is guaranteed to have a number of floating buffers.
+        int requirmentForInputs =
+                numBuffersPerChannel * numTotalInputChannels
+                        + numFloatingBuffersPerGate * numTotalInputGates;

Review comment:
       To keep consistency, I think `numFloatingBuffersPerGate` should be 
changed to 
`getMinMaxFloatingBuffersPerInputGate(numFloatingBuffersPerGate).getRight()`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SSGNetworkMemoryCalculationUtilsTest.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link SSGNetworkMemoryCalculationUtils}. */
+public class SSGNetworkMemoryCalculationUtilsTest {
+
+    private static final TestShuffleMaster SHUFFLE_MASTER = new 
TestShuffleMaster();
+
+    private static final ResourceProfile DEFAULT_RESOURCE = 
ResourceProfile.fromResources(1.0, 100);
+
+    private JobGraph jobGraph;
+
+    private ExecutionGraph executionGraph;
+
+    private List<SlotSharingGroup> slotSharingGroups;
+
+    @Test
+    public void testGenerateEnrichedResourceProfile() throws Exception {
+        setup(DEFAULT_RESOURCE);
+
+        slotSharingGroups.forEach(
+                ssg ->
+                        SSGNetworkMemoryCalculationUtils.enrichNetworkMemory(
+                                ssg, executionGraph.getAllVertices()::get, 
SHUFFLE_MASTER));
+
+        assertEquals(
+                new MemorySize(
+                        TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 
2)
+                                + 
TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)),
+                
slotSharingGroups.get(0).getResourceProfile().getNetworkMemory());
+
+        assertEquals(
+                new 
MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 0)),
+                
slotSharingGroups.get(1).getResourceProfile().getNetworkMemory());
+    }
+
+    @Test
+    public void testGenerateUnknownResourceProfile() throws Exception {
+        setup(ResourceProfile.UNKNOWN);
+
+        slotSharingGroups.forEach(
+                ssg ->
+                        SSGNetworkMemoryCalculationUtils.enrichNetworkMemory(
+                                ssg, executionGraph.getAllVertices()::get, 
SHUFFLE_MASTER));
+
+        for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
+            assertEquals(ResourceProfile.UNKNOWN, 
slotSharingGroup.getResourceProfile());
+        }
+    }
+
+    private void setup(final ResourceProfile resourceProfile) throws Exception 
{
+        slotSharingGroups = Arrays.asList(new SlotSharingGroup(), new 
SlotSharingGroup());
+
+        for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
+            slotSharingGroup.setResourceProfile(resourceProfile);
+        }
+
+        jobGraph = createJobGraph(slotSharingGroups);
+        executionGraph =
+                
TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
+    }
+
+    private static JobGraph createJobGraph(final List<SlotSharingGroup> 
slotSharingGroups) {
+
+        JobVertex source = new JobVertex("source");
+        source.setInvokableClass(NoOpInvokable.class);
+        source.setParallelism(4);
+        source.setSlotSharingGroup(slotSharingGroups.get(0));
+
+        JobVertex map = new JobVertex("map");
+        map.setInvokableClass(NoOpInvokable.class);
+        map.setParallelism(5);
+        map.setSlotSharingGroup(slotSharingGroups.get(0));
+
+        JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+        sink.setParallelism(6);
+        sink.setSlotSharingGroup(slotSharingGroups.get(1));
+
+        map.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+        sink.connectNewDataSetAsInput(
+                map, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+
+        return JobGraphTestUtils.streamingJobGraph(source, map, sink);
+    }
+
+    static class TestShuffleMaster implements ShuffleMaster<ShuffleDescriptor> 
{

Review comment:
       Can this class be private?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SSGNetworkMemoryCalculationUtils.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Util to analyze inputs & outputs of {@link ExecutionJobVertex} and 
calculate network memory
+ * requirement for slot sharing group (SSG).
+ */
+public class SSGNetworkMemoryCalculationUtils {

Review comment:
       NIT: the community now recommends writing this kind of name as 
`SsgNetworkMemoryCalculationUtils`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EdgeManagerBuildUtil} to verify the max number of 
connecting edges between
+ * vertices for pattern of both {@link DistributionPattern#POINTWISE} and 
{@link
+ * DistributionPattern#ALL_TO_ALL}.
+ */
+public class EdgeManagerBuildUtilTest {
+
+    @Test
+    public void testPointwise() throws Exception {
+        test(17, 17, POINTWISE);
+        test(17, 23, POINTWISE);
+        test(17, 34, POINTWISE);
+        test(34, 17, POINTWISE);
+        test(23, 17, POINTWISE);
+    }
+
+    @Test
+    public void testAllToAll() throws Exception {
+        test(17, 17, ALL_TO_ALL);
+        test(17, 23, ALL_TO_ALL);
+        test(17, 34, ALL_TO_ALL);
+        test(34, 17, ALL_TO_ALL);
+        test(23, 17, ALL_TO_ALL);
+    }
+
+    private void test(int upstream, int downstream, DistributionPattern 
pattern) throws Exception {

Review comment:
       I think the name should be `testGetMaxNumEdgesToTarget()` so that it is 
easier to understand what it does.
   Furthermore, `testPointwise` can be 
`testGetMaxNumEdgesToTargetInPointwiseConnection`, etc.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utils to calculate network memory requirement of a vertex from network 
configuration and details
+ * of input and output. The methods help to decide the volume of buffer pools 
when initializing
+ * shuffle environment and also guide network memory announcing in 
fine-grained resource management.
+ */
+public class NettyShuffleUtils {
+
+    public static Pair<Integer, Integer> getMinMaxFloatingBuffersPerInputGate(
+            final int numFloatingBuffersPerGate) {
+        // We should guarantee at-least one floating buffer for local channel 
state recovery.
+        return Pair.of(1, numFloatingBuffersPerGate);
+    }
+
+    public static Pair<Integer, Integer> 
getMinMaxNetworkBuffersPerResultPartition(
+            final int numBuffersPerChannel,
+            final int numFloatingBuffersPerGate,
+            final int sortShuffleMinParallelism,
+            final int sortShuffleMinBuffers,
+            final int numSubpartitions,
+            final ResultPartitionType type) {
+        int min =
+                type.isBlocking() && numSubpartitions >= 
sortShuffleMinParallelism
+                        ? sortShuffleMinBuffers
+                        : numSubpartitions + 1;
+        int max =
+                type.isBounded()
+                        ? numSubpartitions * numBuffersPerChannel + 
numFloatingBuffersPerGate
+                        : Integer.MAX_VALUE;
+        return Pair.of(min, max);
+    }
+
+    public static int computeNetworkBuffersForAnnouncing(
+            final int numBuffersPerChannel,
+            final int numFloatingBuffersPerGate,
+            final int sortShuffleMinParallelism,
+            final int sortShuffleMinBuffers,
+            final int numTotalInputChannels,
+            final int numTotalInputGates,
+            final Map<IntermediateDataSetID, Integer> subpartitionNums,
+            final Map<IntermediateDataSetID, ResultPartitionType> 
partitionTypes) {
+
+        // Each input channel will retain N exclusive network buffers, N = 
numBuffersPerChannel.
+        // Each input gate is guaranteed to have a number of floating buffers.
+        int requirmentForInputs =
+                numBuffersPerChannel * numTotalInputChannels
+                        + numFloatingBuffersPerGate * numTotalInputGates;
+
+        int requirementForOutputs = 0;
+        for (IntermediateDataSetID dataSetId : subpartitionNums.keySet()) {
+            int numSubs = subpartitionNums.get(dataSetId);
+            checkArgument(partitionTypes.containsKey(dataSetId));
+            ResultPartitionType partitionType = partitionTypes.get(dataSetId);
+
+            requirementForOutputs +=
+                    getNumBuffersToAnnounceForResultPartition(
+                            partitionType,
+                            numBuffersPerChannel,
+                            numFloatingBuffersPerGate,
+                            sortShuffleMinParallelism,
+                            sortShuffleMinBuffers,
+                            numSubs);
+        }
+
+        return requirmentForInputs + requirementForOutputs;
+    }
+
+    private static int getNumBuffersToAnnounceForResultPartition(
+            ResultPartitionType type,
+            int numBuffersPerChannel,
+            int floatingBuffersPerGate,
+            int sortShuffleMinParallelism,
+            int sortShuffleMinBuffers,
+            int numSubpartitions) {
+
+        Pair<Integer, Integer> minAndMax =
+                getMinMaxNetworkBuffersPerResultPartition(
+                        numBuffersPerChannel,
+                        floatingBuffersPerGate,
+                        sortShuffleMinParallelism,
+                        sortShuffleMinBuffers,
+                        numSubpartitions,
+                        type);
+
+        final int ret;
+        if (type.isPipelined()) {

Review comment:
       I think we first need to document that our goal is to avoid buffer 
request timeouts (see FLINK-12852).
   For the pipelined shuffle type, the floating buffers may not be returned in 
time due to back pressure, so we need to include all the floating buffers in 
the announcement, i.e. we should take the max value.
   The blocking shuffle type is free of back pressure and floating buffers 
can be recycled in time, so the minimum required buffers would be enough.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EdgeManagerBuildUtil} to verify the max number of 
connecting edges between
+ * vertices for pattern of both {@link DistributionPattern#POINTWISE} and 
{@link
+ * DistributionPattern#ALL_TO_ALL}.
+ */
+public class EdgeManagerBuildUtilTest {
+
+    @Test
+    public void testPointwise() throws Exception {
+        test(17, 17, POINTWISE);
+        test(17, 23, POINTWISE);
+        test(17, 34, POINTWISE);
+        test(34, 17, POINTWISE);
+        test(23, 17, POINTWISE);
+    }
+
+    @Test
+    public void testAllToAll() throws Exception {
+        test(17, 17, ALL_TO_ALL);
+        test(17, 23, ALL_TO_ALL);
+        test(17, 34, ALL_TO_ALL);
+        test(34, 17, ALL_TO_ALL);
+        test(23, 17, ALL_TO_ALL);
+    }
+
+    private void test(int upstream, int downstream, DistributionPattern 
pattern) throws Exception {
+
+        Pair<ExecutionJobVertex, ExecutionJobVertex> pair =
+                setupExecutionGraph(upstream, downstream, pattern);
+        ExecutionJobVertex upstreamEJV = pair.getLeft();
+        ExecutionJobVertex downstreamEJV = pair.getRight();
+
+        for (ExecutionVertex ev : upstreamEJV.getTaskVertices()) {
+            assertEquals(1, ev.getProducedPartitions().size());
+
+            IntermediateResultPartition partition =
+                    ev.getProducedPartitions().values().iterator().next();
+            assertEquals(1, partition.getConsumerVertexGroups().size());
+
+            ConsumerVertexGroup consumerVertexGroup = 
partition.getConsumerVertexGroups().get(0);
+            int actual = consumerVertexGroup.size();
+            int maxNumEdges =
+                    EdgeManagerBuildUtil.getMaxNumEdgesToTarget(upstream, 
downstream, pattern);
+            switch (pattern) {

Review comment:
       I think we do not need to differentiate between ALL_TO_ALL and POINTWISE 
here.
   Instead, we should calculate the actual max edge number and check whether it 
equals the result of `EdgeManagerBuildUtil.getMaxNumEdgesToTarget`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to