TanYuxin-tyx commented on code in PR #22280:
URL: https://github.com/apache/flink/pull/22280#discussion_r1150216597

##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+        writeDataToFile(0, 0, 0, 4);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(4);
+        subpartitionFileReader.prepareForScheduling();
+        // trigger reading, add buffer to queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+        ArrayList<Buffer> buffers = new ArrayList<>();
+        // expected buffer index is 2, buffer 0 & 1 should be recycled.
+        subpartitionFileReader.consumeBuffer(2, buffers);
+        assertThat(buffers).hasSize(2);
+        assertThat(buffers).element(0).satisfies((buffer -> assertBufferContentEqualTo(buffer, 0)));
+        assertThat(buffers).element(1).satisfies((buffer -> assertBufferContentEqualTo(buffer, 1)));
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeRecycleBuffersIndexLessThanExpected() throws Throwable {

Review Comment:
   The test method name is too long to convey its meaning clearly. I think we can simplify and improve it.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -91,7 +92,8 @@ public boolean addBuffer(HsBufferContext bufferContext) {
     // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
     // subpartitionLock.
     @Override
-    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(

Review Comment:
   The method's doc comment should also describe the new argument `buffersToRecycle`.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
+        HsSubpartitionFileReaderImpl subpartitionFileReader =
+                createSubpartitionFileReader(0, viewNotifier);
+        writeDataToFile(0, 0, 0, 4);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(4);
+        subpartitionFileReader.prepareForScheduling();
+        // trigger reading, add buffer to queue.
+        subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+        ArrayList<Buffer> buffers = new ArrayList<>();
+        // expected buffer index is 2, buffer 0 & 1 should be recycled.
+        subpartitionFileReader.consumeBuffer(2, buffers);
+        assertThat(buffers).hasSize(2);
+        assertThat(buffers).element(0).satisfies((buffer -> assertBufferContentEqualTo(buffer, 0)));
+        assertThat(buffers).element(1).satisfies((buffer -> assertBufferContentEqualTo(buffer, 1)));
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeRecycleBuffersIndexLessThanExpected() throws Throwable {

Review Comment:
   Maybe `testRecycleBuffersForPeekNextToConsumeDataType`?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -251,6 +273,18 @@ public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
     @Override
     public void releaseDataView() {
+        Queue<Buffer> bufferToRecycle = new ArrayDeque<>();
+        synchronized (lock) {
+            isReleased = true;
+            // all loaded buffers should be recycled after data view released.
+            while (!loadedBuffers.isEmpty()) {
+                BufferIndexOrError bufferIndexOrError = loadedBuffers.poll();

Review Comment:
   I don't quite see why we add the buffers to be recycled to a new queue and only recycle them afterwards. Why not recycle each buffer directly? Is this because of the lock? If so, maybe we can add a comment explaining it.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
         assertThat(fileReader1).isGreaterThan(fileReader2);
     }
 
+    @Test
+    void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {

Review Comment:
   Maybe `testRecycleBuffersForConsumeBuffer`? Or use a more meaningful method name.
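
On the `releaseDataView` question above, one plausible reason for the intermediate queue is the lock: recycling a buffer can call back into the buffer pool, which may take other locks, so doing it inside `synchronized (lock)` risks lock-ordering problems. The sketch below illustrates that pattern under stated assumptions. `DeferredRecycleSketch`, `RecyclableBuffer`, `addBuffer`, `release`, and `holdsLock` are hypothetical stand-ins for illustration, not Flink classes or methods; only the field names `lock`, `loadedBuffers`, and `isReleased` mirror the diff.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a reader that owns loaded buffers; not Flink's actual class. */
class DeferredRecycleSketch {

    /** Hypothetical buffer type; recycle() may run arbitrary callbacks. */
    interface RecyclableBuffer {
        void recycle();
    }

    private final Object lock = new Object();
    private final Queue<RecyclableBuffer> loadedBuffers = new ArrayDeque<>();
    private boolean isReleased;

    /** Returns false if the reader is already released and the buffer was not accepted. */
    boolean addBuffer(RecyclableBuffer buffer) {
        synchronized (lock) {
            if (isReleased) {
                return false;
            }
            loadedBuffers.add(buffer);
            return true;
        }
    }

    void release() {
        Queue<RecyclableBuffer> buffersToRecycle = new ArrayDeque<>();
        synchronized (lock) {
            isReleased = true;
            // Inside the lock, only move references out of the shared queue.
            while (!loadedBuffers.isEmpty()) {
                buffersToRecycle.add(loadedBuffers.poll());
            }
        }
        // Recycle after leaving the synchronized block, so that whatever
        // recycle() triggers never runs while this object's lock is held.
        for (RecyclableBuffer buffer : buffersToRecycle) {
            buffer.recycle();
        }
    }

    /** Test helper: reports whether the calling thread currently holds the lock. */
    boolean holdsLock() {
        return Thread.holdsLock(lock);
    }
}
```

If this is indeed the reason, a short comment above the queue in `releaseDataView` saying that buffers are recycled outside the lock would probably be enough.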