TanYuxin-tyx commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1269371029
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -83,6 +106,13 @@ public class ResultPartitionFactory {
private final int maxOverdraftBuffersPerGate;
+ // The following attributes will be null if tiered storage shuffle is
disabled.
Review Comment:
Use a `/* */` comment instead here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -101,9 +101,10 @@ public class SingleInputGateFactory {
private final BufferDebloatConfiguration debloatConfiguration;
- @Nullable
- private final TieredStorageConfiguration
- tieredStorageConfiguration; // is null if tiered storage shuffle
is disabled.
+ // The following attributes will be null if tiered storage shuffle is
disabled.
Review Comment:
Use `/* */` instead.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -266,6 +357,74 @@ private HybridShuffleConfiguration
getHybridShuffleConfiguration(
.build();
}
+ private BufferAccumulator createBufferAccumulator(
+ int numSubpartitions,
+ int accumulatorExclusiveBufferNum,
Review Comment:
Ditto: rename this to `numAccumulatorExclusiveBuffers`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -227,23 +263,78 @@ public ResultPartition create(
}
} else if (type == ResultPartitionType.HYBRID_FULL
|| type == ResultPartitionType.HYBRID_SELECTIVE) {
- partition =
- new HsResultPartition(
- taskNameWithSubtaskAndId,
- partitionIndex,
- id,
- type,
- subpartitions.length,
- maxParallelism,
- batchShuffleReadBufferPool,
- batchShuffleReadIOExecutor,
- partitionManager,
- channelManager.createChannel().getPath(),
- networkBufferSize,
-
getHybridShuffleConfiguration(numberOfSubpartitions, type),
- bufferCompressor,
- isBroadcast,
- bufferPoolFactory);
+ if (tieredStorageConfiguration != null) {
+ // Create memory manager.
+ TieredStorageMemoryManager memoryManager =
+ new TieredStorageMemoryManagerImpl(
+ checkNotNull(tieredStorageConfiguration)
+ .getNumBuffersTriggerFlushRatio(),
+ true);
+
+ // Create buffer accumulator.
+ int accumulatorExclusiveBufferNum =
Review Comment:
Consider renaming `accumulatorExclusiveBufferNum` to
`numAccumulatorExclusiveBuffers`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -18,58 +18,435 @@
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
-import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory;
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
/** Configurations for the Tiered Storage. */
public class TieredStorageConfiguration {
- // TODO, after implementing the tier factory, add appreciate
implementations to the array.
- private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES =
new TierFactory[0];
+ private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null;
+
+ private static final int DEFAULT_TIERED_STORAGE_BUFFER_SIZE = 32 * 1024;
+
+ private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100;
+
+ private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1;
+
+ private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1;
+
+ private static final int
DEFAULT_NUM_BUFFERS_USE_SORT_ACCUMULATOR_THRESHOLD = 512;
+
+ private static final int DEFAULT_MEMORY_TIER_NUM_BYTES_PER_SEGMENT = 320 *
1024;
+
+ private static final int DEFAULT_DISK_TIER_NUM_BYTES_PER_SEGMENT = 8 *
1024 * 1024;
+
+ private static final int DEFAULT_REMOTE_TIER_NUM_BYTES_PER_SEGMENT = 8 *
1024 * 1024;
+
+ private static final float DEFAULT_NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.5f;
Review Comment:
I have a concern here.
A ratio of 0.5 may be too low to trigger the flush process; we should raise it to
at least 0.6. Too low a trigger ratio leads to generating too many small regions,
which hurts sequential read performance.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -227,23 +263,78 @@ public ResultPartition create(
}
} else if (type == ResultPartitionType.HYBRID_FULL
|| type == ResultPartitionType.HYBRID_SELECTIVE) {
- partition =
- new HsResultPartition(
- taskNameWithSubtaskAndId,
- partitionIndex,
- id,
- type,
- subpartitions.length,
- maxParallelism,
- batchShuffleReadBufferPool,
- batchShuffleReadIOExecutor,
- partitionManager,
- channelManager.createChannel().getPath(),
- networkBufferSize,
-
getHybridShuffleConfiguration(numberOfSubpartitions, type),
- bufferCompressor,
- isBroadcast,
- bufferPoolFactory);
+ if (tieredStorageConfiguration != null) {
+ // Create memory manager.
+ TieredStorageMemoryManager memoryManager =
+ new TieredStorageMemoryManagerImpl(
+ checkNotNull(tieredStorageConfiguration)
+ .getNumBuffersTriggerFlushRatio(),
+ true);
+
+ // Create buffer accumulator.
+ int accumulatorExclusiveBufferNum =
Review Comment:
Consider renaming `accumulatorExclusiveBufferNum` to
`numAccumulatorExclusiveBuffers`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]