zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272919268
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##########
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
                return nettyConfig;
        }
 
+       public boolean isCreditBased() {
+               return isCreditBased;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Utility method to extract network related parameters from the 
configuration and to
+        * sanity check them.
+        *
+        * @param configuration configuration object
+        * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+        * @param localTaskManagerCommunication true, to skip initializing the 
network stack
+        * @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+        * @return NetworkEnvironmentConfiguration
+        */
+       @Deprecated
+       public static NetworkEnvironmentConfiguration fromConfiguration(
+               Configuration configuration,
+               long maxJvmHeapMemory,
+               boolean localTaskManagerCommunication,
+               InetAddress taskManagerAddress) {
+
+               // ----> hosts / ports for communication and data exchange
+
+               final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+               ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+                       "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+               final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+               final int numNetworkBuffers;
+               if (!hasNewNetworkConfig(configuration)) {
+                       // fallback: number of network buffers
+                       numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+                       checkOldNetworkConfig(numNetworkBuffers);
+               } else {
+                       if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+                               LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+                                       
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+                       }
+
+                       final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+                       // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+                       long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+                       if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+                               throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+                                       + ") corresponds to more than MAX_INT 
pages.");
+                       }
+                       numNetworkBuffers = (int) numNetworkBuffersLong;
+               }
+
+               final NettyConfig nettyConfig;
+               if (!localTaskManagerCommunication) {
+                       final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+                       nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+                               pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+               } else {
+                       nettyConfig = null;
+               }
+
+               int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+               int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+               int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+               int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+               boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I also do not like the way of two sources of `isCreditBased`. 
   
   I think the key problem is whether `isCreditBased` must exist together with 
`NettyConfig`. My previous thought is trying to make decouple `isCreditBased` 
with `NettyConfig`. 
   
   We know `NettyConfig` is optional currently in 
`NetworkEnvironmentConfiguration`, and most of the existing tests should verify 
the default behavior (`isCreditBased = true`) and only several tests would 
verify both modes. So `NetworkEnvironmentConfigurationBuilder` would create 
`NetworkEnvironmentConfiguration` with `isCreditBased = true` and null 
`NettyConfig` by default which is suitable and simple for most of the tests.
   
   If we remove `isCreditBased` from `NetworkEnvironmentConfiguration`, the 
`NetworkEnvironmentConfigurationBuilder` must create a default `NettyConfig` 
with enabling credit-based by default.
   
   So it might be better if we make `isCreditBased` property independent with 
`NettyConfig`. Another option is we remove `NettyConfig#isCreditBasedEnabled` 
because via `NettyConfig#getConfig` we could get everything including the 
`isCreditBased`, not need that explicit method for every property. To do so it 
might seem no duplications between `NettyConfig` and 
`NetworkEnvironmentConfiguration`. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to