[FLINK-9708] Clean up LocalBufferPool if NetworkBufferPool#createBufferPool fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/390e451f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/390e451f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/390e451f

Branch: refs/heads/master
Commit: 390e451f77d874b3255b20e0ea164d6743190aa2
Parents: efc8708
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Jul 3 16:43:23 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/LocalBufferPool.java      |  7 +++-
 .../io/network/buffer/NetworkBufferPool.java    | 31 ++++++++++----
 .../network/buffer/NetworkBufferPoolTest.java   | 44 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 92a8e94..7d9aa21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -298,7 +299,11 @@ class LocalBufferPool implements BufferPool {
                        }
                }
 
-               networkBufferPool.destroyBufferPool(this);
+               try {
+                       networkBufferPool.destroyBufferPool(this);
+               } catch (IOException e) {
+                       ExceptionUtils.rethrow(e);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 419f6f3..a369ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -155,6 +155,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
                                redistributeBuffers();
                        } catch (Throwable t) {
                                 this.numTotalRequiredBuffers -= numRequiredBuffers;
+
+                               try {
+                                       redistributeBuffers();
+                               } catch (IOException inner) {
+                                       t.addSuppressed(inner);
+                               }
                                ExceptionUtils.rethrowIOException(t);
                        }
                }
@@ -172,7 +178,11 @@ public class NetworkBufferPool implements BufferPoolFactory {
                                }
                        }
                } catch (Throwable e) {
-                       recycleMemorySegments(segments, numRequiredBuffers);
+                       try {
+                               recycleMemorySegments(segments, numRequiredBuffers);
+                       } catch (IOException inner) {
+                               e.addSuppressed(inner);
+                       }
                        ExceptionUtils.rethrowIOException(e);
                }
 
@@ -277,14 +287,23 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
                        allBufferPools.add(localBufferPool);
 
-                       redistributeBuffers();
+                       try {
+                               redistributeBuffers();
+                       } catch (IOException e) {
+                               try {
+                                       destroyBufferPool(localBufferPool);
+                               } catch (IOException inner) {
+                                       e.addSuppressed(inner);
+                               }
+                               ExceptionUtils.rethrowIOException(e);
+                       }
 
                        return localBufferPool;
                }
        }
 
        @Override
-       public void destroyBufferPool(BufferPool bufferPool) {
+       public void destroyBufferPool(BufferPool bufferPool) throws IOException {
                if (!(bufferPool instanceof LocalBufferPool)) {
                         throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
                }
@@ -293,11 +312,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
                        if (allBufferPools.remove(bufferPool)) {
                                 numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
 
-                               try {
-                                       redistributeBuffers();
-                               } catch (IOException e) {
-                                       throw new RuntimeException(e);
-                               }
+                               redistributeBuffers();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 40dc4f3..64c7fad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -356,6 +356,50 @@ public class NetworkBufferPoolTest {
                }
        }
 
+       @Test
+       public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
+               final int numBuffers = 3;
+               final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+               final List<Buffer> buffers = new ArrayList<>(numBuffers);
+               BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+               bufferPool.setBufferPoolOwner(
+                       numBuffersToRecycle -> {
+                               throw new TestIOException();
+                       });
+
+               try {
+
+                       for (int i = 0; i < numBuffers; i++) {
+                               Buffer buffer = bufferPool.requestBuffer();
+                               buffers.add(buffer);
+                               assertNotNull(buffer);
+                       }
+
+                       try {
+                               networkBufferPool.createBufferPool(1, numBuffers);
+                               fail("Should have failed because the other buffer pool does not support memory release.");
+                       } catch (TestIOException expected) {
+                       }
+
+                       // destroy the faulty buffer pool
+                       for (Buffer buffer : buffers) {
+                               buffer.recycleBuffer();
+                       }
+                       buffers.clear();
+                       bufferPool.lazyDestroy();
+
+                       // now we should be able to create a new buffer pool
+                       bufferPool = networkBufferPool.createBufferPool(numBuffers, numBuffers);
+               } finally {
+                       for (Buffer buffer : buffers) {
+                               buffer.recycleBuffer();
+                       }
+                       bufferPool.lazyDestroy();
+                       networkBufferPool.destroy();
+               }
+       }
+
        private final class TestIOException extends IOException {
                 private static final long serialVersionUID = -814705441998024472L;
        }

Reply via email to