xintongsong commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1441269144


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java:
##########
@@ -30,9 +30,21 @@ public class TieredStorageMemorySpec {
     /** The number of guaranteed buffers of this memory owner. */
     private final int numGuaranteedBuffers;
 
+    /**
+     * Whether the buffers of this owner can definitely be recycled, even if the downstream does not
+     * consume them promptly.
+     */
+    private final boolean definitelyRecycled;

Review Comment:
   1. I'd suggest the name `guaranteedRecyclable` (see the sketch below).
   2. I'm not entirely sure whether the buffers from a tier can always be described as all recyclable or all non-recyclable. Would it be possible for a tier to contain both recyclable and non-recyclable buffers?
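   For point 1, a rough sketch of what I have in mind (only the renamed field and its accessor; the other members and the real constructor are elided, so this is illustrative rather than the exact change):
   ```java
   public class TieredStorageMemorySpec {

       /** The number of guaranteed buffers of this memory owner. */
       private final int numGuaranteedBuffers;

       /**
        * Whether the buffers of this owner are guaranteed to be recycled, even if the
        * downstream does not consume them promptly.
        */
       private final boolean guaranteedRecyclable;

       // Illustrative constructor; the PR's actual constructor takes more arguments.
       public TieredStorageMemorySpec(int numGuaranteedBuffers, boolean guaranteedRecyclable) {
           this.numGuaranteedBuffers = numGuaranteedBuffers;
           this.guaranteedRecyclable = guaranteedRecyclable;
       }

       public int getNumGuaranteedBuffers() {
           return numGuaranteedBuffers;
       }

       public boolean isGuaranteedRecyclable() {
           return guaranteedRecyclable;
       }
   }
   ```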



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -91,6 +94,15 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
      */
     private final Map<Object, Integer> numOwnerRequestedBuffers;
 
+    /**
+     * The queue that contains all available buffers. This field should be thread-safe because it
+     * can be touched both by the task thread and the netty thread.
+     */
+    private final BlockingQueue<MemorySegment> bufferQueue;

Review Comment:
   How is the thread safety achieved?
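   Asking because even if the field ends up being e.g. a `LinkedBlockingQueue`, only individual operations are thread-safe; check-then-act sequences such as `isEmpty()` followed by `poll()` are not atomic. A small sketch of a drain loop that avoids relying on such a sequence, reusing the names from this PR:
   ```java
   // Sketch only: poll() returning null terminates the loop, so no separate
   // isEmpty() check is needed and no other thread can race between the check
   // and the poll.
   MemorySegment segment;
   while ((segment = bufferQueue.poll()) != null) {
       bufferPool.recycle(segment);
       numRequestedBuffers.decrementAndGet();
   }
   ```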



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -244,6 +269,63 @@ public void release() {
                 ExceptionUtils.rethrow(e);
             }
         }
+        while (!bufferQueue.isEmpty()) {
+            MemorySegment segment = bufferQueue.poll();
+            bufferPool.recycle(segment);
+            numRequestedBuffers.decrementAndGet();
+        }
+    }
+
+    @VisibleForTesting

Review Comment:
   This usually means we are doing white-box tests rather than black-box tests.
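   For example, instead of reading internal state through a test-only accessor, the same condition can often be verified through externally observable effects (sketch only, using the pool accessor the tests already rely on):
   ```java
   // Sketch: after release(), every buffer the memory manager held should be back
   // in the pool, which is observable without @VisibleForTesting hooks.
   assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isZero();
   ```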



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -142,6 +155,8 @@ public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMe
                     !tieredMemorySpecs.containsKey(memorySpec.getOwner()),
                     "Duplicated memory spec.");
             tieredMemorySpecs.put(memorySpec.getOwner(), memorySpec);
+            numReservedBuffersForDefinitelyRecycledOwners +=
+                    memorySpec.isDefinitelyRecycled() ? memorySpec.getNumGuaranteedBuffers() : 0;

Review Comment:
   Shouldn't we ensure capacity upon setting up?
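   Something along these lines at the end of `setup()` is what I have in mind (sketch only, assuming `BufferPool#getNumBuffers` reflects the configured pool size and reusing the `ensureCapacity` added in this PR):
   ```java
   // Sketch: once all specs are registered, fail fast if the pool cannot cover the
   // reserved buffers, and reserve them eagerly instead of waiting for the first write.
   checkState(
           bufferPool.getNumBuffers() >= numReservedBuffersForDefinitelyRecycledOwners,
           "Buffer pool size %s cannot cover the %s reserved buffers.",
           bufferPool.getNumBuffers(),
           numReservedBuffersForDefinitelyRecycledOwners);
   ensureCapacity();
   ```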



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java:
##########
@@ -129,6 +129,7 @@ public boolean tryWrite(
         }
         if (finishedBuffer.isBuffer()) {
            memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+            memoryManager.ensureCapacity();

Review Comment:
   How could this prevent hanging? IIUC, what `ensureCapacity` does is simply to request new buffers if the number of reserved buffers cannot be satisfied. However, such a request may not be fulfilled, and we may still hang in `tryWrite`.
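   Purely as an illustration of the kind of check that would actually avoid blocking here (a boolean-returning `ensureCapacity` does not exist in this PR, so the code below is hypothetical):
   ```java
   // Hypothetical sketch: if the reservation outcome were reported back, the memory
   // tier could give up and let a lower tier handle the data instead of hanging.
   if (!memoryManager.ensureCapacity()) {
       return false;
   }
   ```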



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java:
##########
@@ -201,6 +203,43 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
         }
     }
 
+    @Test
+    void testRequestBuffersAfterPoolSizeDecreased() throws IOException {
+        final int numBuffers = 15;
+        BufferPool bufferPool = globalPool.createBufferPool(1, numBuffers);
+        TieredResultPartition resultPartition =
+                createTieredStoreResultPartitionWithStorageManager(2, bufferPool, false);
+        ResultSubpartitionView subpartitionView =
+                resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+
+        // Emit some records to fill up 10 buffers: all 8 of the memory tier plus 2 of
+        // the disk tier.
+        for (int i = 0; i < 10; i++) {
+            resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
+        }
+
+        IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
+        assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
+        ResultPartitionBytes partitionBytes =
+                ioMetrics.getResultPartitionBytes().values().iterator().next();
+        assertThat(partitionBytes.getSubpartitionBytes()).contains((long) 10 * NETWORK_BUFFER_SIZE);
+
+        // Halve the size of the buffer pool, then emit another piece of data.
+        bufferPool.setNumBuffers(numBuffers / 2);
+        resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
+
+        ioMetrics = taskIOMetricGroup.createSnapshot();
+        partitionBytes = ioMetrics.getResultPartitionBytes().values().iterator().next();
+        assertThat(partitionBytes.getSubpartitionBytes()).contains((long) 11 * NETWORK_BUFFER_SIZE);
+
+        subpartitionView.releaseAllResources();
+
+        assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
+                .isEqualTo(2 * (resultPartition.getNumberOfSubpartitions() + 1) + 1);
+
+        resultPartition.release();
+    }

Review Comment:
   The purpose of this case is not very clear to me. What are the expected behaviors that we are verifying after decreasing the pool size?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
