Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5423#discussion_r168724949
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
---
@@ -55,69 +51,56 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
- /** The number of non-event buffers currently in this subpartition. */
- @GuardedBy("buffers")
- private int buffersInBacklog;
-
//
------------------------------------------------------------------------
PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
}
@Override
- public boolean add(Buffer buffer) throws IOException {
- checkNotNull(buffer);
-
- // view reference accessible outside the lock, but assigned
inside the locked scope
- final PipelinedSubpartitionView reader;
+ public boolean add(BufferConsumer bufferConsumer) throws IOException {
+ return add(bufferConsumer, false);
+ }
+ @Override
+ public void flush() {
synchronized (buffers) {
- if (isFinished || isReleased) {
- buffer.recycleBuffer();
- return false;
+ if (readView != null) {
+ readView.notifyDataAvailable();
}
-
- // Add the buffer and update the stats
- buffers.add(buffer);
- reader = readView;
- updateStatistics(buffer);
- increaseBuffersInBacklog(buffer);
- }
-
- // Notify the listener outside of the synchronized block
- if (reader != null) {
- reader.notifyBuffersAvailable(1);
}
-
- return true;
}
@Override
public void finish() throws IOException {
- final Buffer buffer =
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
+ LOG.debug("Finished {}.", this);
+ }
- // view reference accessible outside the lock, but assigned
inside the locked scope
- final PipelinedSubpartitionView reader;
+ private boolean add(BufferConsumer bufferConsumer, boolean finish)
throws IOException {
+ checkNotNull(bufferConsumer);
synchronized (buffers) {
if (isFinished || isReleased) {
- return;
+ bufferConsumer.close();
+ return false;
}
- buffers.add(buffer);
- reader = readView;
- updateStatistics(buffer);
+ // Add the bufferConsumer and update the stats
+ buffers.add(bufferConsumer);
+ updateStatistics(bufferConsumer);
+ increaseBuffersInBacklog(bufferConsumer);
- isFinished = true;
+ if (finish) {
+ isFinished = true;
+ notifyDataAvailable();
--- End diff --
ð
---