reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450161095
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
}
@Override
- public void notifyDataAvailable() {
+ public void notifyDataAvailable(ResultSubpartitionView view) {
requestQueue.notifyReaderNonEmpty(this);
}
@Override
public void notifyPriorityEvent(int prioritySequenceNumber) {
- notifyDataAvailable();
+ notifyDataAvailable(this.subpartitionView);
+ }
+
+ @VisibleForTesting
+ public void notifyDataAvailable() {
+ notifyDataAvailable(subpartitionView);
Review Comment:
`subpartitionView` -> `this.subpartitionView`, to be consistent with
`notifyPriorityEvent`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##########
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) {}
* Get the availability and backlog of the view. The availability represents if the view is
* ready to get buffer from it. The backlog represents the number of available data buffers.
*
- * @param numCreditsAvailable the available credits for this {@link ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link ResultSubpartitionView}.
* @return availability and backlog.
*/
- AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+ AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable);
Review Comment:
Is this commit a must-have change, or just a refactor?
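For context on the question above, here is a minimal sketch of what the signature change would mean at a call site, assuming the reader keeps its integer credit count and only passes a flag to the view. `CreditAvailabilitySketch`, `SketchView`, and `query` are hypothetical stand-ins rather than the actual Flink classes, and the availability rule is simplified (it ignores priority events, for example).

```java
// Hypothetical, simplified stand-ins for illustration; not the actual Flink classes.
public class CreditAvailabilitySketch {

    /** Simplified result type: whether data can be read, plus the backlog size. */
    public static final class AvailabilityWithBacklog {
        public final boolean isAvailable;
        public final int backlog;

        public AvailabilityWithBacklog(boolean isAvailable, int backlog) {
            this.isAvailable = isAvailable;
            this.backlog = backlog;
        }
    }

    /** Simplified view that only tracks how many data buffers are queued. */
    public static final class SketchView {
        private final int buffersInBacklog;

        public SketchView(int buffersInBacklog) {
            this.buffersInBacklog = buffersInBacklog;
        }

        // New-style signature: the view only learns whether any credit exists.
        // Assumption: data is readable only if there is credit and queued data.
        public AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable) {
            boolean available = isCreditAvailable && buffersInBacklog > 0;
            return new AvailabilityWithBacklog(available, buffersInBacklog);
        }
    }

    /** The caller still owns the integer credit count and converts it at the call site. */
    public static AvailabilityWithBacklog query(SketchView view, int numCreditsAvailable) {
        return view.getAvailabilityAndBacklog(numCreditsAvailable > 0);
    }

    public static void main(String[] args) {
        SketchView view = new SketchView(2);
        System.out.println(query(view, 1).isAvailable);
        System.out.println(query(view, 0).isAvailable);
    }
}
```

If the view never uses the exact number of credits, the two signatures are interchangeable from the caller's side, which is why it matters whether this commit is required by later changes or is purely a refactor.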
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {
Review Comment:
What is the difference between `IndexRange` and `IndexSet`? In my opinion,
it would be better to align with the name of the parent class.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
}
@Override
- public void notifyDataAvailable() {
+ public void notifyDataAvailable(ResultSubpartitionView view) {
requestQueue.notifyReaderNonEmpty(this);
}
@Override
public void notifyPriorityEvent(int prioritySequenceNumber) {
- notifyDataAvailable();
+ notifyDataAvailable(this.subpartitionView);
+ }
+
+ @VisibleForTesting
Review Comment:
Why is this method `@VisibleForTesting`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.