This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 860a716  [FLINK-20654] Fix double recycling of Buffers in case of an 
exception on persisting
860a716 is described below

commit 860a716574c04d446deee1c2fb0756191212e77d
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Apr 21 14:53:13 2021 +0200

    [FLINK-20654] Fix double recycling of Buffers in case of an exception on 
persisting
    
    An exception can be thrown, for example, if the task is being cancelled. This was
    leading to the same buffer being recycled twice. Most of the time that just led to
    an IllegalReferenceCountException being thrown, which was ignored, as the task was
    being cancelled. However, on rare occasions the buffer could have been picked up by
    another task after being recycled for the first time, then recycled a second time
    and picked up by yet another (third) task. In that case we had two users of the
    same buffer, which could lead to all sorts of data corruption.
---
 .../partition/consumer/RemoteInputChannel.java     |  3 +-
 .../partition/consumer/RemoteInputChannelTest.java | 60 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 2bd3ce9..128fd18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -475,8 +475,10 @@ public class RemoteInputChannel extends InputChannel {
                 DataType dataType = buffer.getDataType();
                 if (dataType.hasPriority()) {
                     firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
+                    recycleBuffer = false;
                 } else {
                     receivedBuffers.add(sequenceBuffer);
+                    recycleBuffer = false;
                     if (dataType.requiresAnnouncement()) {
                         firstPriorityEvent = 
addPriorityBuffer(announce(sequenceBuffer));
                     }
@@ -495,7 +497,6 @@ public class RemoteInputChannel extends InputChannel {
                 channelStatePersister.maybePersist(buffer);
                 ++expectedSequenceNumber;
             }
-            recycleBuffer = false;
 
             if (firstPriorityEvent) {
                 notifyPriorityEvent(sequenceNumber);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 84559e2..5182e20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -21,6 +21,8 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.ConnectionID;
@@ -44,9 +46,11 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -172,6 +176,62 @@ public class RemoteInputChannelTest {
     }
 
     @Test
+    public void testExceptionOnPersisting() throws Exception {
+        // Setup
+        final SingleInputGate inputGate = createSingleInputGate(1);
+        final RemoteInputChannel inputChannel =
+                InputChannelBuilder.newBuilder()
+                        .setStateWriter(
+                                new 
ChannelStateWriter.NoOpChannelStateWriter() {
+                                    @Override
+                                    public void addInputData(
+                                            long checkpointId,
+                                            InputChannelInfo info,
+                                            int startSeqNum,
+                                            CloseableIterator<Buffer> data) {
+                                        try {
+                                            data.close();
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                        throw new ExpectedTestException();
+                                    }
+                                })
+                        .buildRemoteChannel(inputGate);
+
+        inputChannel.checkpointStarted(
+                new CheckpointBarrier(
+                        42, System.currentTimeMillis(), 
CheckpointOptions.unaligned(getDefault())));
+
+        final Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+        assertFalse(buffer.isRecycled());
+        try {
+            inputChannel.onBuffer(buffer, 0, -1);
+            fail("This should have failed");
+        } catch (ExpectedTestException ex) {
+            // ignore
+        }
+        // This check is not strictly speaking necessary. Generally speaking 
if exception happens
+        // during persisting, there are two potentially correct outcomes:
+        // 1. buffer is recycled only once, in #onBuffer call when handling 
exception
+        // 2. buffer is stored inside RemoteInputChannel and recycled on 
releaseAllResources.
+        // What's not acceptable is that it would be released twice, in both 
places. Without this
+        // check below, we would be just relying on Buffer throwing 
IllegalReferenceCountException.
+        // I've added this check just to be sure. It's freezing the current 
implementation that's
+        // unlikely to change, on the other hand, thanks to it we don't need 
to rely on
+        // IllegalReferenceCountException being thrown from the Buffer.
+        //
+        // In other words, if you end up reading this after refactoring 
RemoteInputChannel, it might
+        // be safe to remove this assertion. Just make sure double recycling 
of the same buffer is
+        // still throwing IllegalReferenceCountException.
+        assertFalse(buffer.isRecycled());
+
+        inputChannel.releaseAllResources();
+        assertTrue(buffer.isRecycled());
+    }
+
+    @Test
     public void testConcurrentOnBufferAndRelease() throws Exception {
         testConcurrentReleaseAndSomething(
                 8192,

Reply via email to