xintongsong commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1260841837


##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -322,7 +322,8 @@ public class NettyShuffleEnvironmentOptions {
                     .intType()
                     .defaultValue(1024)
                     .withDescription(
-                            "Controls the segment size(in bytes) of hybrid spilled file data index.");
+                            "Controls the segment size(in bytes) of hybrid spilled file data index."
+                                    + "This option will be ignored in new mode of hybrid shuffle.");

Review Comment:
   It would be better to point to the corresponding config key.
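A minimal sketch of what pointing to the key could look like. It assumes a hypothetical `Option` record as a stand-in for Flink's `ConfigOption` (only `key()` matters here), and `hypothetical.spill-index-segment-size` is a placeholder, since the real key of this option is not visible in the hunk.

```java
class KeyReferenceSketch {
    // Hypothetical stand-in for Flink's ConfigOption; only key() matters for this sketch.
    record Option<T>(String key, T defaultValue, String description) {}

    // Declared first, so the description below may call its key() by simple name.
    static final Option<Boolean> ENABLE_NEW_MODE =
            new Option<>(
                    "taskmanager.network.hybrid-shuffle.enable-new-mode",
                    true,
                    "Enables the new mode of hybrid shuffle.");

    // Placeholder key: the real key of the segment-size option is not shown in the hunk.
    static final Option<Integer> SPILL_INDEX_SEGMENT_SIZE =
            new Option<>(
                    "hypothetical.spill-index-segment-size",
                    1024,
                    "Controls the segment size (in bytes) of the hybrid spilled file data index. "
                            + "This option is ignored when "
                            + ENABLE_NEW_MODE.key()
                            + " is set to true.");

    public static void main(String[] args) {
        System.out.println(SPILL_INDEX_SEGMENT_SIZE.description());
    }
}
```

Judging from the hunk headers, `NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE` is declared further down the class than this option. Calling its `key()` by simple name from this initializer would therefore be rejected as an illegal forward reference, so the declarations would need reordering (or the key string repeated).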



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+                                    + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
+                                    + "2. Job stability: "
+                                    + "(a) the new mode uses less required network memory. For large parallelism jobs, less network memory is required."
+                                    + "(b) the new mode can store shuffle data in remote storage when the disk space is not enough, which will avoid insufficient"
+                                    + "disk space errors. It's supported only when the"
+                                    + NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()
+                                    + " is configured.");

Review Comment:
   The indentation looks good in the code, but you may want to check what the
documentation generated from it looks like.
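One likely issue, judging only from the strings in the diff (the generated documentation itself is not shown here): the fragments are concatenated with `+` and no separating spaces, so words run together (`storethe`, `enough.If`), and the `1.`/`2.` items would likely render on a single line. The sketch below reproduces the run-together words with fragments copied from the remote-path description, and shows a hypothetical `joinSentences` helper (not a Flink API) that trims the fragments and joins them with single spaces.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class DescriptionJoinSketch {
    // Fragments copied from the remote-path option description in the diff.
    static final String[] FRAGMENTS = {
        "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store",
        "the shuffle data in remote storage when the disk space is not enough.",
        "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
    };

    // What "a" + "b" + ... produces: no separator between fragments.
    static String naive(String... parts) {
        return String.join("", parts);
    }

    // Hypothetical helper: trims each fragment, skips empty ones, joins with exactly one space.
    static String joinSentences(String... parts) {
        return Arrays.stream(parts)
                .map(String::trim)
                .filter(p -> !p.isEmpty())
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(naive(FRAGMENTS));
        System.out.println(joinSentences(FRAGMENTS));
    }
}
```

For the numbered list itself, Flink's `org.apache.flink.configuration.description.Description` builder (with `text(...)`, `linebreak()` and `list(...)`) is probably a better fit than plain concatenation; it is left out here only to keep the sketch dependency-free.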



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");

Review Comment:
   This should point to the corresponding config key. We also need to mention the
new / legacy mode, but we probably want to minimize the changes needed later when
the legacy mode is removed.
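A minimal sketch of one way to address both points, assuming plain strings instead of Flink's `ConfigOption`: the mode-independent text is kept apart from a single sentence that names the `enable-new-mode` key, so removing the legacy mode later only means dropping that one sentence. The constant names are hypothetical, and the key is a literal only to keep the example self-contained.

```java
class RemotePathDescriptionSketch {
    // Literal for self-containment; the real key is that of NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE.
    static final String ENABLE_NEW_MODE_KEY = "taskmanager.network.hybrid-shuffle.enable-new-mode";

    // Mode-independent text; it should survive unchanged once the legacy mode is removed.
    static final String CORE =
            "The base path of the remote storage for hybrid shuffle. "
                    + "Shuffle data is stored in the remote storage when the local disk space is not enough. "
                    + "If this option is not configured, the remote storage is disabled.";

    // The only mode-specific sentence; removing the legacy mode later means deleting
    // this constant and its use in description().
    static final String MODE_NOTE =
            "This option only takes effect when " + ENABLE_NEW_MODE_KEY + " is true.";

    static String description() {
        return CORE + " " + MODE_NOTE;
    }

    public static void main(String[] args) {
        System.out.println(description());
    }
}
```

In the diff, the `enable-new-mode` description already calls `NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()`, and the remote-path option is declared first. The two initializers therefore cannot both reference each other via `.key()` by simple name; one side would need the key as a literal (or a shared string constant declared before both).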



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -253,6 +348,46 @@ public ResultPartition create(
         return partition;
     }
 
+    private List<TieredStorageMemorySpec> createTieredStorageMemorySpecs(
+            int numberOfSubpartitions,
+            BufferAccumulator bufferAccumulator,
+            List<TierProducerAgent> tierProducerAgents) {
+        List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new ArrayList<>();
+        tieredStorageMemorySpecs.add(
+                new TieredStorageMemorySpec(
+                        bufferAccumulator,
+                        Math.min(
+                                numberOfSubpartitions + 1,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getAccumulatorExclusiveBuffers())));
+        for (TierProducerAgent tierProducerAgent : tierProducerAgents) {
+            if (tierProducerAgent.getClass() == MemoryTierProducerAgent.class) {
+                tieredStorageMemorySpecs.add(
+                        new TieredStorageMemorySpec(
+                                tierProducerAgent,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getMemoryTierExclusiveBuffers()));
+            }
+
+            if (tierProducerAgent.getClass() == DiskTierProducerAgent.class) {
+                tieredStorageMemorySpecs.add(
+                        new TieredStorageMemorySpec(
+                                tierProducerAgent,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getDiskTierExclusiveBuffers()));
+            }
+
+            if (tierProducerAgent.getClass() == RemoteTierProducerAgent.class) {
+                tieredStorageMemorySpecs.add(
+                        new TieredStorageMemorySpec(
+                                tierProducerAgent,
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getRemoteTierExclusiveBuffers()));
+            }

Review Comment:
   This doesn't seem right. We should not bind the memory spec configuration to
the agent classes. Instead, we should create both the factories and the memory
specs from the configuration, possibly in
`TieredStorageConfiguration#createDefaultFactories`.
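A minimal sketch of the suggested direction, using heavily simplified stand-ins. `Agent`, `TierAgentFactory`, `TierSpec`, `MemorySpec`, `createDefaultTiers` and `createMemorySpecs` are all hypothetical and are not Flink's actual tiered-storage APIs. The configuration side produces each tier's factory together with its exclusive-buffer count, so the memory specs can be built without any `getClass()` checks.

```java
import java.util.ArrayList;
import java.util.List;

class TierSpecSketch {
    // Hypothetical stand-in for a tier producer agent.
    record Agent(String tierName) {}

    // Hypothetical stand-in for a tier factory that creates the producer agent.
    interface TierAgentFactory {
        Agent createProducerAgent();
    }

    // A tier as described by the configuration: its factory plus its exclusive buffers.
    record TierSpec(TierAgentFactory factory, int exclusiveBuffers) {}

    // Pairs a buffer owner (accumulator or agent) with its exclusive buffer count.
    record MemorySpec(Object owner, int exclusiveBuffers) {}

    // Configuration side: each factory is created together with its memory requirement.
    static List<TierSpec> createDefaultTiers(
            int memoryTierBuffers, int diskTierBuffers, int remoteTierBuffers, boolean remoteEnabled) {
        List<TierSpec> tiers = new ArrayList<>();
        tiers.add(new TierSpec(() -> new Agent("memory"), memoryTierBuffers));
        tiers.add(new TierSpec(() -> new Agent("disk"), diskTierBuffers));
        if (remoteEnabled) {
            tiers.add(new TierSpec(() -> new Agent("remote"), remoteTierBuffers));
        }
        return tiers;
    }

    // Partition side: no per-class branches, the buffer count comes from each TierSpec.
    static List<MemorySpec> createMemorySpecs(
            Object accumulator, int accumulatorBuffers, int numSubpartitions, List<TierSpec> tiers) {
        List<MemorySpec> specs = new ArrayList<>();
        // Same bound as in the diff: at most numSubpartitions + 1 buffers for the accumulator.
        specs.add(new MemorySpec(accumulator, Math.min(numSubpartitions + 1, accumulatorBuffers)));
        for (TierSpec tier : tiers) {
            specs.add(new MemorySpec(tier.factory().createProducerAgent(), tier.exclusiveBuffers()));
        }
        return specs;
    }

    public static void main(String[] args) {
        // Buffer counts here are illustrative only, not Flink defaults.
        List<TierSpec> tiers = createDefaultTiers(2, 3, 4, true);
        createMemorySpecs("accumulator", 10, 3, tiers).forEach(System.out::println);
    }
}
```

With this shape, adding or removing a tier only touches the configuration side, and the method standing in for `createTieredStorageMemorySpecs` needs no change.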



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+                                    + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
+                                    + "2. Job stability: "
+                                    + "(a) the new mode uses less required network memory. For large parallelism jobs, less network memory is required."
+                                    + "(b) the new mode can store shuffle data in remote storage when the disk space is not enough, which will avoid insufficient"
+                                    + "disk space errors. It's supported only when the"
+                                    + NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()
+                                    + " is configured.");

Review Comment:
   We should also mention that this option, as well as the legacy mode, will be
removed in the future.



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+    /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+            key("taskmanager.network.hybrid-shuffle.remote.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+                                    + "the shuffle data in remote storage when the disk space is not enough."
+                                    + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+                                    + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+                                    + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."

Review Comment:
   Are we sure about this? What cases exactly?


