[FLINK-9766][network][tests] fix cleanup in RemoteInputChannelTest

If an assertion in the test fails and as a result the cleanup fails, in most
tests the original assertion was swallowed making it hard to debug.

Furthermore, #testConcurrentRecycleAndRelease2() does even not clean up at all
if successful.

This closes #6271


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/598e4604
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/598e4604
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/598e4604

Branch: refs/heads/master
Commit: 598e4604440bc327e64c747f66470a0d65609f70
Parents: a154dd5
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Jul 5 15:49:15 2018 +0200
Committer: Nico Kruber <n...@data-artisans.com>
Committed: Thu Jul 12 11:46:15 2018 +0200

----------------------------------------------------------------------
 .../consumer/RemoteInputChannelTest.java        | 136 ++++++++++---------
 1 file changed, 70 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/598e4604/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 4080106..6c6fd96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -39,6 +39,8 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -53,7 +55,6 @@ import scala.Tuple2;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -329,6 +330,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
                        inputGate.setBufferPool(bufferPool);
@@ -447,13 +449,10 @@ public class RemoteInputChannelTest {
                        assertEquals("There should be 0 buffers available in 
local pool",
                                0, 
bufferPool.getNumberOfAvailableMemorySegments());
                        assertTrue(inputChannel.isWaitingForFloatingBuffers());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources
-                       inputChannel.releaseAllResources();
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
+                       cleanup(networkBufferPool, null, thrown, inputChannel);
                }
        }
 
@@ -471,6 +470,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
                        inputGate.setBufferPool(bufferPool);
@@ -525,13 +525,10 @@ public class RemoteInputChannelTest {
                                14, inputChannel.getNumberOfRequiredBuffers());
                        assertEquals("There should be 2 buffers available in 
local pool",
                                2, 
bufferPool.getNumberOfAvailableMemorySegments());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources
-                       inputChannel.releaseAllResources();
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
+                       cleanup(networkBufferPool, null, thrown, inputChannel);
                }
        }
 
@@ -549,6 +546,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
                        inputGate.setBufferPool(bufferPool);
@@ -617,13 +615,10 @@ public class RemoteInputChannelTest {
                                12, inputChannel.getNumberOfRequiredBuffers());
                        assertEquals("There should be 2 buffers available in 
local pool",
                                2, 
bufferPool.getNumberOfAvailableMemorySegments());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources
-                       inputChannel.releaseAllResources();
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
+                       cleanup(networkBufferPool, null, thrown, inputChannel);
                }
        }
 
@@ -645,6 +640,7 @@ public class RemoteInputChannelTest {
                
inputGate.setInputChannel(channel1.partitionId.getPartitionId(), channel1);
                
inputGate.setInputChannel(channel2.partitionId.getPartitionId(), channel2);
                
inputGate.setInputChannel(channel3.partitionId.getPartitionId(), channel3);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
                        inputGate.setBufferPool(bufferPool);
@@ -688,15 +684,10 @@ public class RemoteInputChannelTest {
                        assertEquals("There should be 3 buffers available in 
the channel", 3, channel1.getNumberOfAvailableBuffers());
                        assertEquals("There should be 3 buffers available in 
the channel", 3, channel2.getNumberOfAvailableBuffers());
                        assertEquals("There should be 3 buffers available in 
the channel", 3, channel3.getNumberOfAvailableBuffers());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources
-                       channel1.releaseAllResources();
-                       channel2.releaseAllResources();
-                       channel3.releaseAllResources();
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
+                       cleanup(networkBufferPool, null, thrown, channel1, 
channel2, channel3);
                }
        }
 
@@ -717,6 +708,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
                        inputGate.setBufferPool(bufferPool);
@@ -754,17 +746,10 @@ public class RemoteInputChannelTest {
                                0, inputChannel.getNumberOfAvailableBuffers());
                        assertEquals("There should be 130 buffers available in 
local pool.",
                                130, 
bufferPool.getNumberOfAvailableMemorySegments() + 
networkBufferPool.getNumberOfAvailableMemorySegments());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources once exception
-                       if (!inputChannel.isReleased()) {
-                               inputChannel.releaseAllResources();
-                       }
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
-
-                       executor.shutdown();
+                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
                }
        }
 
@@ -786,6 +771,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
                        inputGate.setBufferPool(bufferPool);
@@ -813,15 +799,10 @@ public class RemoteInputChannelTest {
                                inputChannel.getNumberOfRequiredBuffers(), 
inputChannel.getNumberOfAvailableBuffers());
                        assertEquals("There should be no buffers available in 
local pool.",
                                0, 
bufferPool.getNumberOfAvailableMemorySegments());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources
-                       inputChannel.releaseAllResources();
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
-
-                       executor.shutdown();
+                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
                }
        }
 
