akalash commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r661557618
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
         updateStatistics(current);

-        // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the
-        // next header
-        Buffer.DataType nextDataType =
-                numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+        // We simply assume all the data except for the last one (EndOfPartitionEvent)
+        // are non-events for batch jobs to avoid pre-fetching the next header
+        Buffer.DataType nextDataType = Buffer.DataType.NONE;
+        if (numDataBuffers > 0) {
+            nextDataType = Buffer.DataType.DATA_BUFFER;
+        } else if (numDataAndEventBuffers > 0) {
+            nextDataType = Buffer.DataType.EVENT_BUFFER;
+        }
Review comment:
Is this also a bug? Or why do we distinguish `EVENT_BUFFER` and `DATA_BUFFER` now?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -130,11 +136,11 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {

     /** Requests exclusive buffers from the provider. */
     void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
-        checkArgument(
-                !segments.isEmpty(),
-                "The number of exclusive buffers per channel should be larger than 0.");
+        if (numExclusiveBuffers <= 0) {
Review comment:
As I understand it, a negative number is still illegal, so maybe it makes sense to add a `checkArgument` for `numExclusiveBuffers < 0`?
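For illustration, a minimal sketch of the suggested guard, assuming Flink's `Preconditions.checkArgument`; the message text is just an example:

```java
import static org.apache.flink.util.Preconditions.checkArgument;

void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
    // Fail fast on negative values instead of treating them like zero.
    checkArgument(
            numExclusiveBuffers >= 0,
            "The number of exclusive buffers must be non-negative, was: %s",
            numExclusiveBuffers);
    if (numExclusiveBuffers == 0) {
        return; // zero exclusive buffers is a legal configuration here
    }
    // ... request the segments from the global pool as before ...
}
```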
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -110,7 +114,12 @@ private void updateStatistics(Buffer buffer) {
     public boolean isAvailable(int numCreditsAvailable) {
         // We simply assume there are no events except EndOfPartitionEvent for batch jobs,
         // then it has no essential effect to ignore the judgement of next event buffer.
-        return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+        return (numCreditsAvailable > 0 || numDataBuffers == 0) && numDataAndEventBuffers > 0;
Review comment:
Does this mean that we don't need the credit for sending the event?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -100,8 +106,19 @@ public void addCredit(int creditDeltas) {
         numCreditsAvailable += creditDeltas;
     }

+    @Override
+    public boolean needAnnounceBacklog() {
+        return initialCredit == 0 && numCreditsAvailable == 0;
+    }
+
     @Override
     public void resumeConsumption() {
+        if (initialCredit == 0) {
+            // reset available credit if no exclusive buffer is available at the
+            // consumer side for all floating buffers must have been released
+            numCreditsAvailable = 0;
Review comment:
Do we do it because we know that all floating buffers would be released
before the checkpoint when `initialCredit == 0`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
         }
     }

+    /**
+     * Announces remaining backlog to the consumer after the available data notification or data
+     * consumption resumption.
+     */
+    private void announceBacklog(NetworkSequenceViewReader reader) {
+        int backlog = reader.getRemainingBacklog();
+        if (backlog > 0) {
Review comment:
As I understand it, the backlog cannot be less than or equal to 0 here, so maybe convert this to a `checkArgument`? Or did I miss something?
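For example, a hedged sketch of that change, again assuming Flink's `Preconditions.checkArgument` (the message text is illustrative):

```java
import static org.apache.flink.util.Preconditions.checkArgument;

private void announceBacklog(NetworkSequenceViewReader reader) {
    int backlog = reader.getRemainingBacklog();
    // Fail fast instead of silently skipping a non-positive backlog.
    checkArgument(backlog > 0, "Backlog must be positive, was: %s", backlog);
    // ... announce the backlog to the consumer as in the diff above ...
}
```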
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -340,7 +355,15 @@ private void decodeBufferOrEvent(
             RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
             throws Throwable {
         if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
-            inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+            try {
+                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+            } finally {
+                // recycle the empty buffer directly
+                Buffer buffer = bufferOrEvent.getBuffer();
+                if (buffer != null) {
+                    buffer.recycleBuffer();
Review comment:
Before these changes, where was this buffer recycled? Or was this a bug?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -288,9 +288,7 @@ BufferAndBacklog pollBuffer() {
             if (buffers.isEmpty()) {
                 flushRequested = false;
-            }
-
-            while (!buffers.isEmpty()) {
+            } else {
Review comment:
I don't really get the idea of these changes.
Let's suppose `PipelinedSubpartition#buffers` contains several buffers, but the first one is empty and already finished.
How it was before the changes:
- PartitionRequestQueue requests the buffer.
- In any case, `PipelinedSubpartition#pollBuffer` returns a buffer (it skips the first one because it is empty and finished, but it returns the buffer from the next consumer).
- PartitionRequestQueue continues to request from this reader while `PipelinedSubpartition#buffers` is not empty.
After the changes:
- PartitionRequestQueue requests the buffer.
- `PipelinedSubpartition#pollBuffer` returns null.
- PartitionRequestQueue removes this reader from the available readers.
- The other buffers from `PipelinedSubpartition#buffers` will be sent only when the timeout happens and this reader is added to the available list again.
What is the point of delaying the send if we already have credit for it and the buffer is ready to be sent? See the sketch below.
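A simplified contrast of the two flows; the helper names (`buildFromHead`, `toBufferAndBacklog`, `dropFinishedHead`) are illustrative, not the real Flink methods:

```java
// Before: empty, finished head buffers are skipped within a single poll,
// so a non-empty buffer queued behind them is still returned immediately.
while (!buffers.isEmpty()) {
    Buffer buffer = buildFromHead(buffers);  // illustrative helper
    if (buffer.readableBytes() > 0) {
        return toBufferAndBacklog(buffer);   // illustrative helper
    }
    buffer.recycleBuffer();
    dropFinishedHead(buffers);               // illustrative helper
}
return null;

// After: only the head is inspected once; if it yields nothing,
// pollBuffer() returns null, the reader leaves the available set, and
// the remaining buffers wait for the flush timeout to re-enqueue it.
```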
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -513,19 +514,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }

-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {
+            if (isBlocked || buffers.isEmpty()) {
+                return 0;
+            }
+
+            if (flushRequested
+                    || isFinished
+                    || !checkNotNull(buffers.peekLast()).getBufferConsumer().isBuffer()) {
Review comment:
This looks dangerous. Why are you so sure that `buffers` contains at least one object?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -265,6 +265,13 @@ public void run() {
                     channelInfo,
                     channelStatePersister,
                     next.getSequenceNumber());
+
+            // ignore the empty buffer directly
+            if (buffer.readableBytes() == 0) {
+                buffer.recycleBuffer();
Review comment:
The same question as earlier: is this a bug? Or where did we recycle the buffer before these changes?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -83,8 +84,13 @@ public BufferManager(

     // ------------------------------------------------------------------------

     @Nullable
-    Buffer requestBuffer() {
+    Buffer requestBuffer(int initialCredit) {
         synchronized (bufferQueue) {
+            // decrease the number of buffers required to avoid the possibility of
+            // allocating more than the required buffers after the buffer is taken
+            if (initialCredit == 0) {
+                --numRequiredBuffers;
Review comment:
I don't understand why `initialCredit == 0` should be handled differently here. Even if `initialCredit == 1` and a buffer is requested, we should decrease this value, or am I wrong?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();

+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);
Review comment:
Do we need to do this because we released all buffers in `onBlockingUpstream`? If so, can we keep this code in one place, e.g. move it into `onBlockingUpstream`? A sketch follows.
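A hedged sketch of that consolidation, assuming `onBlockingUpstream` is the place where the floating buffers get released in this PR (this is the suggestion, not the PR's actual code):

```java
private void onBlockingUpstream() {
    if (initialCredit == 0) {
        // release the floating buffers and reset the credit in one place,
        // so resumeConsumption() does not need its own special case
        bufferManager.releaseFloatingBuffers();
        unannouncedCredit.set(0);
    }
}
```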
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -83,8 +84,13 @@ public BufferManager(

     // ------------------------------------------------------------------------

     @Nullable
-    Buffer requestBuffer() {
+    Buffer requestBuffer(int initialCredit) {
Review comment:
As I understand it, `initialCredit` is an unchangeable value, and BufferManager and AvailableBufferQueue both know it, so maybe it is better to avoid this parameter? See the sketch below.
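A sketch of the suggestion: since `initialCredit` is fixed at construction time, BufferManager could read it from a field instead of threading it through every call (the field and the `takeBuffer` call are assumptions based on this diff):

```java
@Nullable
Buffer requestBuffer() {
    synchronized (bufferQueue) {
        // same special case as in the diff, but based on the field
        if (initialCredit == 0) {
            --numRequiredBuffers;
        }
        return bufferQueue.takeBuffer();
    }
}
```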
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();

+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);
+        }
+
         // notifies the producer that this channel is ready to
         // unblock from checkpoint and resume data consumption
         partitionRequestClient.resumeConsumption(this);
     }

+    private void onBlockingUpstream() {
+        if (initialCredit == 0) {
Review comment:
Why can't we do the same thing for any number of credits, not only for `initialCredit == 0`? Or does it slow down the load after resuming?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##########
@@ -215,9 +221,15 @@ public void recycle(MemorySegment segment) {
         }
     }

     void releaseFloatingBuffers() {
+        Queue<Buffer> buffers;
         synchronized (bufferQueue) {
             numRequiredBuffers = 0;
-            bufferQueue.releaseFloatingBuffers();
+            buffers = bufferQueue.clearFloatingBuffers();
+        }
+
+        // recycle all buffers outside the synchronization block to avoid deadlock
Review comment:
Can you explain what kind of deadlock can happen? Between LocalBufferPool and BufferManager? See the illustration below.
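A generic, self-contained illustration of the kind of lock-order inversion this question is about; the lock and method names are illustrative, not the actual Flink types:

```java
public class LockOrderDemo {
    private final Object bufferQueueLock = new Object(); // BufferManager side
    private final Object bufferPoolLock = new Object();  // LocalBufferPool side

    // Thread A: releases floating buffers while holding the queue lock,
    // and recycling re-enters the pool, taking the pool lock second.
    void releaseWhileHoldingQueue() {
        synchronized (bufferQueueLock) {
            synchronized (bufferPoolLock) { /* buffer.recycleBuffer() */ }
        }
    }

    // Thread B: the pool notifies a listener while holding its own lock,
    // and the callback re-enters the manager, taking the queue lock second.
    void notifyWhileHoldingPool() {
        synchronized (bufferPoolLock) {
            synchronized (bufferQueueLock) { /* listener callback */ }
        }
    }
    // A and B acquire the two locks in opposite order, so they can deadlock;
    // recycling outside the synchronized block breaks the cycle.
}
```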
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -265,6 +265,13 @@ public void run() {
                     channelInfo,
                     channelStatePersister,
                     next.getSequenceNumber());
+
+            // ignore the empty buffer directly
+            if (buffer.readableBytes() == 0) {
Review comment:
How is it even possible to get an empty buffer? Who is sending it?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -513,19 +514,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }

-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {
+            if (isBlocked || buffers.isEmpty()) {
Review comment:
It seems like the wrong place for such a condition. Logically, even if the subpartition is blocked it still has buffers. But as I understand it, specifically for the case where `initialCredit == 0` it should return 0, so we need to think about how to do this better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]