xintongsong commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1461707655
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java:
##########
@@ -28,27 +31,35 @@
*/
public class CreditBasedInputBuffersUsageGauge extends AbstractBuffersUsageGauge {
- private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
- private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
-
- public CreditBasedInputBuffersUsageGauge(
- FloatingBuffersUsageGauge floatingBuffersUsageGauge,
- ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
- SingleInputGate[] inputGates) {
+ public CreditBasedInputBuffersUsageGauge(SingleInputGate[] inputGates) {
Review Comment:
It seems all these `AbstractBuffersUsageGauge` / `calculateUsedBuffers` /
`calculateTotalBuffers` things are for calculating
`AbstractBuffersUsageGauge#getValue`.
1. Do we still need `FloatingBuffersUsageGauge` /
`ExclusiveBuffersUsageGauge` at all?
2. Can we simply calculate the usage from the used / total buffers of the pool?
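A minimal sketch of the second suggestion, assuming the pool can report how many buffers are in use and how many it currently owns. `PoolStats`, `usedBuffers()`, `totalBuffers()` and `PoolUsage` are hypothetical names used only for illustration; they are not taken from the PR or from Flink.

```java
// Hypothetical stand-in for whatever the real buffer pool exposes.
interface PoolStats {
    int usedBuffers();   // buffers currently handed out by the pool

    int totalBuffers();  // current size of the pool
}

final class PoolUsage {
    private PoolUsage() {}

    /** Returns used / total summed over all pools, or 0 when no buffers are assigned. */
    static float usage(PoolStats... pools) {
        int used = 0;
        int total = 0;
        for (PoolStats pool : pools) {
            used += pool.usedBuffers();
            total += pool.totalBuffers();
        }
        // Guard against division by zero when no pool has been set up yet.
        return total == 0 ? 0.0f : (float) used / total;
    }
}
```

With this shape the gauge would need only the pools themselves, which is what makes the separate floating / exclusive gauges look redundant.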
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -286,13 +312,13 @@ public int getMaxNumberOfMemorySegments() {
*
* @return the same value as {@link #getMaxNumberOfMemorySegments()} for
bounded pools. For
* unbounded pools it returns an approximation based upon {@link
- * #getNumberOfRequiredMemorySegments()}
+ * #getExpectedNumberOfMemorySegments()}
*/
public int getEstimatedNumberOfRequestedMemorySegments() {
if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
return maxNumberOfMemorySegments;
} else {
- return getNumberOfRequiredMemorySegments() * 2;
+ return getMinNumberOfMemorySegments() * 2;
Review Comment:
This min-times-2 estimation doesn't really make sense. I'd suggest simply
using the expected number of segments.
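A minimal sketch of that suggestion, written as a standalone helper rather than as the actual `LocalBufferPool` method. `PoolSizeEstimate` is a hypothetical class, and the `UNBOUNDED_POOL_SIZE` constant below is an assumed stand-in for `NetworkBufferPool.UNBOUNDED_POOL_SIZE`.

```java
final class PoolSizeEstimate {
    // Assumed sentinel for an unbounded pool; stands in for
    // NetworkBufferPool.UNBOUNDED_POOL_SIZE.
    static final int UNBOUNDED_POOL_SIZE = Integer.MAX_VALUE;

    private PoolSizeEstimate() {}

    /**
     * Bounded pools report their maximum; unbounded pools report the expected
     * number of segments instead of twice the minimum.
     */
    static int estimatedRequestedSegments(int expectedSegments, int maxSegments) {
        return maxSegments < UNBOUNDED_POOL_SIZE ? maxSegments : expectedSegments;
    }
}
```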
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +607,97 @@ private void redistributeBuffers() {
}
// All buffers, which are not among the required ones
- final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+ int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
for (LocalBufferPool bufferPool : resizableBufferPools) {
- bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+ bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
}
return;
}
- /*
- * With buffer pools being potentially limited, let's distribute the
available memory
- * segments based on the capacity of each buffer pool, i.e. the
maximum number of segments
- * an unlimited buffer pool can take is numAvailableMemorySegment, for
limited buffer pools
- * it may be less. Based on this and the sum of all these values
(totalCapacity), we build
- * a ratio that we use to distribute the buffers.
- */
+ Map<LocalBufferPool, Integer> cachedPoolSize =
+ resizableBufferPools.stream()
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+
LocalBufferPool::getMinNumberOfMemorySegments));
+
+ while (true) {
+ int remaining = redistributeBuffers(numAvailableMemorySegment,
cachedPoolSize);
+
+ // Stop the loop iteration when there is no remaining segments or
all local buffer pools
+ // have reached the max number.
+ if (remaining == 0 || remaining == numAvailableMemorySegment) {
+ for (LocalBufferPool bufferPool : resizableBufferPools) {
+ bufferPool.setNumBuffers(
+ cachedPoolSize.getOrDefault(
+ bufferPool,
bufferPool.getMinNumberOfMemorySegments()));
+ }
+ break;
+ }
+ numAvailableMemorySegment = remaining;
+ }
+ }
- long totalCapacity = 0; // long to avoid int overflow
+ /**
+ * @param numBuffersToRedistribute the buffers to be redistributed.
+ * @param cachedPoolSize the map to cache the intermediate result.
+ * @return the remaining buffers that can continue to be redistributed.
+ */
+ private int redistributeBuffers(
Review Comment:
I don't really get the calculation logic here. Shouldn't we use the
expected number of buffers as the weight?
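One possible reading of this suggestion, sketched on plain arrays rather than on `LocalBufferPool` instances. `WeightedRedistribution` and its parameters are hypothetical; the sketch assumes every pool starts at its minimum, is capped at its maximum, and receives a share of the spare buffers proportional to its expected size. It is not the PR's algorithm.

```java
final class WeightedRedistribution {
    private WeightedRedistribution() {}

    /**
     * Starts every pool at min[i] and hands out `available` extra buffers,
     * weighted by expected[i], never exceeding max[i].
     */
    static int[] redistribute(int available, int[] min, int[] expected, int[] max) {
        int n = min.length;
        int[] size = min.clone();
        int remaining = available;
        while (remaining > 0) {
            // Only pools that can still grow take part in this round.
            long totalWeight = 0;
            for (int i = 0; i < n; i++) {
                if (size[i] < max[i]) {
                    totalWeight += expected[i];
                }
            }
            if (totalWeight == 0) {
                break; // every pool has reached its maximum
            }
            int budget = remaining;
            int distributed = 0;
            for (int i = 0; i < n; i++) {
                if (size[i] >= max[i]) {
                    continue;
                }
                int share = (int) ((long) budget * expected[i] / totalWeight);
                int grant = Math.min(share, max[i] - size[i]);
                size[i] += grant;
                distributed += grant;
            }
            if (distributed == 0) {
                // Integer rounding left every share at zero: hand out one by one.
                for (int i = 0; i < n && remaining > 0; i++) {
                    if (size[i] < max[i]) {
                        size[i]++;
                        remaining--;
                    }
                }
            } else {
                remaining -= distributed;
            }
        }
        return size;
    }
}
```

Buffers that a capped pool cannot take are carried into the next round and split among the pools that can still grow, so the loop ends either when nothing is left or when all pools are at their maximum.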
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java:
##########
@@ -94,29 +99,58 @@ public class SortBufferAccumulator implements
BufferAccumulator {
@Nullable
private TriConsumer<TieredStorageSubpartitionId, Buffer, Integer>
accumulatedBufferFlusher;
+ /**
+ * An executor to periodically check the size of buffer pool. If the size
is changed, the
+ * accumulated buffers should be flushed to release the buffers.
+ */
+ private final ScheduledExecutorService periodicalAccumulatorFlusher =
+ Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorThreadFactory("hybrid-shuffle-periodical-accumulator-flusher"));
+
+ private final long poolSizeCheckInterval;
+
+ private AtomicInteger poolSize;
+
/** Whether the current {@link DataBuffer} is a broadcast sort buffer. */
private boolean isBroadcastDataBuffer;
public SortBufferAccumulator(
int numSubpartitions,
- int numBuffers,
+ int numExpectedBuffers,
int bufferSizeBytes,
+ long poolSizeCheckInterval,
TieredStorageMemoryManager memoryManager,
boolean isPartialRecordAllowed) {
this.numSubpartitions = numSubpartitions;
this.bufferSizeBytes = bufferSizeBytes;
- this.numBuffers = numBuffers;
+ this.numExpectedBuffers = numExpectedBuffers;
+ this.poolSizeCheckInterval = poolSizeCheckInterval;
this.memoryManager = memoryManager;
this.isPartialRecordAllowed = isPartialRecordAllowed;
}
@Override
public void setup(TriConsumer<TieredStorageSubpartitionId, Buffer,
Integer> bufferFlusher) {
this.accumulatedBufferFlusher = bufferFlusher;
+ this.poolSize = new AtomicInteger(memoryManager.getBufferPoolSize());
+
+ if (poolSizeCheckInterval > 0) {
+ periodicalAccumulatorFlusher.scheduleAtFixedRate(
+ () -> {
+ int newSize = this.memoryManager.getBufferPoolSize();
+ int oldSize = poolSize.getAndSet(newSize);
+ if (oldSize != newSize) {
Review Comment:
Not sure about always flushing if `oldSize != newSize`. What if the pool size
increases, or decreases but there is still plenty of space left?
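A minimal sketch of a narrower flush condition, assuming the accumulator can tell how many buffers it currently holds. `FlushPolicy`, `shouldFlush` and `buffersInUse` are hypothetical names, and the exact threshold is an assumption chosen for illustration.

```java
final class FlushPolicy {
    private FlushPolicy() {}

    /**
     * Flush only when the pool shrank and the buffers currently held no longer
     * fit into the new pool size. A growing pool, or a shrinking pool with
     * enough headroom, does not trigger a flush.
     */
    static boolean shouldFlush(int oldPoolSize, int newPoolSize, int buffersInUse) {
        boolean shrank = newPoolSize < oldPoolSize;
        return shrank && buffersInUse > newPoolSize;
    }
}
```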
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -435,6 +438,17 @@ public class NettyShuffleEnvironmentOptions {
+ HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME
+ " is true, the remote storage will be
disabled.");
+ /** The option to enable the memory-safe mode of hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<Boolean>
NETWORK_HYBRID_SHUFFLE_ENABLE_MEMORY_SAFE_MODE =
+ key(HYBRID_SHUFFLE_MEMORY_SAFE_MODE_OPTION_NAME)
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "The option is used to enable the memory-safe mode
of hybrid shuffle, which"
+ + "makes the shuffle can work with a
little memory regardless of the job topology and how many task are running on
the task manager.");
+
Review Comment:
Should warn users about the potential performance impact.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -180,38 +191,48 @@ public class LocalBufferPool implements BufferPool {
* with a minimal and maximal number of network buffers being available.
*
* @param networkBufferPool global network buffer pool to get buffers from
- * @param numberOfRequiredMemorySegments minimum number of network buffers
+ * @param expectedNumberOfMemorySegments expected number of network buffers
+ * @param minNumberOfMemorySegments minimum number of network buffers
* @param maxNumberOfMemorySegments maximum number of network buffers to
allocate
* @param numberOfSubpartitions number of subpartitions
* @param maxBuffersPerChannel maximum number of buffers to use for each
channel
* @param maxOverdraftBuffersPerGate maximum number of overdraft buffers
to use for each gate
*/
LocalBufferPool(
NetworkBufferPool networkBufferPool,
- int numberOfRequiredMemorySegments,
+ int expectedNumberOfMemorySegments,
+ int minNumberOfMemorySegments,
int maxNumberOfMemorySegments,
int numberOfSubpartitions,
int maxBuffersPerChannel,
int maxOverdraftBuffersPerGate) {
checkArgument(
- numberOfRequiredMemorySegments > 0,
- "Required number of memory segments (%s) should be larger than
0.",
- numberOfRequiredMemorySegments);
+ minNumberOfMemorySegments > 0,
+ "Minimum number of memory segments (%s) should be larger than
0.",
+ minNumberOfMemorySegments);
checkArgument(
- maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
- "Maximum number of memory segments (%s) should not be smaller
than minimum (%s).",
+ expectedNumberOfMemorySegments >= minNumberOfMemorySegments,
+ "Minimum number of memory segments (%s) should not be larger
than expected (%s).",
+ minNumberOfMemorySegments,
+ expectedNumberOfMemorySegments);
+
+ checkArgument(
+ maxNumberOfMemorySegments >= expectedNumberOfMemorySegments,
+ "Maximum number of memory segments (%s) should not be smaller
than expected (%s).",
maxNumberOfMemorySegments,
- numberOfRequiredMemorySegments);
+ expectedNumberOfMemorySegments);
LOG.debug(
- "Using a local buffer pool with {}-{} buffers",
- numberOfRequiredMemorySegments,
+ "Using a local buffer pool with {}-{}-{} buffers",
Review Comment:
The meaning of each number is unclear.
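A minimal sketch of a self-describing message, assuming the three values are the expected, minimum and maximum number of buffers. The argument order in the PR is not visible in this hunk, so the order below is an assumption, and `PoolLogMessage` is a hypothetical helper. In the real code the same labels could go into the `LOG.debug` pattern next to the `{}` placeholders.

```java
final class PoolLogMessage {
    private PoolLogMessage() {}

    /** Labels each number so the log line can be read without looking at the code. */
    static String describe(int expected, int min, int max) {
        return "Using a local buffer pool with expected="
                + expected
                + ", min="
                + min
                + ", max="
                + max
                + " buffers";
    }
}
```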
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -79,7 +79,9 @@ public TieredStorageProducerClient(
this.currentSubpartitionTierAgent = new
TierProducerAgent[numSubpartitions];
Arrays.fill(currentSubpartitionSegmentId, -1);
+ }
+ public void setUp() {
Review Comment:
Why do we need this separate `setUp`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -153,6 +153,9 @@ public SingleInputGate create(
igdd.getConsumedPartitionType().isHybridResultPartition()
&& tieredStorageConfiguration != null;
+ boolean enableTieredStorage = tieredStorageConfiguration != null;
+ boolean enableMemorySafeMode =
Review Comment:
Never used.