@@ -842,6 +823,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
                        inputGate.setBufferPool(bufferPool);
@@ -869,17 +851,10 @@ public class RemoteInputChannelTest {
                                numFloatingBuffers, 
bufferPool.getNumberOfAvailableMemorySegments());
                        assertEquals("There should be " + numExclusiveSegments 
+ " buffers available in global pool.",
                                numExclusiveSegments, 
networkBufferPool.getNumberOfAvailableMemorySegments());
-
+               } catch (Throwable t) {
+                       thrown = t;
                } finally {
-                       // Release all the buffer resources once exception
-                       if (!inputChannel.isReleased()) {
-                               inputChannel.releaseAllResources();
-                       }
-
-                       networkBufferPool.destroyAllBufferPools();
-                       networkBufferPool.destroy();
-
-                       executor.shutdown();
+                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
                }
        }
 
@@ -903,6 +878,7 @@ public class RemoteInputChannelTest {
                final SingleInputGate inputGate = createSingleInputGate();
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
+               Throwable thrown = null;
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
                        inputGate.setBufferPool(bufferPool);
@@ -958,17 +934,9 @@ public class RemoteInputChannelTest {
                        submitTasksAndWaitForResults(executor,
                                new Callable[] {bufferPoolInteractionsTask, 
channelInteractionsTask});
                } catch (Throwable t) {
-                       inputChannel.releaseAllResources();
-
-                       try {
-                               networkBufferPool.destroyAllBufferPools();
-                       } catch (Throwable tInner) {
-                               t.addSuppressed(tInner);
-                       }
-
-                       networkBufferPool.destroy();
-                       executor.shutdown();
-                       ExceptionUtils.rethrowException(t);
+                       thrown = t;
+               } finally {
+                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
                }
        }
 
@@ -1089,4 +1057,40 @@ public class RemoteInputChannelTest {
                        result.get();
                }
        }
+
+       /**
+        * Helper code to ease cleanup handling with suppressed exceptions.
+        */
+       private void cleanup(
+                       NetworkBufferPool networkBufferPool,
+                       @Nullable ExecutorService executor,
+                       @Nullable Throwable throwable,
+                       InputChannel... inputChannels) throws Exception {
+               for (InputChannel inputChannel : inputChannels) {
+                       try {
+                               inputChannel.releaseAllResources();
+                       } catch (Throwable tInner) {
+                               throwable = 
ExceptionUtils.firstOrSuppressed(tInner, throwable);
+                       }
+               }
+
+               try {
+                       networkBufferPool.destroyAllBufferPools();
+               } catch (Throwable tInner) {
+                       throwable = ExceptionUtils.firstOrSuppressed(tInner, 
throwable);
+               }
+
+               try {
+                       networkBufferPool.destroy();
+               } catch (Throwable tInner) {
+                       throwable = ExceptionUtils.firstOrSuppressed(tInner, 
throwable);
+               }
+
+               if (executor != null) {
+                       executor.shutdown();
+               }
+               if (throwable != null) {
+                       ExceptionUtils.rethrowException(throwable);
+               }
+       }
 }

Reply via email to