This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 826daab2c3cc52d100c38480b67393e067472b8f
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Apr 15 21:02:51 2019 +0200

    [hotfix] [network] Release unpooled buffer for events.
    
    So far, these buffers never needed to be released, because they do not come 
from a buffer pool.
    They were simply garbage collected.
    
    When changing the blocking partitions to use memory mapped files, these 
buffer were refering
    for a short time to an unmapped memory region (after the partition is 
released). Because the buffers
    were not accessed any more by any code, it did not matter when regularly 
running Flink.
    
    But, it did segfault the JVM when attaching a debugger and exploring just 
that part of the code.
    This happens because the debugger calls toString() on the buffer object as 
part of its rendering of the current
    stack frame. The toString() method access the buffer contents, which is an 
unmapped region of memory,
    and boom!
---
 .../partition/consumer/SingleInputGate.java        |  8 +++++-
 .../partition/consumer/SingleInputGateTest.java    |  4 +++
 .../partition/consumer/TestInputChannel.java       | 32 +++++++++++++++++++++-
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 19912b2..a584a21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -593,7 +593,13 @@ public class SingleInputGate extends InputGate {
                        return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable);
                }
                else {
-                       final AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+                       final AbstractEvent event;
+                       try {
+                               event = EventSerializer.fromBuffer(buffer, 
getClass().getClassLoader());
+                       }
+                       finally {
+                               buffer.recycleBuffer();
+                       }
 
                        if (event.getClass() == EndOfPartitionEvent.class) {
                                
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a6f824d..5dda8db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -114,6 +114,10 @@ public class SingleInputGateTest extends InputGateTestBase 
{
 
                // Return null when the input gate has received all 
end-of-partition events
                assertTrue(inputGate.isFinished());
+
+               for (TestInputChannel ic : inputChannels) {
+                       ic.assertReturnedEventsAreRecycled();
+               }
        }
 
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index ac3f0ff..96a5db2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -27,12 +27,15 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,6 +46,8 @@ public class TestInputChannel extends InputChannel {
 
        private final Queue<BufferAndAvailabilityProvider> buffers = new 
ConcurrentLinkedQueue<>();
 
+       private final Collection<Buffer> allReturnedBuffers = new ArrayList<>();
+
        private BufferAndAvailabilityProvider lastProvider = null;
 
        private boolean isReleased = false;
@@ -125,7 +130,9 @@ public class TestInputChannel extends InputChannel {
 
                if (provider != null) {
                        lastProvider = provider;
-                       return provider.getBufferAvailability();
+                       Optional<BufferAndAvailability> baa = 
provider.getBufferAvailability();
+                       baa.ifPresent((v) -> 
allReturnedBuffers.add(v.buffer()));
+                       return baa;
                } else if (lastProvider != null) {
                        return lastProvider.getBufferAvailability();
                } else {
@@ -162,6 +169,29 @@ public class TestInputChannel extends InputChannel {
 
        }
 
+       public void assertReturnedDataBuffersAreRecycled() {
+               assertReturnedBuffersAreRecycled(true, false);
+       }
+
+       public void assertReturnedEventsAreRecycled() {
+               assertReturnedBuffersAreRecycled(false, true);
+       }
+
+       public void assertAllReturnedBuffersAreRecycled() {
+               assertReturnedBuffersAreRecycled(true, true);
+       }
+
+       private void assertReturnedBuffersAreRecycled(boolean assertBuffers, 
boolean assertEvents) {
+               for (Buffer b : allReturnedBuffers) {
+                       if (b.isBuffer() && assertBuffers && !b.isRecycled()) {
+                               fail("Data Buffer " + b + " not recycled");
+                       }
+                       if (!b.isBuffer() && assertEvents && !b.isRecycled()) {
+                               fail("Event Buffer " + b + " not recycled");
+                       }
+               }
+       }
+
        interface BufferAndAvailabilityProvider {
                Optional<BufferAndAvailability> getBufferAvailability() throws 
IOException, InterruptedException;
        }

Reply via email to