azagrebin commented on a change in pull request #10161: [FLINK-13986][runtime]
Clean up legacy code for FLIP-49.
URL: https://github.com/apache/flink/pull/10161#discussion_r350253136
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
##########
@@ -449,35 +390,49 @@ private static int getDataport(Configuration
configuration) {
* Calculates the number of network buffers based on configuration and
jvm heap size.
*
* @param configuration configuration object
- * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+ * @param shuffleMemorySize the size of memory reserved for shuffle
environment
+ * @param pageSize size of memory segment
+ * @param numberOfNettyArenas number of netty arenas
* @return the number of network buffers
*/
- @SuppressWarnings("deprecation")
- private static int calculateNumberOfNetworkBuffers(Configuration
configuration, long maxJvmHeapMemory) {
- final int numberOfNetworkBuffers;
- if (!hasNewNetworkConfig(configuration)) {
- // fallback: number of network buffers
- numberOfNetworkBuffers =
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
-
- checkOldNetworkConfig(numberOfNetworkBuffers);
- } else {
- if
(configuration.contains(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS)) {
- LOG.info("Ignoring old (but still present)
network buffer configuration via {}.",
-
NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS.key());
- }
-
- final long networkMemorySize =
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
-
- // tolerate offcuts between intended and allocated
memory due to segmentation (will be available to the user-space memory)
- long numberOfNetworkBuffersLong = networkMemorySize /
ConfigurationParserUtils.getPageSize(configuration);
- if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("The given
number of memory bytes (" + networkMemorySize
- + ") corresponds to more than MAX_INT
pages.");
- }
- numberOfNetworkBuffers = (int)
numberOfNetworkBuffersLong;
+ private static int calculateNumberOfNetworkBuffers(
+ Configuration configuration,
+ MemorySize shuffleMemorySize,
+ int pageSize,
+ int numberOfNettyArenas) {
+
+ logIfIgnoringOldConfigs(configuration);
+
+ long networkMemorySizeByte = shuffleMemorySize.getBytes();
+ long nettyArenasSizeBytes = numberOfNettyArenas *
NettyBufferPool.ARENA_SIZE;
+ Preconditions.checkArgument(
+ networkMemorySizeByte >= nettyArenasSizeBytes,
+ String.format(
+ "Provided shuffle memory %d bytes is not enough
for direct allocation of %d netty arenas " +
+
"('taskmanager.network.netty.num-arenas', by default
'taskmanager.numberOfTaskSlots'). " +
+ "Each arena size is %d bytes. Total
configured arenas size is %d bytes. " +
+ "Try to increase shuffle memory size by
adjusting 'taskmanager.memory.shuffle.*' Flink configuration options.",
+ networkMemorySizeByte,
+ numberOfNettyArenas,
+ NettyBufferPool.ARENA_SIZE,
+ nettyArenasSizeBytes));
Review comment:
nit: this could be a function `calculateAndCheckNettyArenasSize`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services