This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 860a716 [FLINK-20654] Fix double recycling of Buffers in case of an
exception on persisting
860a716 is described below
commit 860a716574c04d446deee1c2fb0756191212e77d
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Apr 21 14:53:13 2021 +0200
[FLINK-20654] Fix double recycling of Buffers in case of an exception on
persisting
An exception can be thrown, for example, if the task is being cancelled.
This was leading to the same buffer being recycled twice. Most of the time
that was just leading to an IllegalReferenceCountException being thrown,
which was ignored, as this task was being cancelled.
However, on rare occasions this buffer could have been picked up by another
task after being recycled for the first time, then recycled a second time
and picked up by yet another (third) task. In that case we had two users of
the same buffer, which could lead to all sorts of data corruption.
---
.../partition/consumer/RemoteInputChannel.java | 3 +-
.../partition/consumer/RemoteInputChannelTest.java | 60 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 2bd3ce9..128fd18 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -475,8 +475,10 @@ public class RemoteInputChannel extends InputChannel {
DataType dataType = buffer.getDataType();
if (dataType.hasPriority()) {
firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
+ recycleBuffer = false;
} else {
receivedBuffers.add(sequenceBuffer);
+ recycleBuffer = false;
if (dataType.requiresAnnouncement()) {
firstPriorityEvent =
addPriorityBuffer(announce(sequenceBuffer));
}
@@ -495,7 +497,6 @@ public class RemoteInputChannel extends InputChannel {
channelStatePersister.maybePersist(buffer);
++expectedSequenceNumber;
}
- recycleBuffer = false;
if (firstPriorityEvent) {
notifyPriorityEvent(sequenceNumber);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 84559e2..5182e20 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -21,6 +21,8 @@ package
org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.ConnectionID;
@@ -44,9 +46,11 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -172,6 +176,62 @@ public class RemoteInputChannelTest {
}
@Test
+ public void testExceptionOnPersisting() throws Exception {
+ // Setup
+ final SingleInputGate inputGate = createSingleInputGate(1);
+ final RemoteInputChannel inputChannel =
+ InputChannelBuilder.newBuilder()
+ .setStateWriter(
+ new
ChannelStateWriter.NoOpChannelStateWriter() {
+ @Override
+ public void addInputData(
+ long checkpointId,
+ InputChannelInfo info,
+ int startSeqNum,
+ CloseableIterator<Buffer> data) {
+ try {
+ data.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ throw new ExpectedTestException();
+ }
+ })
+ .buildRemoteChannel(inputGate);
+
+ inputChannel.checkpointStarted(
+ new CheckpointBarrier(
+ 42, System.currentTimeMillis(),
CheckpointOptions.unaligned(getDefault())));
+
+ final Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE);
+
+ assertFalse(buffer.isRecycled());
+ try {
+ inputChannel.onBuffer(buffer, 0, -1);
+ fail("This should have failed");
+ } catch (ExpectedTestException ex) {
+ // ignore
+ }
+ // This check is not strictly speaking necessary. Generally speaking
if exception happens
+ // during persisting, there are two potentially correct outcomes:
+ // 1. buffer is recycled only once, in #onBuffer call when handling
exception
+ // 2. buffer is stored inside RemoteInputChannel and recycled on
releaseAllResources.
+ // What's not acceptable is that it would be released twice, in both
places. Without this
+ // check below, we would be just relaying on Buffer throwing
IllegalReferenceCountException.
+ // I've added this check just to be sure. It's freezing the current
implementation that's
+ // unlikely to change, on the other hand, thanks to it we don't need
to relay on
+ // IllegalReferenceCountException being thrown from the Buffer.
+ //
+ // In other words, if you end up reading this after refactoring
RemoteInputChannel, it might
+ // be safe to remove this assertion. Just make sure double recycling
of the same buffer is
+ // still throwing IllegalReferenceCountException.
+ assertFalse(buffer.isRecycled());
+
+ inputChannel.releaseAllResources();
+ assertTrue(buffer.isRecycled());
+ }
+
+ @Test
public void testConcurrentOnBufferAndRelease() throws Exception {
testConcurrentReleaseAndSomething(
8192,