[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265044#comment-16265044 ]
ASF GitHub Bot commented on FLINK-7499:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152918616
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
             }
         }
     }
+
+    /**
+     * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * <p>These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+     * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+     */
+    private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+            bufferFileWriter.close();
+            return bufferFileWriter;
+        }
+    }
+
+    /**
+     * An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * <p>These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+     * requests but never actually perform any write operation (be sure to clean up the buffers
+     * manually!).
+     */
+    private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --
:/ Every such case increases the cost of future refactoring. I would
still extract those classes to the correct package.
> double buffer release in SpillableSubpartitionView
> --------------------------------------------------
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0,
> 1.3.2, 1.3.3
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice:
> once asynchronously after the write operation and once in
> {{SpillableSubpartitionView#releaseMemory()}} itself, after adding the write
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained and used in parallel somewhere else, it may also
> no longer be available or may contain corrupt data.
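
The hazard in case 1) can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: `Pool`, `Buffer`, and `writeSeesOriginalData` are hypothetical stand-ins invented for illustration and are not Flink classes, and the asynchronous writer is simulated by a plain queue on a single thread. The sketch shows that releasing a buffer right after enqueueing its write lets the pool hand the same memory region to another user before the write runs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the double-release hazard; none of these types are Flink classes. */
public class DoubleRecycleSketch {

    /** Hypothetical pool that hands out fixed-size memory regions. */
    static final class Pool {
        private final Deque<byte[]> free = new ArrayDeque<>();

        Pool(int regions, int size) {
            for (int i = 0; i < regions; i++) {
                free.push(new byte[size]);
            }
        }

        byte[] request() {
            return free.pop();
        }

        void giveBack(byte[] region) {
            free.push(region);
        }
    }

    /** Hypothetical reference-counted buffer that starts with one reference. */
    static final class Buffer {
        final byte[] memory;
        private final Pool pool;
        private int refCount = 1;

        Buffer(Pool pool) {
            this.pool = pool;
            this.memory = pool.request();
        }

        void recycle() {
            if (refCount <= 0) {
                throw new IllegalStateException("buffer released more often than retained");
            }
            if (--refCount == 0) {
                pool.giveBack(memory);
            }
        }

        boolean isRecycled() {
            return refCount == 0;
        }
    }

    /** Returns true if the queued write still sees the data it was enqueued with. */
    public static boolean writeSeesOriginalData(boolean recycleAfterEnqueue) {
        Pool pool = new Pool(1, 4);
        Buffer buffer = new Buffer(pool);
        buffer.memory[0] = 42;

        // Enqueue the write. The writer is expected to recycle the buffer
        // once the write has finished.
        Deque<Buffer> pendingWrites = new ArrayDeque<>();
        pendingWrites.add(buffer);

        if (recycleAfterEnqueue) {
            // The problematic extra release, issued before the write has run.
            buffer.recycle();
        }

        // Another user requests memory while the write is still pending.
        if (buffer.isRecycled()) {
            Buffer other = new Buffer(pool); // receives the very same region
            other.memory[0] = 7;
        }

        // The simulated asynchronous write finally runs.
        Buffer toWrite = pendingWrites.poll();
        boolean intact = toWrite.memory[0] == 42;
        try {
            toWrite.recycle(); // the release after the write operation
        } catch (IllegalStateException e) {
            // In the buggy path this is the second release of the same buffer.
        }
        return intact;
    }

    public static void main(String[] args) {
        System.out.println("extra release: intact=" + writeSeesOriginalData(true));
        System.out.println("single release: intact=" + writeSeesOriginalData(false));
    }
}
```

In this toy model, the write only sees its original data when the buffer is released exactly once, after the write has completed. Whether the real fix drops the extra release or adds a matching retain is a design choice of the pull request and is not implied by this sketch.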