Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152827018
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
---
@@ -300,6 +315,230 @@ public void
testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
+ /**
+ * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable
finished partition.
+ */
+ @Test
+ public void testAddOnFinishedSpillablePartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.finish();
+
+ Buffer buffer = new
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ // finish adds an EndOfPartitionEvent
+ assertEquals(1, partition.getTotalNumberOfBuffers());
+ assertEquals(4, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+ * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled
finished partition.
+ */
+ @Test
+ public void testAddOnFinishedSpilledPartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ assertEquals(0, partition.releaseMemory());
+ partition.finish();
+
+ Buffer buffer = new
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ // finish adds an EndOfPartitionEvent
+ assertEquals(1, partition.getTotalNumberOfBuffers());
+ assertEquals(4, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+ * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable
released partition.
+ */
+ @Test
+ public void testAddOnReleasedSpillablePartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.release();
+
+ Buffer buffer = new
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ assertEquals(0, partition.getTotalNumberOfBuffers());
+ assertEquals(0, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+ * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled
released partition.
+ */
+ @Test
+ public void testAddOnReleasedSpilledPartition() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ partition.release();
+ assertEquals(0, partition.releaseMemory());
+
+ Buffer buffer = new
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ if (!buffer.isRecycled()) {
+ Assert.fail("buffer not recycled");
+ buffer.recycle();
+ }
+ assertEquals(0, partition.getTotalNumberOfBuffers());
+ assertEquals(0, partition.getTotalNumberOfBytes());
+ }
+ }
+
+ /**
+ * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled
partition where adding the
+ * write request fails with an exception.
+ */
+ @Test
+ public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+ IOManager ioManager = new
IOManagerAsyncWithStallingBufferFileWriter();
+ SpillableSubpartition partition = createSubpartition(ioManager);
+ assertEquals(0, partition.releaseMemory());
+
+ Buffer buffer = new
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ try {
+ partition.add(buffer);
+ } finally {
+ ioManager.shutdown();
--- End diff --
Shouldn't this test assert some conditions before shutdown? Otherwise
it might not be testing what you intend it to test.
---