Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152813605
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws
Exception {
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class,
EventSerializer.fromBuffer(read,
ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a
successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; //
30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() <
deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
--- End diff --
No, it's not recycled in `partition.releaseMemory()` directly (or at least
should not! - which is fixed now). The buffer will be recycled by the
asynchronous writer thread once its content has been written to disk, i.e.
after the `partition.releaseMemory()` call - I included such a check right
before that call but actually, this is of limited use.
---