TanYuxin-tyx commented on code in PR #22280:
URL: https://github.com/apache/flink/pull/22280#discussion_r1150216597
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
assertThat(fileReader1).isGreaterThan(fileReader2);
}
+ @Test
+ void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
+ TestingSubpartitionConsumerInternalOperation viewNotifier =
+ new TestingSubpartitionConsumerInternalOperation();
+ HsSubpartitionFileReaderImpl subpartitionFileReader =
+ createSubpartitionFileReader(0, viewNotifier);
+ writeDataToFile(0, 0, 0, 4);
+
+ Queue<MemorySegment> memorySegments = createsMemorySegments(4);
+ subpartitionFileReader.prepareForScheduling();
+ // trigger reading, add buffer to queue.
+ subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+ ArrayList<Buffer> buffers = new ArrayList<>();
+ // expected buffer index is 2, buffer 0 & 1 should be recycled.
+ subpartitionFileReader.consumeBuffer(2, buffers);
+ assertThat(buffers).hasSize(2);
+ assertThat(buffers).element(0).satisfies((buffer -> assertBufferContentEqualTo(buffer, 0)));
+ assertThat(buffers).element(1).satisfies((buffer -> assertBufferContentEqualTo(buffer, 1)));
+ }
+
+ @Test
+ void testPeekNextToConsumeDataTypeRecycleBuffersIndexLessThanExpected() throws Throwable {
Review Comment:
The test method name is too long and does not clearly express what is being
tested. I think we can simplify and improve it.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -91,7 +92,8 @@ public boolean addBuffer(HsBufferContext bufferContext) {
// Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and
// subpartitionLock.
@Override
- public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
+ public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(
Review Comment:
The method's documentation comment should also describe the new argument
`buffersToRecycle`.
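Something along these lines is what I have in mind. This is only a sketch on a hypothetical stand-in class (`ConsumeBufferJavadocSketch`, with `Integer` indices in place of real buffers). The parameter type and the exact recycling semantics are my assumptions, inferred from the new test; they are not taken from the actual signature in this PR.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical stand-in, NOT the Flink class: buffers are modelled by their index. */
class ConsumeBufferJavadocSketch {
    private final Deque<Integer> loadedBuffers = new ArrayDeque<>();

    ConsumeBufferJavadocSketch(int numBuffers) {
        for (int i = 0; i < numBuffers; i++) {
            loadedBuffers.add(i);
        }
    }

    /**
     * Consumes the buffer with the given index.
     *
     * @param toConsumeIndex index of the buffer that the consumer expects next.
     * @param buffersToRecycle output collection. Buffers whose index is lower than
     *     {@code toConsumeIndex} are added to it so that the caller can recycle them
     *     (assumed semantics).
     * @return the buffer at {@code toConsumeIndex}, or {@link Optional#empty()} if it
     *     is not available.
     */
    Optional<Integer> consumeBuffer(int toConsumeIndex, Collection<Integer> buffersToRecycle) {
        // Hand over every stale buffer that precedes the expected index.
        while (!loadedBuffers.isEmpty() && loadedBuffers.peek() < toConsumeIndex) {
            buffersToRecycle.add(loadedBuffers.poll());
        }
        if (!loadedBuffers.isEmpty() && loadedBuffers.peek() == toConsumeIndex) {
            return Optional.of(loadedBuffers.poll());
        }
        return Optional.empty();
    }
}
```

The point is only the `@param buffersToRecycle` line: it should say who fills the collection and who is responsible for recycling its contents.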
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
assertThat(fileReader1).isGreaterThan(fileReader2);
}
+ @Test
+ void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
+ TestingSubpartitionConsumerInternalOperation viewNotifier =
+ new TestingSubpartitionConsumerInternalOperation();
+ HsSubpartitionFileReaderImpl subpartitionFileReader =
+ createSubpartitionFileReader(0, viewNotifier);
+ writeDataToFile(0, 0, 0, 4);
+
+ Queue<MemorySegment> memorySegments = createsMemorySegments(4);
+ subpartitionFileReader.prepareForScheduling();
+ // trigger reading, add buffer to queue.
+ subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
+ ArrayList<Buffer> buffers = new ArrayList<>();
+ // expected buffer index is 2, buffer 0 & 1 should be recycled.
+ subpartitionFileReader.consumeBuffer(2, buffers);
+ assertThat(buffers).hasSize(2);
+ assertThat(buffers).element(0).satisfies((buffer -> assertBufferContentEqualTo(buffer, 0)));
+ assertThat(buffers).element(1).satisfies((buffer -> assertBufferContentEqualTo(buffer, 1)));
+ }
+
+ @Test
+ void testPeekNextToConsumeDataTypeRecycleBuffersIndexLessThanExpected() throws Throwable {
Review Comment:
Maybe `testRecycleBuffersForPeekNextToConsumeDataType`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -251,6 +273,18 @@ public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) {
@Override
public void releaseDataView() {
+ Queue<Buffer> bufferToRecycle = new ArrayDeque<>();
+ synchronized (lock) {
+ isReleased = true;
+ // all loaded buffers should be recycled after data view released.
+ while (!loadedBuffers.isEmpty()) {
+ BufferIndexOrError bufferIndexOrError = loadedBuffers.poll();
Review Comment:
I don't quite understand why we add the buffers to be recycled to a new
queue and only recycle them afterwards. Why not recycle each buffer directly?
Is this because of the lock? If so, maybe we can add a comment explaining it.
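To make the question concrete, this is the pattern I assume is intended: drain the shared queue while holding the lock, then recycle outside of it. It is a minimal sketch. `DeferredRecycleSketch`, `Recyclable` and `holdsLock()` are hypothetical stand-ins rather than the Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the file reader, NOT the Flink class. */
class DeferredRecycleSketch {
    /** Hypothetical stand-in for a buffer that can be returned to its pool. */
    interface Recyclable {
        void recycle();
    }

    private final Object lock = new Object();
    private final Queue<Recyclable> loadedBuffers = new ArrayDeque<>();
    private boolean isReleased;

    void add(Recyclable buffer) {
        synchronized (lock) {
            loadedBuffers.add(buffer);
        }
    }

    void releaseDataView() {
        Queue<Recyclable> buffersToRecycle = new ArrayDeque<>();
        synchronized (lock) {
            isReleased = true;
            // Under the lock, only move the buffers out of the shared queue.
            while (!loadedBuffers.isEmpty()) {
                buffersToRecycle.add(loadedBuffers.poll());
            }
        }
        // Recycle outside the lock: a recycler callback that takes other locks or
        // calls back into this reader then cannot deadlock on `lock`, and the
        // critical section stays short.
        for (Recyclable buffer : buffersToRecycle) {
            buffer.recycle();
        }
    }

    /** Test helper: true if the calling thread currently holds the reader lock. */
    boolean holdsLock() {
        return Thread.holdsLock(lock);
    }
}
```

If that is the reasoning, a one-line comment above the `synchronized` block saying that recycling must happen outside the lock would be enough.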
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java:
##########
@@ -392,6 +395,46 @@ void testCompareTo(@TempDir Path tempPath) throws Exception {
assertThat(fileReader1).isGreaterThan(fileReader2);
}
+ @Test
+ void testConsumeBufferRecycleBuffersIndexLessThanExpected() throws Throwable {
Review Comment:
Maybe `testRecycleBuffersForConsumeBuffer`? Or some other, more meaningful
method name.