[
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973037#comment-15973037
]
ASF GitHub Bot commented on FLINK-4545:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3721#discussion_r111992933
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link TaskManagerServices}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentInformation.class)
+public class TaskManagerServicesTest {
+
+ /**
+ * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
+ * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+ *
+ * <p>With the legacy option the expected network buffer memory is derived only from the
+ * configured number of buffers and the default memory segment size, regardless of the
+ * total Java memory size passed in.
+ */
+ @SuppressWarnings("deprecation")
+ @Test
+ public void calculateNetworkBufOld() throws Exception {
+ final long segmentSize = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue();
+ Configuration config = new Configuration();
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+
+ // note: actual network buffer memory size is independent of the totalJavaMemorySize
+ assertEquals(segmentSize, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+ assertEquals(segmentSize, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+
+ // test integer overflow in the memory size: the expected result (2^33 bytes) does not
+ // fit into an int, so numBuffers * segmentSize must not be computed in int arithmetic
+ final long networkMemory = 2L << 32; // 2^33 bytes
+ final int numBuffers = (int) (networkMemory / segmentSize);
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+ assertEquals(networkMemory, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
+ }
+
+ /**
+ * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
+ * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+ * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+ */
+ @Test
+ public void calculateNetworkBufNew() throws Exception {
+ Configuration config = new Configuration();
+
+ // (1) defaults: expected = min(max, max(min, fraction * totalJavaMemory))
+ final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+ final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
+ final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+
+ // Parenthesised on purpose: "64L << 20 + 1" parses as "64L << 21" because '+' binds
+ // tighter than '<<'. The expected value must be computed from the very same input.
+ final long smallJavaMem = (64L << 20) + 1;
+ assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * smallJavaMem))),
+ TaskManagerServices.calculateNetworkBuf(smallJavaMem, config));
+
+ final long largeJavaMem = 10L << 30;
+ assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * largeJavaMem))),
+ TaskManagerServices.calculateNetworkBuf(largeJavaMem, config));
+
+ calculateNetworkBufNew(config);
+ }
+
+ /**
+ * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long,
Configuration)} with the
+ * new configuration parameters.
+ *
+ * @param config configuration object
+ */
+ private static void calculateNetworkBufNew(final Configuration config) {
+ // (2) fixed size memory
+ config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN,
1L << 20); // 1MB
+ config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
1L << 20); // 1MB
+
+ // note: actual network buffer memory size is independent of
the totalJavaMemorySize
+ assertEquals(1 << 20,
TaskManagerServices.calculateNetworkBuf(10L << 20, config));
+ assertEquals(1 << 20,
TaskManagerServices.calculateNetworkBuf(64L << 20, config));
+ assertEquals(1 << 20,
TaskManagerServices.calculateNetworkBuf(1L << 30, config));
+
+ // (3) random fraction, min, and max values
+ Random ran = new Random();
+ for (int i = 0; i < 1_000; ++i){
+ float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
+
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
+
+ long min =
Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
+
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
+
+ long max = Math.max(min, ran.nextLong());
+
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
+
+ long javaMem = Math.max(max + 1, ran.nextLong());
+
+ final long networkBufMem =
TaskManagerServices.calculateNetworkBuf(javaMem, config);
--- End diff --
What we definitely need here is a catch block that prints the parameters used if
any assertion fails. It is a bit odd to use random parameters in the first place,
though :/
> Flink automatically manages TM network buffer
> ---------------------------------------------
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
> Issue Type: Wish
> Components: Network
> Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers
> config option. In a job DAG with a shuffle phase, this number can grow very
> large depending on the TM cluster size. The formula for calculating the buffer
> count is documented here
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):
>
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster
> size dynamically and then leverage the upcoming Flink feature that supports
> scaling job parallelism (rescaling) at runtime.
> If the buffer count config is static at runtime and cannot be changed without
> restarting the task manager process, this may add latency and complexity to the
> scaling process. I am wondering whether there is already any discussion about
> whether network buffers should be managed automatically by Flink, or whether
> Flink should at least expose an API that allows them to be reconfigured. Let me
> know if there is an existing JIRA that I should follow.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)