WencongLiu commented on code in PR #23179:
URL: https://github.com/apache/flink/pull/23179#discussion_r1294530227


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java:
##########
@@ -197,17 +201,32 @@ private Optional<Buffer> 
readNettyPayload(Queue<NettyPayload> nettyPayloadQueue)
         }
     }
 
-    private Buffer.DataType getNettyPayloadNextDataType(Queue<NettyPayload> 
nettyPayload) {
-        NettyPayload nextBuffer = nettyPayload.peek();
-        if (nextBuffer == null || !nextBuffer.getBuffer().isPresent()) {
+    private int getBacklog() {
+        int backlog = 0;
+        for (NettyPayloadQueue queue : nettyPayloadQueues) {
+            backlog += queue.getBacklog();
+        }
+        return backlog;
+    }
+
+    private boolean ignoreZeroCredit(NettyPayloadQueue nettyPayloadQueue) {
+        NettyPayload nettyPayload = nettyPayloadQueue.peek();
+        return nettyPayload != null
+                && (nettyPayload.getError().isPresent()
+                        || (nettyPayload.getBuffer().isPresent()
+                                && nettyPayload.getBuffer().get().isBuffer()));

Review Comment:
   Sorry for the careless mistake. Fixed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java:
##########
@@ -197,17 +201,32 @@ private Optional<Buffer> 
readNettyPayload(Queue<NettyPayload> nettyPayloadQueue)
         }
     }
 
-    private Buffer.DataType getNettyPayloadNextDataType(Queue<NettyPayload> 
nettyPayload) {
-        NettyPayload nextBuffer = nettyPayload.peek();
-        if (nextBuffer == null || !nextBuffer.getBuffer().isPresent()) {
+    private int getBacklog() {
+        int backlog = 0;
+        for (NettyPayloadQueue queue : nettyPayloadQueues) {
+            backlog += queue.getBacklog();
+        }
+        return backlog;
+    }
+
+    private boolean ignoreZeroCredit(NettyPayloadQueue nettyPayloadQueue) {

Review Comment:
   Fixed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java:
##########
@@ -94,14 +94,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
     @Override
     public AvailabilityWithBacklog getAvailabilityAndBacklog(int 
numCreditsAvailable) {
         if (findCurrentNettyPayloadQueue()) {
-            Queue<NettyPayload> currentQueue =
+            NettyPayloadQueue currentQueue =
                     nettyPayloadQueues.get(queueIndexContainsCurrentSegment);
+            checkState(numCreditsAvailable >= 0);

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to