WencongLiu commented on code in PR #23179:
URL: https://github.com/apache/flink/pull/23179#discussion_r1294530862
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -173,7 +179,19 @@ public void releaseDataView() {
@GuardedBy("consumerLock")
private void trimHeadingReleasedBuffers() {
while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) {
- unConsumedBuffers.removeFirst();
+ decreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+ }
+ }
+
+ private void increaseBacklog(Buffer buffer) {
+ if (buffer.isBuffer()) {
+ backlog.getAndIncrement();
+ }
+ }
+
+ private void decreaseBacklog(Buffer buffer) {
Review Comment:
`decreaseBacklog` is now called in `trimHeadingReleasedBuffers`. 🤔
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java:
##########
@@ -43,6 +44,9 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
@GuardedBy("consumerLock")
private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>();
+ @GuardedBy("consumerLock")
Review Comment:
Reads and writes of this field are not guarded by `consumerLock` in every
method, for example in `getBacklog()`. The `@GuardedBy("consumerLock")`
annotation has now been removed.
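A minimal sketch of the reasoning, assuming `backlog` is an `AtomicInteger` (the `getAndIncrement()` call in the diff suggests this, but the declaration is not shown here). The class name `BacklogCounter` and the boolean parameter are hypothetical stand-ins for illustration, not the Flink code, and the body of `decreaseBacklog` is assumed to mirror `increaseBacklog`:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the backlog bookkeeping; not the Flink class. */
public class BacklogCounter {
    // Atomic, so a reader such as getBacklog() does not need consumerLock.
    private final AtomicInteger backlog = new AtomicInteger(0);

    /** Counts only data buffers, mirroring the isBuffer() check in the diff. */
    public void increaseBacklog(boolean isDataBuffer) {
        if (isDataBuffer) {
            backlog.getAndIncrement();
        }
    }

    /** Assumed to be symmetric to increaseBacklog; the real body is not shown. */
    public void decreaseBacklog(boolean isDataBuffer) {
        if (isDataBuffer) {
            backlog.getAndDecrement();
        }
    }

    /** Lock-free read of the current backlog. */
    public int getBacklog() {
        return backlog.get();
    }
}
```

Because every access goes through the atomic counter, annotating the field with `@GuardedBy("consumerLock")` would be misleading.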
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##########
@@ -65,6 +65,14 @@ default void notifyPriorityEvent(int priorityBufferNumber) {}
*/
Throwable getFailureCause();
+ /**
+ * Get the availability and backlog of the view. The availability represents if the view is
+ * ready to get buffer from it. The backlog represents the number of available buffers whose
+ * {@link Buffer.DataType} is buffer.
Review Comment:
Fixed.