xintongsong commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1260841837
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -322,7 +322,8 @@ public class NettyShuffleEnvironmentOptions {
.intType()
.defaultValue(1024)
.withDescription(
- "Controls the segment size(in bytes) of hybrid spilled file data index.");
+ "Controls the segment size(in bytes) of hybrid spilled file data index."
+ + "This option will be ignored in new mode of hybrid shuffle.");
Review Comment:
It would be better to point to the corresponding config key.
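Here is a minimal sketch of a description that points to the corresponding key. It uses a hypothetical `Option` stand-in so it runs without Flink. Flink's real `ConfigOption` also exposes `key()`, which the diff already calls as `NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()`. The segment-size key string is a placeholder, because the excerpt does not show the real one.

Declaration order matters here. The segment-size option sits near line 322, while the enable-new-mode option is added around line 381. Java does not allow a static initializer to read a later static field by its simple name, so the referenced option may need to be declared first.

```java
import java.util.Objects;

public class OptionKeyReferenceSketch {

    /** Hypothetical stand-in for Flink's ConfigOption: only the key and the description. */
    static final class Option {
        private final String key;
        private final String description;

        Option(String key, String description) {
            this.key = Objects.requireNonNull(key);
            this.description = Objects.requireNonNull(description);
        }

        String key() {
            return key;
        }

        String description() {
            return description;
        }
    }

    // Declared before its users: a static initializer may not read a later static field
    // by simple name (illegal forward reference), and a qualified read would still see null.
    static final Option ENABLE_NEW_MODE =
            new Option(
                    "taskmanager.network.hybrid-shuffle.enable-new-mode",
                    "Enables the new mode of hybrid shuffle.");

    // Placeholder key string; the real key of this option is not shown in the diff excerpt.
    static final Option SPILLED_INDEX_SEGMENT_SIZE =
            new Option(
                    "example.hybrid-shuffle.spilled-index-segment-size",
                    "Controls the segment size (in bytes) of the hybrid spilled file data index."
                            + " This option is ignored when "
                            + ENABLE_NEW_MODE.key()
                            + " is set to true.");

    public static void main(String[] args) {
        System.out.println(SPILLED_INDEX_SEGMENT_SIZE.description());
    }
}
```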
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
+ "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
+ "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+ /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+ key("taskmanager.network.hybrid-shuffle.remote.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+ + "the shuffle data in remote storage when the disk space is not enough."
+ + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+ + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+ /** The option to enable the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+ ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+ + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
+ + "2. Job stability: "
+ + "(a) the new mode uses less required network memory. For large parallelism jobs, less network memory is required."
+ + "(b) the new mode can store shuffle data in remote storage when the disk space is not enough, which will avoid insufficient"
+ + "disk space errors. It's supported only when the"
+ + NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()
+ + " is configured.");
Review Comment:
The indentation looks good in the code. You may want to check what the documentation generated from this looks like.
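In the quoted descriptions, adjacent string literals are concatenated without separating spaces. For example, `"...will store"` is followed directly by `"the shuffle data..."`. The generated documentation would therefore likely read "storethe shuffle data" and "enough.If the option". The sketch below reproduces this with text abbreviated from the remote-path description. It then shows one possible fix: joining whole sentences with a single space.

```java
public class DescriptionConcatSketch {

    // Mirrors the concatenation style in the diff: no fragment begins or ends with a space.
    static String concatenated() {
        return "The new shuffle mode will store"
                + "the shuffle data in remote storage when the disk space is not enough."
                + "If the option is not configured, the remote storage will be disabled.";
    }

    // Keeping each sentence whole and joining with a single space avoids glued words.
    static String joined() {
        return String.join(
                " ",
                "The new shuffle mode will store the shuffle data in remote storage"
                        + " when the disk space is not enough.",
                "If the option is not configured, the remote storage will be disabled.");
    }

    public static void main(String[] args) {
        System.out.println(concatenated());
        System.out.println(joined());
    }
}
```

The enable-new-mode description contains a numbered comparison. For that text, Flink's `Description` builder (`Description.builder().text(...).linebreak().list(...)`) may render more cleanly than inline "1." and "(a)" markers. It is worth confirming the result against the generated HTML.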
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
+ "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
+ "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+ /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+ key("taskmanager.network.hybrid-shuffle.remote.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+ + "the shuffle data in remote storage when the disk space is not enough."
+ + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+ + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
Review Comment:
This should point to the corresponding config key. Also, we need to mention the new / legacy mode, but we probably also want to minimize the changes needed later when removing the legacy mode.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -253,6 +348,46 @@ public ResultPartition create(
return partition;
}
+ private List<TieredStorageMemorySpec> createTieredStorageMemorySpecs(
+ int numberOfSubpartitions,
+ BufferAccumulator bufferAccumulator,
+ List<TierProducerAgent> tierProducerAgents) {
+ List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new ArrayList<>();
+ tieredStorageMemorySpecs.add(
+ new TieredStorageMemorySpec(
+ bufferAccumulator,
+ Math.min(
+ numberOfSubpartitions + 1,
+ checkNotNull(tieredStorageConfiguration)
+ .getAccumulatorExclusiveBuffers())));
+ for (TierProducerAgent tierProducerAgent : tierProducerAgents) {
+ if (tierProducerAgent.getClass() == MemoryTierProducerAgent.class) {
+ tieredStorageMemorySpecs.add(
+ new TieredStorageMemorySpec(
+ tierProducerAgent,
+ checkNotNull(tieredStorageConfiguration)
+ .getMemoryTierExclusiveBuffers()));
+ }
+
+ if (tierProducerAgent.getClass() == DiskTierProducerAgent.class) {
+ tieredStorageMemorySpecs.add(
+ new TieredStorageMemorySpec(
+ tierProducerAgent,
+ checkNotNull(tieredStorageConfiguration)
+ .getDiskTierExclusiveBuffers()));
+ }
+
+ if (tierProducerAgent.getClass() == RemoteTierProducerAgent.class) {
+ tieredStorageMemorySpecs.add(
+ new TieredStorageMemorySpec(
+ tierProducerAgent,
+ checkNotNull(tieredStorageConfiguration)
+ .getRemoteTierExclusiveBuffers()));
+ }
Review Comment:
This doesn't seem right. We should not bind the memory spec configuration to the agent classes. Instead, we should create both the factories and the memory specs from the configuration, possibly in `TieredStorageConfiguration#createDefaultFactories`.
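Here is a rough sketch of the suggested direction. It assumes each configured tier factory carries its own exclusive-buffer requirement. Agents and memory specs are then created together from configuration, instead of being matched by `getClass()`. `TierAgent`, `TierFactory`, `MemorySpec` and `createMemorySpecs` are hypothetical stand-ins, not the actual Flink classes. Only the accumulator rule (`Math.min(numberOfSubpartitions + 1, ...)`) is taken from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TierMemorySpecSketch {

    /** Hypothetical stand-in for a tier producer agent. */
    static final class TierAgent {
        final String tierName;

        TierAgent(String tierName) {
            this.tierName = tierName;
        }

        @Override
        public String toString() {
            return tierName + "-agent";
        }
    }

    /** Hypothetical stand-in for TieredStorageMemorySpec: an owner and its exclusive buffers. */
    static final class MemorySpec {
        final Object owner;
        final int exclusiveBuffers;

        MemorySpec(Object owner, int exclusiveBuffers) {
            this.owner = owner;
            this.exclusiveBuffers = exclusiveBuffers;
        }
    }

    /** Hypothetical tier factory built from configuration; it carries its own buffer count. */
    static final class TierFactory {
        final String tierName;
        final int exclusiveBuffers;

        TierFactory(String tierName, int exclusiveBuffers) {
            this.tierName = tierName;
            this.exclusiveBuffers = exclusiveBuffers;
        }

        TierAgent createAgent() {
            return new TierAgent(tierName);
        }
    }

    /** Creates agents and memory specs together from the configured factories. */
    static List<MemorySpec> createMemorySpecs(
            Object accumulator,
            int numSubpartitions,
            int accumulatorExclusiveBuffers,
            List<TierFactory> factories,
            List<TierAgent> agentsOut) {
        List<MemorySpec> specs = new ArrayList<>();
        // Same accumulator rule as the diff: at most numSubpartitions + 1 exclusive buffers.
        specs.add(
                new MemorySpec(
                        accumulator, Math.min(numSubpartitions + 1, accumulatorExclusiveBuffers)));
        // No getClass() checks: each factory supplies its own exclusive-buffer count.
        for (TierFactory factory : factories) {
            TierAgent agent = factory.createAgent();
            agentsOut.add(agent);
            specs.add(new MemorySpec(agent, factory.exclusiveBuffers));
        }
        return specs;
    }

    public static void main(String[] args) {
        // Illustrative buffer counts only, not Flink defaults.
        List<TierFactory> factories =
                Arrays.asList(
                        new TierFactory("memory", 100),
                        new TierFactory("disk", 1),
                        new TierFactory("remote", 1));
        List<TierAgent> agents = new ArrayList<>();
        for (MemorySpec spec : createMemorySpecs("accumulator", 4, 512, factories, agents)) {
            System.out.println(spec.owner + " -> " + spec.exclusiveBuffers);
        }
    }
}
```

With this shape, adding or removing a tier only changes the list of configured factories. The spec creation loop stays the same.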
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
+ "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
+ "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+ /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+ key("taskmanager.network.hybrid-shuffle.remote.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+ + "the shuffle data in remote storage when the disk space is not enough."
+ + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+ + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+ /** The option to enable the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+ ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+ + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
+ + "2. Job stability: "
+ + "(a) the new mode uses less required network memory. For large parallelism jobs, less network memory is required."
+ + "(b) the new mode can store shuffle data in remote storage when the disk space is not enough, which will avoid insufficient"
+ + "disk space errors. It's supported only when the"
+ + NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH.key()
+ + " is configured.");
Review Comment:
We should also mention that this option, as well as the legacy mode, will be removed in the future.
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -379,6 +381,36 @@ public class NettyShuffleEnvironmentOptions {
+ "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
+ "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
+ /** The option to configure the base remote storage path for the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH =
+ key("taskmanager.network.hybrid-shuffle.remote.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The option is used to configure the base path of remote storage for the new mode of hybrid shuffle. The new shuffle mode will store"
+ + "the shuffle data in remote storage when the disk space is not enough."
+ + "If the option is configured and the new shuffle mode is disabled, this option will be ignored."
+ + "If the option is not configured and the new shuffle mode is enabled, the remote storage will be disabled in new mode");
+
+ /** The option to enable the new mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+ ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-new-mode")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "The option is used to enable the new mode of hybrid shuffle. The following are the differences between new mode and legacy mode:"
+ + "1. Execution efficiency: the new mode has optimized efficiency and will be faster than the legacy mode in many cases."
Review Comment:
Are we sure about this? What cases exactly?
--
This is an automated message from the Apache Git Service.