xintongsong commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1260841837
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -322,7 +322,8 @@ public class NettyShuffleEnvironmentOptions {
                     .intType()
                     .defaultValue(1024)
                     .withDescription(
-                            "Controls the segment size(in bytes) of hybrid spilled file data index.");
+                            "Controls the segment size(in bytes) of hybrid spilled file data index."
+                                    + "This option will be ignored in new mode of hybrid shuffle.");

Review Comment:
   It would be better to point to the corresponding config key.

##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+                                    + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
+                                    + "2. Job stability: "
+                                    + "(a) the new mode uses less required network memory. For large parallelism jobs, less network memory is required."
+                                    + "(b) the new mode can store shuffle data in remote storage when the disk space is not enough, which will avoid insufficient"
+                                    + "disk space errors. It's supported only when the"
+                                    + NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()
+                                    + " is configured.");

Review Comment:
   The indentation looks good in the code. You may want to check what the documentation generated from this looks like.

##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");

Review Comment:
   This should point to the corresponding config key. Also, we need to mention the new / legacy mode, but we probably also want to minimize the changes needed later when removing the legacy mode.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -253,6 +348,46 @@ public ResultPartition create(
         return partition;
     }
+    private List<TieredStorageMemorySpec> createTieredStorageMemorySpecs(
+            int numberOfSubpartitions,
+            BufferAccumulator bufferAccumulator,
+            List<TierProducerAgent> tierProducerAgents) {
+        List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new ArrayList<>();
+        tieredStorageMemorySpecs.add(
+                new TieredStorageMemorySpec(
+                        bufferAccumulator,
+                        Math.min(
+                                numberOfSubpartitions + 1,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getAccumulatorExclusiveBuffers())));
+        for (TierProducerAgent tierProducerAgent : tierProducerAgents) {
+            if (tierProducerAgent.getClass() == MemoryTierProducerAgent.class) {
+                tieredStorageMemorySpecs.add(
+                        new TieredStorageMemorySpec(
+                                tierProducerAgent,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getMemoryTierExclusiveBuffers()));
+            }
+
+            if (tierProducerAgent.getClass() == DiskTierProducerAgent.class) {
+                tieredStorageMemorySpecs.add(
+                        new TieredStorageMemorySpec(
+                                tierProducerAgent,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getDiskTierExclusiveBuffers()));
+            }
+
+            if (tierProducerAgent.getClass() == RemoteTierProducerAgent.class) {
+                tieredStorageMemorySpecs.add(
+                        new TieredStorageMemorySpec(
+                                tierProducerAgent,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getRemoteTierExclusiveBuffers()));
+            }

Review Comment:
   This doesn't seem right. We should not bind the memory spec configuration to the agent classes. Instead, we should create both the factories and the memory specs from the configuration, possibly in `TieredStorageConfiguration#createDefaultFactories`.

##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+                                    + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
+                                    + "2. Job stability: "
+                                    + "(a) the new mode uses less required network memory. For large parallelism jobs, less network memory is required."
+                                    + "(b) the new mode can store shuffle data in remote storage when the disk space is not enough, which will avoid insufficient"
+                                    + "disk space errors. It's supported only when the"
+                                    + NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()
+                                    + " is configured.");

Review Comment:
   We should also mention that this option will be removed in the future, along with the legacy mode.

##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+                                    + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."

Review Comment:
   Are we sure about this? What cases exactly?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
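This note follows up on the comment about what the generated documentation will look like. The option descriptions in the hunks are built by concatenating string literals, and Java inserts nothing between the pieces. The sketch below is hypothetical and standalone. The class, constant, and method names are illustrative only, not Flink API, and Flink is not needed to run it. It joins the remote-path description literals exactly as they appear in the diff, then flags places where one sentence runs straight into the next. `WITH_SEPARATORS` is only one possible wording. It adds the missing spaces and names the enabling option by its key, as the review suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical standalone sketch (no Flink dependency): shows how concatenated description
 * literals render, so missing separators become visible before the docs are generated.
 */
public class DescriptionRenderingSketch {

    // Illustrative stand-in for the enable-new-mode key. A shared String constant can be used in
    // any option's description regardless of the order in which the ConfigOption fields appear.
    static final String ENABLE_NEW_MODE_KEY = "taskmanager.network.hybrid-shuffle.enable-new-mode";

    // Literals copied from the remote-path description in the diff, joined with '+' as in the PR.
    static final String AS_WRITTEN =
            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
                    + "the shuffle data in remote storage when the disk space is not enough."
                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode";

    // One possible rewording (illustrative): explicit spaces, and the enabling option named by key.
    static final String WITH_SEPARATORS =
            "The base path of remote storage for hybrid shuffle. Shuffle data is stored"
                    + " in remote storage when the disk space is not enough. "
                    + "This option only takes effect when '"
                    + ENABLE_NEW_MODE_KEY
                    + "' is true; if it is not configured, remote storage is disabled.";

    /** Returns each lowercase letter + period + uppercase letter run, i.e. a missing space. */
    static List<String> runOnSentences(String text) {
        List<String> hits = new ArrayList<>();
        Matcher matcher = Pattern.compile("[a-z]\\.[A-Z]").matcher(text);
        while (matcher.find()) {
            hits.add(matcher.group());
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println(AS_WRITTEN);
        System.out.println("run-on sentence boundaries: " + runOnSentences(AS_WRITTEN));
        System.out.println(WITH_SEPARATORS);
        System.out.println("run-on sentence boundaries: " + runOnSentences(WITH_SEPARATORS));
    }
}
```

Running `main` reports `[h.I, d.I]` for the text as written, from `enough.If` and `ignored.If`, and an empty list for the rewording. The regex only catches a period directly followed by a capital letter. The `storethe` join therefore still has to be spotted by reading the rendered text. Keeping the key in a shared constant is an assumption of this sketch. It also sidesteps Java's illegal-forward-reference error if the remote-path option, which is declared first, wanted to reference `NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE` by simple name in its initializer.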