[
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378590#comment-16378590
]
ASF GitHub Bot commented on FLINK-8755:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5581#discussion_r170917769
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
---
@@ -138,11 +145,68 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
- int expectedBuffersInBacklog) throws IOException,
InterruptedException {
+ int expectedBuffersInBacklog,
+ boolean expectedNextBufferIsEvent,
+ boolean expectedRecycledAfterRecycle) throws
IOException, InterruptedException {
+ assertNextBufferOrEvent(
+ readView,
+ expectedReadableBufferSize,
+ true,
+ null,
+ expectedIsMoreAvailable,
+ expectedBuffersInBacklog,
+ expectedNextBufferIsEvent,
+ expectedRecycledAfterRecycle);
+ }
+
+ static void assertNextEvent(
+ ResultSubpartitionView readView,
+ int expectedReadableBufferSize,
+ @Nullable Class<? extends AbstractEvent>
expectedEventClass,
+ boolean expectedIsMoreAvailable,
+ int expectedBuffersInBacklog,
+ boolean expectedNextBufferIsEvent,
+ boolean expectedRecycledAfterRecycle) throws
IOException, InterruptedException {
+ assertNextBufferOrEvent(
+ readView,
+ expectedReadableBufferSize,
+ false,
+ expectedEventClass,
+ expectedIsMoreAvailable,
+ expectedBuffersInBacklog,
+ expectedNextBufferIsEvent,
+ expectedRecycledAfterRecycle);
+ }
+
+ private static void assertNextBufferOrEvent(
+ ResultSubpartitionView readView,
+ int expectedReadableBufferSize,
+ boolean expectedIsBuffer,
+ Class<? extends AbstractEvent> expectedEventClass,
+ boolean expectedIsMoreAvailable,
+ int expectedBuffersInBacklog,
+ boolean expectedNextBufferIsEvent,
+ boolean expectedRecycledAfterRecycle) throws
IOException, InterruptedException {
+ checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
ResultSubpartition.BufferAndBacklog bufferAndBacklog =
readView.getNextBuffer();
- assertEquals(expectedReadableBufferSize,
bufferAndBacklog.buffer().readableBytes());
- assertEquals(expectedIsMoreAvailable,
bufferAndBacklog.isMoreAvailable());
- assertEquals(expectedBuffersInBacklog,
bufferAndBacklog.buffersInBacklog());
+ assertNotNull(bufferAndBacklog);
+
+ assertEquals("buffer size", expectedReadableBufferSize,
bufferAndBacklog.buffer().readableBytes());
--- End diff --
Do those string error messages add any value besides chatter in this
method?
> SpilledSubpartitionView wrongly relys on the backlog for determining whether
> more data is available
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
> Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException,
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog,
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there
> are only events left in the buffer queue, their buffers are not included in
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly
> {{false}} here.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)