TanYuxin-tyx commented on code in PR #22280:
URL: https://github.com/apache/flink/pull/22280#discussion_r1150216597


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+        writeDataToFile(0, 0, 0, 4);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(4);
+        subpartitionFileReader.prepareForScheduling();
+        // trigger reading, add buffer to queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+        ArrayList<Buffer> buffers = new ArrayList<>();
+        // expected buffer index is 2, buffer 0 & 1 should be recycled.
+        subpartitionFileReader.consumeBuffer(2, buffers);
+        assertThat(buffers).hasSize(2);
+        assertThat(buffers).element(0).satisfies((buffer -> assertBufferContentEqualTo(buffer, 0)));
+        assertThat(buffers).element(1).satisfies((buffer -> assertBufferContentEqualTo(buffer, 1)));
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeRecycleBuffersIndexLessThanExpected() throws Throwable {

Review Comment:
   The test method name is too long and does not express its meaning clearly. I think we can simplify and improve it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -91,7 +92,8 @@ public boolean addBuffer(HsBufferContext bufferContext) {
     // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
     // subpartitionLock.
     @Override
-    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(

Review Comment:
   The method's Javadoc should also describe the new `buffersToRecycle` argument.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+        writeDataToFile(0, 0, 0, 4);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(4);
+        subpartitionFileReader.prepareForScheduling();
+        // trigger reading, add buffer to queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+        ArrayList<Buffer> buffers = new ArrayList<>();
+        // expected buffer index is 2, buffer 0 & 1 should be recycled.
+        subpartitionFileReader.consumeBuffer(2, buffers);
+        assertThat(buffers).hasSize(2);
+        assertThat(buffers).element(0).satisfies((buffer -> assertBufferContentEqualTo(buffer, 0)));
+        assertThat(buffers).element(1).satisfies((buffer -> assertBufferContentEqualTo(buffer, 1)));
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeRecycleBuffersIndexLessThanExpected() throws Throwable {

Review Comment:
   Maybe `testRecycleBuffersForPeekNextToConsumeDataType`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -251,6 +273,18 @@ public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
 
     @Override
     public void releaseDataView() {
+        Queue<Buffer> bufferToRecycle = new ArrayDeque<>();
+        synchronized (lock) {
+            isReleased = true;
+            // all loaded buffers should be recycled after data view released.
+            while (!loadedBuffers.isEmpty()) {
+                BufferIndexOrError bufferIndexOrError = loadedBuffers.poll();

Review Comment:
   I don't quite understand why we add the to-be-recycled buffers to a new queue and recycle them afterwards. Why not recycle each buffer directly? Is this because of the lock? If so, maybe we can add a comment explaining it.
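
   If the lock is indeed the reason, the pattern in question would look roughly like the sketch below: drain the loaded buffers into a local queue while holding the lock, then recycle them after the lock is released, so that `recycle()` never runs inside the synchronized block. This is only an illustration of that guess, not the PR's code; `SketchBuffer` and `SketchReader` are hypothetical stand-ins and not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a pooled buffer; this is not Flink's Buffer interface. */
final class SketchBuffer {
    private boolean recycled;

    void recycle() {
        // Assumption: in a real buffer pool, recycling may call back into other
        // components that take their own locks, which is why it is kept out of
        // the reader's synchronized block below.
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical reader that drains under its lock and recycles after releasing it. */
final class SketchReader {
    private final Object lock = new Object();
    private final Queue<SketchBuffer> loadedBuffers = new ArrayDeque<>();
    private boolean isReleased;

    void addLoaded(SketchBuffer buffer) {
        synchronized (lock) {
            if (!isReleased) {
                loadedBuffers.add(buffer);
            }
        }
    }

    /** Releases the view and returns how many buffers were recycled. */
    int releaseDataView() {
        Queue<SketchBuffer> buffersToRecycle = new ArrayDeque<>();
        synchronized (lock) {
            isReleased = true;
            // Only move the buffers to a local queue while the lock is held.
            while (!loadedBuffers.isEmpty()) {
                buffersToRecycle.add(loadedBuffers.poll());
            }
        }
        // Recycle outside the lock, so no foreign code runs under it.
        int count = buffersToRecycle.size();
        buffersToRecycle.forEach(SketchBuffer::recycle);
        return count;
    }
}
```

   Recycling each buffer directly inside the `synchronized` block would be shorter, but it would hold the reader's lock for the duration of every `recycle()` call. Whether that matters here depends on what the real `recycle()` does, which is the point the comment asks the author to document.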



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {

Review Comment:
   Maybe `testRecycleBuffersForConsumeBuffer`, or another more meaningful method name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.