reswqa commented on code in PR #23179:
URL: https://github.com/apache/flink/pull/23179#discussion_r1293008011
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -43,6 +44,9 @@ public class HsSubpartitionConsumerMemoryDataManager
implements HsDataView {
@GuardedBy("consumerLock")
private final Deque<HsBufferContext> unConsumedBuffers = new
LinkedList<>();
+ @GuardedBy("consumerLock")
Review Comment:
This field is already guarded by a lock; why does it also need to be atomic?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -173,7 +179,19 @@ public void releaseDataView() {
@GuardedBy("consumerLock")
private void trimHeadingReleasedBuffers() {
while (!unConsumedBuffers.isEmpty() &&
unConsumedBuffers.peekFirst().isReleased()) {
- unConsumedBuffers.removeFirst();
+ decreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+ }
+ }
+
+ private void increaseBacklog(Buffer buffer) {
Review Comment:
```suggestion
private void tryIncreaseBacklog(Buffer buffer) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -173,7 +179,19 @@ public void releaseDataView() {
@GuardedBy("consumerLock")
private void trimHeadingReleasedBuffers() {
while (!unConsumedBuffers.isEmpty() &&
unConsumedBuffers.peekFirst().isReleased()) {
- unConsumedBuffers.removeFirst();
+ decreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+ }
+ }
+
+ private void increaseBacklog(Buffer buffer) {
+ if (buffer.isBuffer()) {
+ backlog.getAndIncrement();
+ }
+ }
+
+ private void decreaseBacklog(Buffer buffer) {
Review Comment:
```suggestion
private void tryDecreaseBacklog(Buffer buffer) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -312,14 +319,28 @@ private Optional<BufferIndexOrError>
checkAndGetFirstBufferIndexOrError(
// Because the update of consumption progress may be delayed,
there is a
// very small probability to load the buffer that has been
consumed from memory.
// Skip these buffers directly to avoid repeated consumption.
-
buffersToRecycle.add(checkNotNull(loadedBuffers.poll()).buffer);
+ Buffer buffer = checkNotNull(loadedBuffers.poll()).buffer;
+ decreaseBacklog(buffer);
+ buffersToRecycle.add(buffer);
peek = loadedBuffers.peek();
}
}
return Optional.ofNullable(peek);
}
+ private void increaseBacklog(Buffer buffer) {
+ if (buffer.isBuffer()) {
+ backlog.getAndIncrement();
+ }
+ }
+
+ private void decreaseBacklog(Buffer buffer) {
Review Comment:
```suggestion
private void tryDecreaseBacklog(Buffer buffer) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java:
##########
@@ -197,17 +201,32 @@ private Optional<Buffer>
readNettyPayload(Queue<NettyPayload> nettyPayloadQueue)
}
}
- private Buffer.DataType getNettyPayloadNextDataType(Queue<NettyPayload>
nettyPayload) {
- NettyPayload nextBuffer = nettyPayload.peek();
- if (nextBuffer == null || !nextBuffer.getBuffer().isPresent()) {
+ private int getBacklog() {
+ int backlog = 0;
+ for (NettyPayloadQueue queue : nettyPayloadQueues) {
+ backlog += queue.getBacklog();
+ }
+ return backlog;
+ }
+
+ private boolean ignoreZeroCredit(NettyPayloadQueue nettyPayloadQueue) {
Review Comment:
```suggestion
private boolean isEventOrError(NettyPayloadQueue nettyPayloadQueue) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -173,7 +179,19 @@ public void releaseDataView() {
@GuardedBy("consumerLock")
private void trimHeadingReleasedBuffers() {
while (!unConsumedBuffers.isEmpty() &&
unConsumedBuffers.peekFirst().isReleased()) {
- unConsumedBuffers.removeFirst();
+ decreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+ }
+ }
+
+ private void increaseBacklog(Buffer buffer) {
+ if (buffer.isBuffer()) {
+ backlog.getAndIncrement();
+ }
+ }
+
+ private void decreaseBacklog(Buffer buffer) {
Review Comment:
I believe that we should also handle this in `trimHeadingReleasedBuffers`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java:
##########
@@ -94,14 +94,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
@Override
public AvailabilityWithBacklog getAvailabilityAndBacklog(int
numCreditsAvailable) {
if (findCurrentNettyPayloadQueue()) {
- Queue<NettyPayload> currentQueue =
+ NettyPayloadQueue currentQueue =
nettyPayloadQueues.get(queueIndexContainsCurrentSegment);
+ checkState(numCreditsAvailable >= 0);
Review Comment:
This might already be checked in
`org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader#getNextBuffer`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -312,14 +319,28 @@ private Optional<BufferIndexOrError>
checkAndGetFirstBufferIndexOrError(
// Because the update of consumption progress may be delayed,
there is a
// very small probability to load the buffer that has been
consumed from memory.
// Skip these buffers directly to avoid repeated consumption.
-
buffersToRecycle.add(checkNotNull(loadedBuffers.poll()).buffer);
+ Buffer buffer = checkNotNull(loadedBuffers.poll()).buffer;
+ decreaseBacklog(buffer);
+ buffersToRecycle.add(buffer);
peek = loadedBuffers.peek();
}
}
return Optional.ofNullable(peek);
}
+ private void increaseBacklog(Buffer buffer) {
Review Comment:
```suggestion
private void tryIncreaseBacklog(Buffer buffer) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -236,18 +236,18 @@ private int readBuffersFromFile() {
}
private List<ScheduledSubpartitionReader> sortScheduledReaders() {
+ List<ScheduledSubpartitionReader> scheduledReaders;
synchronized (lock) {
if (isReleased) {
return new ArrayList<>();
}
- List<ScheduledSubpartitionReader> scheduledReaders = new
ArrayList<>();
- for (ScheduledSubpartitionReader reader :
allScheduledReaders.values()) {
- reader.prepareForScheduling();
- scheduledReaders.add(reader);
- }
- Collections.sort(scheduledReaders);
- return scheduledReaders;
+ scheduledReaders = new ArrayList<>(allScheduledReaders.values());
+ }
+ for (ScheduledSubpartitionReader reader : scheduledReaders) {
Review Comment:
We need a test to validate this is fixed, even though such tests may not be
easy to write.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##########
@@ -65,6 +65,14 @@ default void notifyPriorityEvent(int priorityBufferNumber) {}
*/
Throwable getFailureCause();
+ /**
+ * Get the availability and backlog of the view. The availability
represents if the view is
+ * ready to get buffer from it. The backlog represents the number of
available buffers whose
+ * {@link Buffer.DataType} is buffer.
Review Comment:
```suggestion
* Get the availability and backlog of the view. The availability
represents if the view is
* ready to get buffer from it. The backlog represents the number of
available data buffers.
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -72,6 +73,9 @@ public class HsSubpartitionFileReaderImpl implements
HsSubpartitionFileReader {
private final Consumer<HsSubpartitionFileReader> fileReaderReleaser;
+ @GuardedBy("lock")
Review Comment:
Why is this field guarded by a lock?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadQueue.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.Queue;
+
+/** {@link NettyPayloadQueue} is used to contain all netty payloads from a
storage tier. */
+public class NettyPayloadQueue {
+
+ private final Object lock = new Object();
+
+ private final Queue<NettyPayload> queue = new LinkedList<>();
+
+ /** Number of buffers whose {@link Buffer.DataType} is buffer in the
queue. */
+ private int backlog = 0;
Review Comment:
```suggestion
@GuardedBy("lock")
private int backlog = 0;
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -173,7 +179,19 @@ public void releaseDataView() {
@GuardedBy("consumerLock")
private void trimHeadingReleasedBuffers() {
while (!unConsumedBuffers.isEmpty() &&
unConsumedBuffers.peekFirst().isReleased()) {
- unConsumedBuffers.removeFirst();
+ decreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+ }
+ }
+
+ private void increaseBacklog(Buffer buffer) {
Review Comment:
```suggestion
@GuardedBy("consumerLock")
private void increaseBacklog(Buffer buffer) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -173,7 +179,19 @@ public void releaseDataView() {
@GuardedBy("consumerLock")
private void trimHeadingReleasedBuffers() {
while (!unConsumedBuffers.isEmpty() &&
unConsumedBuffers.peekFirst().isReleased()) {
- unConsumedBuffers.removeFirst();
+ decreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+ }
+ }
+
+ private void increaseBacklog(Buffer buffer) {
+ if (buffer.isBuffer()) {
+ backlog.getAndIncrement();
+ }
+ }
+
+ private void decreaseBacklog(Buffer buffer) {
Review Comment:
```suggestion
@GuardedBy("consumerLock")
private void decreaseBacklog(Buffer buffer) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java:
##########
@@ -197,17 +201,32 @@ private Optional<Buffer>
readNettyPayload(Queue<NettyPayload> nettyPayloadQueue)
}
}
- private Buffer.DataType getNettyPayloadNextDataType(Queue<NettyPayload>
nettyPayload) {
- NettyPayload nextBuffer = nettyPayload.peek();
- if (nextBuffer == null || !nextBuffer.getBuffer().isPresent()) {
+ private int getBacklog() {
+ int backlog = 0;
+ for (NettyPayloadQueue queue : nettyPayloadQueues) {
+ backlog += queue.getBacklog();
+ }
+ return backlog;
+ }
+
+ private boolean ignoreZeroCredit(NettyPayloadQueue nettyPayloadQueue) {
+ NettyPayload nettyPayload = nettyPayloadQueue.peek();
+ return nettyPayload != null
+ && (nettyPayload.getError().isPresent()
+ || (nettyPayload.getBuffer().isPresent()
+ && nettyPayload.getBuffer().get().isBuffer()));
Review Comment:
Is there a mistake here? Why can a data buffer be viewed as ignoring zero credit?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]