jiangxin369 commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1444374140
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -142,6 +155,8 @@ public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMe
                     !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
                     "Duplicated memory spec.");
             tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+            numReservedBuffersForDefinitelyRecycledOwners +=
+                    memorySpec.isDefinitelyRecycled() ? memorySpec.getNumGuaranteedBuffers() : 0;
Review Comment:
I don't think we have to; ensuring capacity before requesting non-reclaimable
buffers is enough. Besides:
1. A memory manager without non-guaranteed reclaimable buffers doesn't need
to ensure capacity at all.
2. Ensuring capacity during setup requires acquiring dozens of buffers at
once, whereas ensuring capacity at runtime may be smoother.
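
To make the idea concrete, here is a minimal self-contained sketch of the
lazy-capacity approach, assuming a simplified pool with hypothetical names
(this is not the real TieredStorageMemoryManagerImpl API):

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Sketch only: capacity is ensured lazily, at the moment a non-reclaimable
 * buffer is requested, instead of reserving all guaranteed buffers up front
 * in setup(). All names here are illustrative.
 */
public class LazyCapacitySketch {

    private final Queue<byte[]> available = new ArrayDeque<>();
    private final int poolSize;
    private int nonReclaimableHandedOut;

    public LazyCapacitySketch(int poolSize, int bufferSize) {
        this.poolSize = poolSize;
        for (int i = 0; i < poolSize; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Reclaimable requests need no capacity check: the owner guarantees recycling. */
    public byte[] requestReclaimable() {
        return available.poll();
    }

    /** Non-reclaimable requests ensure capacity right here, one buffer at a time. */
    public byte[] requestNonReclaimable() {
        // Hypothetical policy: fail (or, in a real manager, trigger spilling) once
        // non-reclaimable requests would exhaust the pool. Checking per request is
        // smoother than acquiring dozens of guaranteed buffers during setup.
        if (nonReclaimableHandedOut + 1 > poolSize) {
            throw new IllegalStateException("No capacity left for non-reclaimable buffers");
        }
        nonReclaimableHandedOut++;
        return available.poll();
    }

    /** Returns a reclaimable buffer to the pool. */
    public void recycle(byte[] buffer) {
        available.add(buffer);
    }
}
```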
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java:
##########
@@ -201,6 +203,43 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
         }
     }
+    @Test
+    void testRequestBuffersAfterPoolSizeDecreased() throws IOException {
+        final int numBuffers = 15;
+        BufferPool bufferPool = globalPool.createBufferPool(1, numBuffers);
+        TieredResultPartition resultPartition =
+                createTieredStoreResultPartitionWithStorageManager(2, bufferPool, false);
+        ResultSubpartitionView subpartitionView =
+                resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+
+        // Emits some records to fill up 10 buffers, including all 8 of the memory tiers and 2 of
+        // the disk tiers.
+        for (int i = 0; i < 10; i++) {
+            resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
+        }
+
+        IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
+        assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
+        ResultPartitionBytes partitionBytes =
+                ioMetrics.getResultPartitionBytes().values().iterator().next();
+        assertThat(partitionBytes.getSubpartitionBytes()).contains((long) 10 * NETWORK_BUFFER_SIZE);
+
+        // Halve the size of the buffer pool, then emit another piece of data.
+        bufferPool.setNumBuffers(numBuffers / 2);
+        resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
+
+        ioMetrics = taskIOMetricGroup.createSnapshot();
+        partitionBytes = ioMetrics.getResultPartitionBytes().values().iterator().next();
+        assertThat(partitionBytes.getSubpartitionBytes()).contains((long) 11 * NETWORK_BUFFER_SIZE);
+
+        subpartitionView.releaseAllResources();
+
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
+                .isEqualTo(2 * (resultPartition.getNumberOfSubpartitions() + 1) + 1);
+
+        resultPartition.release();
+    }
Review Comment:
The purpose is to test the behavior of requesting buffers when buffers are
occupied by the memory tier and the pool size is decreased at that moment.
Indeed, it is not easy to understand; I'll refactor it.
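
Roughly, the scenario can be pictured with this simplified, stand-alone model
(hypothetical names and logic, not the real TieredResultPartition or BufferPool
classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch of the tested scenario: the "memory tier" holds most of the buffers
 * when the pool size is decreased, so a later request can only be served after
 * some occupied buffers are released (e.g. spilled to the disk tier).
 */
public class PoolShrinkScenarioSketch {

    public static void main(String[] args) {
        final int numBuffers = 15;
        final int bufferSize = 32 * 1024;

        Deque<byte[]> pool = new ArrayDeque<>();
        for (int i = 0; i < numBuffers; i++) {
            pool.add(new byte[bufferSize]);
        }

        // Step 1: the memory tier occupies 10 of the 15 buffers.
        Deque<byte[]> memoryTier = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            memoryTier.add(pool.poll());
        }

        // Step 2: the pool is shrunk to numBuffers / 2 = 7 while those buffers
        // are still occupied, so more buffers are in use than the new size allows.
        int newPoolSize = numBuffers / 2;
        System.out.println("occupied: " + memoryTier.size() + ", new pool size: " + newPoolSize);

        // Step 3: to serve the next request within the shrunken limit, the memory
        // tier must release ("spill") buffers until one slot is free again.
        while (memoryTier.size() > newPoolSize - 1) {
            pool.add(memoryTier.poll()); // pretend the data was flushed to disk
        }
        byte[] next = pool.poll();
        System.out.println("request after shrink served, buffer size: " + next.length);
    }
}
```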
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]