This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8319bf4 [FLINK-23223] Notifies if there are available data on
resumption for pipelined subpartition
8319bf4 is described below
commit 8319bf44b1561a4b69851b105fd379dec161e675
Author: Yun Gao <[email protected]>
AuthorDate: Sun Jul 4 22:17:42 2021 +0800
[FLINK-23223] Notifies if there are available data on resumption for
pipelined subpartition
---
.../network/partition/PipelinedSubpartition.java | 9 ++---
.../PipelinedSubpartitionWithReadViewTest.java | 47 ++++++++++++++++++----
.../partition/consumer/LocalInputChannelTest.java | 37 +++++++++++++++++
3 files changed, 81 insertions(+), 12 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index e13f413..134ef68 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -463,11 +463,10 @@ public class PipelinedSubpartition extends
ResultSubpartition
}
// if there is more then 1 buffer, we already notified the reader
// (at the latest when adding the second buffer)
- notifyDataAvailable =
- !isBlocked
- && buffers.size() == 1
- &&
buffers.peek().getBufferConsumer().isDataAvailable();
- flushRequested = buffers.size() > 1 || notifyDataAvailable;
+ boolean isDataAvailableInUnfinishedBuffer =
+ buffers.size() == 1 &&
buffers.peek().getBufferConsumer().isDataAvailable();
+ notifyDataAvailable = !isBlocked &&
isDataAvailableInUnfinishedBuffer;
+ flushRequested = buffers.size() > 1 ||
isDataAvailableInUnfinishedBuffer;
}
if (notifyDataAvailable) {
notifyDataAvailable();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 972455b..de1b32b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -487,8 +487,7 @@ public class PipelinedSubpartitionWithReadViewTest {
}
@Test
- public void testBlockedByCheckpointAndResumeConsumption()
- throws IOException, InterruptedException {
+ public void testResumeBlockedSubpartitionWithEvents() throws IOException,
InterruptedException {
blockSubpartitionByCheckpoint(1);
// add an event after subpartition blocked
@@ -496,33 +495,67 @@ public class PipelinedSubpartitionWithReadViewTest {
// no data available notification after adding an event
checkNumNotificationsAndAvailability(1);
+ // Resumption will make the subpartition available.
resumeConsumptionAndCheckAvailability(0, true);
assertNextEvent(readView, BUFFER_SIZE, null, false, 0, false, true);
+ }
- blockSubpartitionByCheckpoint(2);
+ @Test
+ public void testResumeBlockedSubpartitionWithUnfinishedBufferFlushed()
+ throws IOException, InterruptedException {
+ blockSubpartitionByCheckpoint(1);
// add a buffer and flush the subpartition
subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
subpartition.flush();
// no data available notification after adding a buffer and flushing
the subpartition
- checkNumNotificationsAndAvailability(2);
+ checkNumNotificationsAndAvailability(1);
- resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
+ // Resumption will make the subpartition available.
+ resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
+ }
+
+ @Test
+ public void testResumeBlockedSubpartitionWithUnfinishedBufferNotFlushed()
+ throws IOException, InterruptedException {
+ blockSubpartitionByCheckpoint(1);
+
+ // add a buffer but not flush the subpartition.
+ subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
+ // no data available notification after adding a buffer.
+ checkNumNotificationsAndAvailability(1);
+
+ // Resumption will not make the subpartition available since the data
is not flushed before.
+ resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
+ }
- blockSubpartitionByCheckpoint(3);
+ @Test
+ public void testResumeBlockedSubpartitionWithFinishedBuffers()
+ throws IOException, InterruptedException {
+ blockSubpartitionByCheckpoint(1);
// add two buffers to the subpartition
subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
// no data available notification after adding the second buffer
- checkNumNotificationsAndAvailability(3);
+ checkNumNotificationsAndAvailability(1);
+ // Resumption will make the subpartition available.
resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
}
+ @Test
+ public void testResumeBlockedEmptySubpartition() throws IOException,
InterruptedException {
+ blockSubpartitionByCheckpoint(1);
+
+ // Resumption will not make the subpartition available since it is
empty.
+ resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
+ assertNoNextBuffer(readView);
+ }
+
// ------------------------------------------------------------------------
private void blockSubpartitionByCheckpoint(int numNotifications)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index a6c2c57..a48a645 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -523,6 +523,43 @@ public class LocalInputChannelTest {
}
@Test
+ public void testEnqueueAvailableChannelWhenResuming() throws IOException,
InterruptedException {
+ PipelinedResultPartition parent =
+ (PipelinedResultPartition)
+ PartitionTestUtils.createPartition(
+ ResultPartitionType.PIPELINED,
NoOpFileChannelManager.INSTANCE);
+ ResultSubpartition subpartition = parent.getAllPartitions()[0];
+ ResultSubpartitionView subpartitionView =
subpartition.createReadView(() -> {});
+
+ TestingResultPartitionManager partitionManager =
+ new TestingResultPartitionManager(subpartitionView);
+ LocalInputChannel channel =
+ createLocalInputChannel(new SingleInputGateBuilder().build(),
partitionManager);
+ channel.requestSubpartition(0);
+
+ // Block the subpartition
+ subpartition.add(
+ EventSerializer.toBufferConsumer(
+ new CheckpointBarrier(
+ 1, 1,
CheckpointOptions.forCheckpointWithDefaultLocation()),
+ false));
+ assertTrue(channel.getNextBuffer().isPresent());
+
+ // Add more data
+
subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
+ subpartition.flush();
+
+ // No buffer since the subpartition is blocked.
+ assertFalse(channel.inputGate.pollNext().isPresent());
+
+ // Resumption makes the subpartition available.
+ channel.resumeConsumption();
+ Optional<BufferOrEvent> nextBuffer = channel.inputGate.pollNext();
+ assertTrue(nextBuffer.isPresent());
+ assertTrue(nextBuffer.get().isBuffer());
+ }
+
+ @Test
public void testCheckpointingInflightData() throws Exception {
SingleInputGate inputGate = new SingleInputGateBuilder().build();