This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 0c1af6c [FLINK-22424][network] Prevent releasing
PipelinedSubpartition while Task can still write to it
0c1af6c is described below
commit 0c1af6c1dfdc97275fa41abfe41125f64405f7f9
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 23 14:45:44 2021 +0200
[FLINK-22424][network] Prevent releasing PipelinedSubpartition while Task
can still write to it
This bug happened when downstream tasks were failing over or being
cancelled. If all
of the downstream tasks released their subpartition views, the underlying memory
segments/buffers could
have been recycled while the upstream task was still writing some records.
The problem is fixed by adding the writer (result partition) itself as one
more reference-counted
user of the result partition.
---
.../ReleaseOnConsumptionResultPartition.java | 33 ++++--
.../io/network/buffer/UnpooledBufferPool.java | 118 +++++++++++++++++++++
.../network/netty/PartitionRequestQueueTest.java | 1 +
.../ReleaseOnConsumptionResultPartitionTest.java | 25 ++++-
4 files changed, 166 insertions(+), 11 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
index 95b73bc..f2c8ff2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -30,6 +30,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/** ResultPartition that releases itself once all subpartitions have been
consumed. */
public class ReleaseOnConsumptionResultPartition extends ResultPartition {
+ private static final int PIPELINED_RESULT_PARTITION_ITSELF = -42;
private static final Object lock = new Object();
@@ -40,7 +41,7 @@ public class ReleaseOnConsumptionResultPartition extends
ResultPartition {
* The total number of references to subpartitions of this result. The
result partition can be
* safely released, iff the reference count is zero.
*/
- private int numUnconsumedSubpartitions;
+ private int numberOfUsers;
ReleaseOnConsumptionResultPartition(
String owningTaskName,
@@ -64,19 +65,23 @@ public class ReleaseOnConsumptionResultPartition extends
ResultPartition {
bufferPoolFactory);
this.consumedSubpartitions = new boolean[subpartitions.length];
- this.numUnconsumedSubpartitions = subpartitions.length;
+ this.numberOfUsers = subpartitions.length + 1;
}
@Override
public ResultSubpartitionView createSubpartitionView(
int index, BufferAvailabilityListener availabilityListener) throws
IOException {
- checkState(numUnconsumedSubpartitions > 0, "Partition not pinned.");
+ checkState(numberOfUsers > 0, "Partition not pinned.");
return super.createSubpartitionView(index, availabilityListener);
}
@Override
void onConsumedSubpartition(int subpartitionIndex) {
+ decrementNumberOfUsers(subpartitionIndex);
+ }
+
+ private void decrementNumberOfUsers(int subpartitionIndex) {
if (isReleased()) {
return;
}
@@ -86,13 +91,15 @@ public class ReleaseOnConsumptionResultPartition extends
ResultPartition {
// we synchronize only the bookkeeping section, to avoid holding the
lock during any
// calls into other components
synchronized (lock) {
- if (consumedSubpartitions[subpartitionIndex]) {
- // repeated call - ignore
- return;
- }
+ if (subpartitionIndex != PIPELINED_RESULT_PARTITION_ITSELF) {
+ if (consumedSubpartitions[subpartitionIndex]) {
+ // repeated call - ignore
+ return;
+ }
- consumedSubpartitions[subpartitionIndex] = true;
- remainingUnconsumed = (--numUnconsumedSubpartitions);
+ consumedSubpartitions[subpartitionIndex] = true;
+ }
+ remainingUnconsumed = (--numberOfUsers);
}
LOG.debug(
@@ -115,7 +122,13 @@ public class ReleaseOnConsumptionResultPartition extends
ResultPartition {
+ ", "
+ subpartitions.length
+ " subpartitions, "
- + numUnconsumedSubpartitions
+ + numberOfUsers
+ " pending consumptions]";
}
+
+ @Override
+ public void close() {
+ decrementNumberOfUsers(PIPELINED_RESULT_PARTITION_ITSELF);
+ super.close();
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
new file mode 100644
index 0000000..68ac9c1
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Implementation of {@link BufferPool} that works on un-pooled memory
segments. */
+public class UnpooledBufferPool implements BufferPool {
+
+ private static final int SEGMENT_SIZE = 1024;
+
+ @Override
+ public void lazyDestroy() {}
+
+ @Override
+ public Buffer requestBuffer() {
+ return new NetworkBuffer(requestMemorySegment(), this);
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilder() {
+ return new BufferBuilder(requestMemorySegment(), this);
+ }
+
+ private MemorySegment requestMemorySegment() {
+ return
MemorySegmentFactory.allocateUnpooledOffHeapMemory(SEGMENT_SIZE, null);
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilderBlocking() throws
InterruptedException {
+ return requestBufferBuilder();
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilder(int targetChannel) {
+ return requestBufferBuilder();
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
+ throws InterruptedException {
+ return requestBufferBuilder();
+ }
+
+ @Override
+ public boolean addBufferListener(BufferListener listener) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberOfRequiredMemorySegments() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int getMaxNumberOfMemorySegments() {
+ return -1;
+ }
+
+ @Override
+ public int getNumBuffers() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void setNumBuffers(int numBuffers) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberOfAvailableMemorySegments() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int bestEffortGetNumOfUsedBuffers() {
+ return 0;
+ }
+
+ @Override
+ public BufferRecycler[] getSubpartitionBufferRecyclers() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ memorySegment.free();
+ }
+
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ return AVAILABLE;
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index e7ef08c..ad117cb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -484,6 +484,7 @@ public class PartitionRequestQueueTest {
assertFalse(queue.getAvailableReaders().contains(reader));
// the partition and its reader view should all be released
assertTrue(reader.isReleased());
+ partition.close();
assertTrue(partition.isReleased());
for (ResultSubpartition subpartition : partition.getAllPartitions()) {
assertTrue(subpartition.isReleased());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
index 57976bd..cf49ae5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
@@ -17,10 +17,14 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.io.network.buffer.UnpooledBufferPool;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -43,6 +47,25 @@ public class ReleaseOnConsumptionResultPartitionTest extends
TestLogger {
assertFalse(partition.isReleased());
partition.onConsumedSubpartition(1);
+ partition.close();
+ assertTrue(partition.isReleased());
+ }
+
+ @Test
+ public void testConsumptionBeforePartitionClose() throws IOException,
InterruptedException {
+ final ResultPartition partition =
+ new ResultPartitionBuilder()
+ .setResultPartitionType(ResultPartitionType.PIPELINED)
+ .setNumberOfSubpartitions(1)
+ .setBufferPoolFactory(ignored -> new
UnpooledBufferPool())
+ .build();
+
+ partition.setup();
+ partition.getBufferBuilder(0).append(ByteBuffer.allocate(16));
+ partition.onConsumedSubpartition(0);
+ assertFalse(partition.isReleased());
+ partition.getBufferBuilder(0).append(ByteBuffer.allocate(16));
+ partition.close();
assertTrue(partition.isReleased());
}
@@ -77,7 +100,7 @@ public class ReleaseOnConsumptionResultPartitionTest extends
TestLogger {
partition.onConsumedSubpartition(0);
partition.onConsumedSubpartition(0);
partition.onConsumedSubpartition(1);
-
+ partition.close();
assertTrue(partition.isReleased());
}
}