1996fanrui commented on code in PR #25314:
URL: https://github.com/apache/flink/pull/25314#discussion_r1755981219

##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java:
##########
@@ -356,27 +354,9 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
         CompressionCodec compressionCodec =
                 configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
-        boolean batchShuffleCompressionEnabled;
-        if (compressionCodec == CompressionCodec.NONE) {
-            batchShuffleCompressionEnabled = false;
-        } else {
-            batchShuffleCompressionEnabled =
-                    configuration.get(
-                            NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED);
-
-            if (!batchShuffleCompressionEnabled) {
-                LOG.warn(
-                        "Deprecated configuration key {} is used to disable the compression. "
-                                + "Please set the {} to \"None\" to disable the compression.",
-                        NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED.key(),
-                        NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC.key());
-            }
-        }
+        boolean batchShuffleCompressionEnabled = compressionCodec != CompressionCodec.NONE;
-        int maxNumConnections =
-                Math.max(
-                        1,
-                        configuration.get(NettyShuffleEnvironmentOptions.MAX_NUM_TCP_CONNECTIONS));
+        int maxNumConnections = 1;

Review Comment:
   Same as the previous comment: if it's always 1, this parameter may no longer be needed, and some code can be simplified.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##########
@@ -249,8 +249,7 @@ public static SlotManagerConfiguration fromConfiguration(
         final Duration declareNeededResourceDelay =
                 configuration.get(ResourceManagerOptions.DECLARE_NEEDED_RESOURCE_DELAY);
-        boolean waitResultConsumedBeforeRelease =
-                configuration.get(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
+        boolean waitResultConsumedBeforeRelease = true;

Review Comment:
   If it's always true, do we still need it?
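
To illustrate the two comments above, here is a rough, self-contained sketch of what dropping an always-constant parameter could look like. All of the types below (`ShuffleConfigBefore`, `ShuffleConfigAfter`, and the local `CompressionCodec` enum with its `LZ4` value) are hypothetical stand-ins written for this example and are not Flink's actual classes; whether the real constructors can be simplified this way depends on their remaining callers.

```java
/** Hypothetical stand-ins for illustration; none of these are Flink's real classes. */
public class ConstantParameterSketch {

    public enum CompressionCodec { NONE, LZ4 }

    /** Before: the constructor still takes values that every caller now passes as constants. */
    public static final class ShuffleConfigBefore {
        private final int maxNumConnections;
        private final boolean compressionEnabled;

        public ShuffleConfigBefore(int maxNumConnections, boolean compressionEnabled) {
            this.maxNumConnections = maxNumConnections;
            this.compressionEnabled = compressionEnabled;
        }

        public int getMaxNumConnections() {
            return maxNumConnections;
        }

        public boolean isCompressionEnabled() {
            return compressionEnabled;
        }
    }

    /** After: the constant parameter is gone and the flag is derived from the codec. */
    public static final class ShuffleConfigAfter {
        private final CompressionCodec codec;

        public ShuffleConfigAfter(CompressionCodec codec) {
            this.codec = codec;
        }

        public boolean isCompressionEnabled() {
            return codec != CompressionCodec.NONE;
        }
    }

    public static void main(String[] args) {
        // Both variants describe the same setup; the second needs one argument fewer.
        ShuffleConfigBefore before = new ShuffleConfigBefore(1, true);
        ShuffleConfigAfter after = new ShuffleConfigAfter(CompressionCodec.LZ4);
        System.out.println(before.isCompressionEnabled() == after.isCompressionEnabled());
    }
}
```

The same reasoning would apply to a boolean that is always `true`: once no caller can pass `false`, the field, its getter, and any branch guarded by it are candidates for removal.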

##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##########
@@ -854,23 +788,12 @@ public enum TaskManagerLoadBalanceMode {
         /**
          * The method is mainly to load the {@link
-         * TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} from {@link Configuration}, which is
-         * compatible with {@link ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY}.
+         * TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} from {@link Configuration}.
          */
         @Internal
         public static TaskManagerLoadBalanceMode loadFromConfiguration(
                 @Nonnull Configuration configuration) {
-            Optional<TaskManagerLoadBalanceMode> taskManagerLoadBalanceModeOptional =
-                    configuration.getOptional(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE);
-            if (taskManagerLoadBalanceModeOptional.isPresent()) {
-                return taskManagerLoadBalanceModeOptional.get();
-            }
-            boolean evenlySpreadOutSlots =
-                    configuration.get(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
-
-            return evenlySpreadOutSlots
-                    ? TaskManagerLoadBalanceMode.SLOTS
-                    : TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue();
+            return configuration.get(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE);

Review Comment:
   `loadFromConfiguration` is simple enough now. Could we delete the `loadFromConfiguration` method and have all callers call `configuration.get(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE)` directly?
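
As a minimal sketch of the suggestion to inline `loadFromConfiguration`, the example below shows a one-line wrapper and a direct lookup returning the same value. The `Option` and `Config` classes, the `LoadBalanceMode` enum, and the key `example.load-balance.mode` are hypothetical stand-ins made up for this example; they only mimic the shape of a typed `get(option)` lookup and are not Flink's `ConfigOption` or `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for illustration; none of these are Flink's real classes. */
public class InlineLoaderSketch {

    public enum LoadBalanceMode { NONE, SLOTS }

    /** A typed option with a key and a default value. */
    public static final class Option<T> {
        final String key;
        final T defaultValue;

        public Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** A tiny key-value configuration that falls back to the option's default. */
    public static final class Config {
        private final Map<String, Object> values = new HashMap<>();

        public <T> Config set(Option<T> option, T value) {
            values.put(option.key, value);
            return this;
        }

        @SuppressWarnings("unchecked")
        public <T> T get(Option<T> option) {
            return (T) values.getOrDefault(option.key, option.defaultValue);
        }
    }

    // Made-up key, used only by this sketch.
    public static final Option<LoadBalanceMode> LOAD_BALANCE_MODE =
            new Option<>("example.load-balance.mode", LoadBalanceMode.NONE);

    /** The wrapper after the fallback logic is removed: it only forwards to get(). */
    public static LoadBalanceMode loadFromConfiguration(Config config) {
        return config.get(LOAD_BALANCE_MODE);
    }

    public static void main(String[] args) {
        Config config = new Config().set(LOAD_BALANCE_MODE, LoadBalanceMode.SLOTS);
        // A caller can read the option directly and get the same result as the wrapper.
        LoadBalanceMode viaWrapper = loadFromConfiguration(config);
        LoadBalanceMode direct = config.get(LOAD_BALANCE_MODE);
        System.out.println(viaWrapper == direct);
    }
}
```

Since the wrapper adds no behavior beyond the lookup, replacing each call site with the direct `get` leaves results unchanged and removes one method to maintain.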