This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 69c772b7c9 refactor: enhance backpressure handling with byte tracking  
(#16028)
69c772b7c9 is described below

commit 69c772b7c929fa3c6a71e2bda1afee5554c477c6
Author: earthchen <[email protected]>
AuthorDate: Thu Jan 22 16:33:51 2026 +0800

    refactor: enhance backpressure handling with byte tracking  (#16028)
    
    * refactor: enhance backpressure handling with byte tracking in HTTP/2 
streams
    
    * Update 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * fix
    
    * refactor: expose methods for byte tracking in AbstractTripleClientStream 
and Http2ServerChannelObserver
    
    * refactor: enhance error handling and byte tracking in HTTP/2 stream 
observers
    
    * refactor: improve closing logic and state management in 
LengthFieldStreamingDecoder
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../http12/AbstractServerHttpChannelObserver.java  |  20 +-
 .../dubbo/remoting/http12/HttpOutputMessage.java   |  12 +
 .../remoting/http12/h1/Http1OutputMessage.java     |   8 +
 .../http12/h2/Http2OutputMessageFrame.java         |   8 +
 .../http12/h2/Http2ServerChannelObserver.java      | 134 ++++++++++-
 .../message/LengthFieldStreamingDecoder.java       |  58 ++++-
 ...Http2ServerChannelObserverByteCountingTest.java | 260 ++++++++++++++++++++
 .../http2/GenericHttp2ServerTransportListener.java |   7 +-
 .../tri/stream/AbstractTripleClientStream.java     |  89 ++++---
 ...AbstractTripleClientStreamByteCountingTest.java | 265 +++++++++++++++++++++
 .../protocol/tri/test/MockHttp2OutputMessage.java  |   8 +
 11 files changed, 815 insertions(+), 54 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index 3e4194e542..b4f0aaca83 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.rpc.RpcContext;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
@@ -146,7 +147,11 @@ public abstract class AbstractServerHttpChannelObserver<H 
extends HttpChannel> i
         if (!headerSent) {
             sendMetadata(buildMetadata(statusCode, data, null, 
HttpOutputMessage.EMPTY_MESSAGE));
         }
-        sendMessage(buildMessage(statusCode, data));
+        sendMessage(buildMessage(statusCode, data)).whenComplete((unused, 
throwable) -> {
+            if (throwable != null) {
+                LOGGER.error(INTERNAL_ERROR, "", "", "Failed to send message 
on channel " + httpChannel, throwable);
+            }
+        });
     }
 
     protected final int resolveStatusCode(Object data) {
@@ -241,12 +246,13 @@ public abstract class AbstractServerHttpChannelObserver<H 
extends HttpChannel> i
         return getHttpChannel().newOutputMessage();
     }
 
-    protected final void sendMessage(HttpOutputMessage message) throws 
Throwable {
+    protected CompletableFuture<Void> sendMessage(HttpOutputMessage message) 
throws Throwable {
         if (message == null) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-        getHttpChannel().writeMessage(message);
+        CompletableFuture<Void> future = 
getHttpChannel().writeMessage(message);
         postOutputMessage(message);
+        return future;
     }
 
     protected void preOutputMessage(HttpOutputMessage message) throws 
Throwable {}
@@ -274,7 +280,11 @@ public abstract class AbstractServerHttpChannelObserver<H 
extends HttpChannel> i
         if (!headerSent) {
             sendMetadata(buildMetadata(statusCode, data, throwable, 
HttpOutputMessage.EMPTY_MESSAGE));
         }
-        sendMessage(buildMessage(statusCode, data));
+        sendMessage(buildMessage(statusCode, data)).whenComplete((unused, t) 
-> {
+            if (t != null) {
+                LOGGER.error(INTERNAL_ERROR, "", "", "Failed to send error 
message on channel " + httpChannel, t);
+            }
+        });
     }
 
     protected final int resolveErrorStatusCode(Throwable throwable) {
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
index ac64685bd0..a3a589d8e1 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
@@ -30,10 +30,22 @@ public interface HttpOutputMessage extends AutoCloseable {
         public OutputStream getBody() {
             return INPUT_STREAM;
         }
+
+        @Override
+        public int messageSize() {
+            return 0;
+        }
     };
 
     OutputStream getBody();
 
+    /**
+     * Returns the size of the message body in bytes.
+     *
+     * @return the size of the message body, or 0 if unknown
+     */
+    int messageSize();
+
     @Override
     default void close() throws IOException {
         getBody().close();
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
index e325743449..538556f29c 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
@@ -43,4 +43,12 @@ public final class Http1OutputMessage implements 
HttpOutputMessage {
         }
         outputStream.close();
     }
+
+    @Override
+    public int messageSize() {
+        if (outputStream instanceof ByteBufOutputStream) {
+            return ((ByteBufOutputStream) 
outputStream).buffer().readableBytes();
+        }
+        return 0;
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
index 6a005893fc..8fe0aa4ccc 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
@@ -49,4 +49,12 @@ public final class Http2OutputMessageFrame implements 
Http2OutputMessage {
     public boolean isEndStream() {
         return endStream;
     }
+
+    @Override
+    public int messageSize() {
+        if (body instanceof ByteBufOutputStream) {
+            return ((ByteBufOutputStream) body).buffer().readableBytes();
+        }
+        return 0;
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index 0fbf9bab0c..d413e25d13 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -24,39 +24,83 @@ import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
 import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
 import org.apache.dubbo.remoting.http12.netty4.NettyHttpHeaders;
 import org.apache.dubbo.rpc.CancellationContext;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 
 /**
  * HTTP/2 server-side stream observer with flow control and backpressure 
support.
- * Implements {@link ServerCallStreamObserver} following gRPC's pattern for 
backpressure.
+ * <p>
+ * Backpressure is implemented using a byte-counting strategy. Outbound 
messages are
+ * tracked in {@code numSentBytesQueued}, which represents the approximate 
number of
+ * bytes that have been queued but not yet acknowledged as sent.
+ * <p>
+ * The {@code ON_READY_THRESHOLD} (32KB) defines when this observer is 
considered "ready":
+ * <ul>
+ *     <li>{@link #isReady()} returns {@code true} when {@code 
numSentBytesQueued < ON_READY_THRESHOLD}</li>
+ *     <li>When the queued byte count drops from at or above the threshold to 
below it,
+ *         the registered {@code onReadyHandler} is invoked to signal that 
more data can be sent</li>
+ * </ul>
  */
 public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserver<H2StreamChannel>
         implements FlowControlStreamObserver<Object>,
                 Http2CancelableStreamObserver<Object>,
                 ServerCallStreamObserver<Object> {
 
+    /**
+     * Number of bytes currently queued, waiting to be sent.
+     * When this falls below ON_READY_THRESHOLD, onReady will be triggered.
+     */
+    private final AtomicLong numSentBytesQueued = new AtomicLong(0);
+
+    /**
+     * The threshold below which isReady() returns true (32KB).
+     */
+    protected static final long ON_READY_THRESHOLD = 32 * 1024;
+
     private CancellationContext cancellationContext;
 
     private StreamingDecoder streamingDecoder;
 
     private boolean autoRequestN = true;
 
-    private Runnable onReadyHandler;
+    private volatile Runnable onReadyHandler;
+
+    private volatile Executor executor = Runnable::run;
 
     public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
         super(h2StreamChannel);
     }
 
+    /**
+     * Sets the executor for async dispatch of callbacks.
+     */
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
     /**
      * Returns whether the stream is ready for writing.
-     * If false, the caller should avoid calling onNext to prevent blocking or 
excessive buffering.
+     * <p>
+     * Ready state is determined by byte counting: returns {@code true} when 
the number
+     * of queued bytes is below the threshold (32KB). If {@code false}, the 
caller should
+     * avoid calling {@code onNext} to prevent excessive buffering.
+     *
+     * @return {@code true} if the stream is ready for more data, {@code 
false} otherwise
      */
     public boolean isReady() {
-        return getHttpChannel().isReady();
+        H2StreamChannel channel = getHttpChannel();
+        if (channel == null) {
+            return false;
+        }
+        return numSentBytesQueued.get() < ON_READY_THRESHOLD;
     }
 
     /**
@@ -67,13 +111,15 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
     }
 
     /**
-     * Called when the channel writability changes.
-     * Triggers the onReadyHandler if the channel is now writable.
+     * Called by the transport layer when the underlying channel's writability 
changes.
+     * <p>
+     * This serves as an additional trigger point for notifying the {@code 
onReadyHandler}
+     * when the channel becomes writable again. The actual ready state is 
still determined
+     * by the byte counting mechanism in {@link #isReady()}.
      */
     public void onWritabilityChanged() {
-        Runnable handler = this.onReadyHandler;
-        if (handler != null && isReady()) {
-            handler.run();
+        if (isReady()) {
+            notifyOnReady();
         }
     }
 
@@ -81,6 +127,76 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
         this.streamingDecoder = streamingDecoder;
     }
 
+    /**
+     * Override to add byte counting for backpressure support.
+     */
+    @Override
+    protected CompletableFuture<Void> sendMessage(HttpOutputMessage message) 
throws Throwable {
+        if (message == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        int messageSize = message.messageSize();
+        onSendingBytes(messageSize);
+
+        CompletableFuture<Void> future = super.sendMessage(message);
+
+        future.whenComplete((v, t) -> {
+            if (t == null) {
+                onSentBytes(messageSize);
+            } else {
+                rollbackSendingBytes(messageSize);
+            }
+        });
+
+        return future;
+    }
+
+    /**
+     * Called before bytes are sent to track pending bytes.
+     */
+    protected void onSendingBytes(int numBytes) {
+        numSentBytesQueued.addAndGet(numBytes);
+    }
+
+    /**
+     * Called when sending fails to rollback the pending bytes count.
+     */
+    protected void rollbackSendingBytes(int numBytes) {
+        numSentBytesQueued.addAndGet(-numBytes);
+    }
+
+    /**
+     * Called when bytes have been successfully sent to the remote endpoint.
+     */
+    protected void onSentBytes(int numBytes) {
+        long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+        long newValue = oldValue - numBytes;
+        // Trigger onReady when transitioning from "not ready" to "ready"
+        if (oldValue >= ON_READY_THRESHOLD && newValue < ON_READY_THRESHOLD) {
+            notifyOnReady();
+        }
+    }
+
+    /**
+     * Returns the number of bytes currently queued for sending.
+     * Visible for testing.
+     */
+    protected long getNumSentBytesQueued() {
+        return numSentBytesQueued.get();
+    }
+
+    /**
+     * Notify the onReadyHandler that the stream is ready for writing.
+     */
+    protected void notifyOnReady() {
+        Runnable handler = this.onReadyHandler;
+        if (handler == null) {
+            return;
+        }
+        executor.execute(handler);
+    }
+
     @Override
     protected HttpMetadata encodeHttpMetadata(boolean endStream) {
         HttpHeaders headers = new NettyHttpHeaders<>(new 
DefaultHttp2Headers(false, 8));
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index 282398d181..d1130375d7 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -80,16 +80,55 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
 
     @Override
     public final void request(int numMessages) {
+        if (isClosed()) {
+            return;
+        }
         pendingDeliveries += numMessages;
         deliver();
     }
 
+    /**
+     * Marks the decoder for closing. The decoder will actually close when all
+     * requested messages have been delivered and no more data is available 
(stalled).
+     */
     @Override
     public final void close() {
+        if (isClosed()) {
+            return;
+        }
+        if (isStalled()) {
+            // No more data available, close immediately
+            doClose();
+            return;
+        }
+        // Mark for closing, will close when stalled
         closing = true;
         deliver();
     }
 
+    /**
+     * Actually close the decoder and notify the listener.
+     */
+    private void doClose() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            accumulate.close();
+        } catch (IOException e) {
+            // ignore
+        }
+        listener.onClose();
+    }
+
+    /**
+     * Returns true if the decoder has been closed.
+     */
+    private boolean isClosed() {
+        return closed;
+    }
+
     @Override
     public final void onStreamClosed() {
         if (closed) {
@@ -137,12 +176,12 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
                         throw new AssertionError("Invalid state: " + state);
                 }
             }
-            if (closing) {
-                if (!closed) {
-                    closed = true;
-                    accumulate.close();
-                    listener.onClose();
-                }
+            // only close when stalled (no more data available).
+            // This ensures that when disableAutoRequest() is used and more 
messages are
+            // still buffered, the stream won't close prematurely. The 
application needs
+            // to call request() to receive remaining messages.
+            if (closing && isStalled()) {
+                doClose();
             }
         } catch (IOException e) {
             throw new DecodeException(e);
@@ -151,6 +190,13 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
         }
     }
 
+    /**
+     * Returns true if there's no more data available to process.
+     */
+    private boolean isStalled() {
+        return accumulate.available() == 0;
+    }
+
     private void processHeader() throws IOException {
         byte[] offsetData = new byte[lengthFieldOffset];
         int ignore = accumulate.read(offsetData);
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserverByteCountingTest.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserverByteCountingTest.java
new file mode 100644
index 0000000000..117e454a0f
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserverByteCountingTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.h2;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for byte counting backpressure mechanism in 
Http2ServerChannelObserver.
+ */
+@Timeout(10)
+class Http2ServerChannelObserverByteCountingTest {
+
+    /**
+     * Test isReady returns true when below threshold.
+     */
+    @Test
+    void testIsReadyWhenBelowThreshold() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+
+        assertTrue(observer.isReady());
+
+        observer.onSendingBytes(1000);
+        assertTrue(observer.isReady());
+
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD - 1001);
+        assertTrue(observer.isReady());
+    }
+
+    /**
+     * Test isReady returns false when at or above threshold.
+     */
+    @Test
+    void testIsReadyWhenAtOrAboveThreshold() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD);
+        assertFalse(observer.isReady());
+
+        observer.onSendingBytes(1000);
+        assertFalse(observer.isReady());
+    }
+
+    /**
+     * Test onReady is triggered when transitioning from not-ready to ready.
+     */
+    @Test
+    void testOnReadyTriggeredOnTransition() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+        // Send bytes to exceed threshold
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+        assertFalse(observer.isReady());
+        assertEquals(0, onReadyCount.get());
+
+        // Complete sending - should trigger onReady when crossing threshold
+        observer.onSentBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+        assertTrue(observer.isReady());
+        assertEquals(1, onReadyCount.get());
+    }
+
+    /**
+     * Test onReady is NOT triggered when staying below threshold.
+     */
+    @Test
+    void testOnReadyNotTriggeredWhenStayingBelowThreshold() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+        // Send small amount
+        observer.onSendingBytes(1000);
+        observer.onSentBytes(1000);
+        assertEquals(0, onReadyCount.get());
+
+        // Send another small amount
+        observer.onSendingBytes(2000);
+        observer.onSentBytes(2000);
+        assertEquals(0, onReadyCount.get());
+    }
+
+    /**
+     * Test multiple transitions trigger onReady each time.
+     */
+    @Test
+    void testMultipleTransitions() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+        // First cycle
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+        observer.onSentBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+        assertEquals(1, onReadyCount.get());
+
+        // Second cycle
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 2000);
+        observer.onSentBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 2000);
+        assertEquals(2, onReadyCount.get());
+
+        // Third cycle
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 3000);
+        observer.onSentBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 3000);
+        assertEquals(3, onReadyCount.get());
+    }
+
+    /**
+     * Test concurrent sends only trigger onReady once for single transition.
+     */
+    @Test
+    void testConcurrentSendsOnlyTriggerOnReadyOnce() throws 
InterruptedException {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+        // Exceed threshold
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 10000);
+
+        // Simulate concurrent completions
+        int threadCount = 10;
+        int bytesPerThread = ((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 10000) / threadCount;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    startLatch.await();
+                    observer.onSentBytes(bytesPerThread);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        doneLatch.await(5, TimeUnit.SECONDS);
+        executor.shutdown();
+
+        // Only one thread should trigger onReady
+        assertEquals(1, onReadyCount.get());
+        assertTrue(observer.isReady());
+    }
+
+    /**
+     * Test initial state is ready.
+     */
+    @Test
+    void testInitialStateIsReady() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        assertTrue(observer.isReady());
+        assertEquals(0, observer.getNumSentBytesQueued());
+    }
+
+    /**
+     * Test rollback does not trigger onReady.
+     */
+    @Test
+    void testRollbackDoesNotTriggerOnReady() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+        // Exceed threshold
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+
+        // Rollback (simulating send failure)
+        observer.rollbackSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+
+        // Should not trigger onReady
+        assertTrue(observer.isReady());
+        assertEquals(0, onReadyCount.get());
+    }
+
+    /**
+     * Test exact threshold boundary.
+     */
+    @Test
+    void testExactThresholdBoundary() {
+        TestableHttp2ServerChannelObserver observer = createObserver();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+        // At exactly threshold - not ready
+        observer.onSendingBytes((int) 
Http2ServerChannelObserver.ON_READY_THRESHOLD);
+        assertFalse(observer.isReady());
+
+        // Send 1 byte to go below threshold
+        observer.onSentBytes(1);
+        assertTrue(observer.isReady());
+        assertEquals(1, onReadyCount.get());
+    }
+
+    // ==================== Helper Methods ====================
+
+    private TestableHttp2ServerChannelObserver createObserver() {
+        H2StreamChannel mockChannel = mock(H2StreamChannel.class);
+        return new TestableHttp2ServerChannelObserver(mockChannel);
+    }
+
+    /**
+     * Testable subclass that exposes protected methods for testing.
+     */
+    private static class TestableHttp2ServerChannelObserver extends 
Http2ServerChannelObserver {
+
+        public TestableHttp2ServerChannelObserver(H2StreamChannel 
h2StreamChannel) {
+            super(h2StreamChannel);
+        }
+
+        @Override
+        public void onSendingBytes(int numBytes) {
+            super.onSendingBytes(numBytes);
+        }
+
+        @Override
+        public void rollbackSendingBytes(int numBytes) {
+            super.rollbackSendingBytes(numBytes);
+        }
+
+        @Override
+        public void onSentBytes(int numBytes) {
+            super.onSentBytes(numBytes);
+        }
+
+        @Override
+        public long getNumSentBytesQueued() {
+            return super.getNumSentBytesQueued();
+        }
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 03d6807393..64ca2ab699 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -179,6 +179,7 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
     @Override
     protected void onMetadataCompletion(Http2Header metadata) {
         
responseObserver.setResponseEncoder(getContext().getHttpMessageEncoder());
+        responseObserver.setExecutor(getExecutor());
         responseObserver.request(1);
         if (metadata.isEndStream()) {
             getStreamingDecoder().close();
@@ -233,11 +234,7 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
 
     @Override
     public void onWritabilityChanged() {
-        if (getExecutor() == null) {
-            responseObserver.onWritabilityChanged();
-        } else {
-            getExecutor().execute(responseObserver::onWritabilityChanged);
-        }
+        responseObserver.onWritabilityChanged();
     }
 
     private static final class Http2StreamingDecodeListener implements 
ListeningDecoder.Listener {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index dfbf1cb007..339517e714 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
@@ -87,9 +88,15 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
     private boolean isReturnTriException = false;
 
     /**
-     * Tracks the last known ready state to detect when the state changes from 
"not ready" to "ready".
+     * Number of bytes currently queued, waiting to be sent.
+     * When this falls below ON_READY_THRESHOLD, onReady will be triggered.
      */
-    private volatile boolean lastReadyState = false;
+    private final AtomicLong numSentBytesQueued = new AtomicLong(0);
+
+    /**
+     * The threshold below which isReady() returns true (32KB).
+     */
+    protected static final long ON_READY_THRESHOLD = 32 * 1024;
 
     protected AbstractTripleClientStream(
             FrameworkModel frameworkModel,
@@ -194,20 +201,64 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
         if (!checkResult.isSuccess()) {
             return checkResult;
         }
+
+        final int messageSize = message.length;
+        onSendingBytes(messageSize);
+
         final DataQueueCommand cmd = 
DataQueueCommand.create(streamChannelFuture, message, false, compressFlag);
         return this.writeQueue.enqueueFuture(cmd, 
parent.eventLoop()).addListener(future -> {
             if (!future.isSuccess()) {
+                rollbackSendingBytes(messageSize);
                 cancelByLocal(TriRpcStatus.INTERNAL
                         .withDescription("Client write message failed")
                         .withCause(future.cause()));
                 transportException(future.cause());
             } else {
-                // After successful write, check if we need to trigger onReady
-                notifyOnReady(false);
+                onSentBytes(messageSize);
             }
         });
     }
 
+    /**
+     * Called before bytes are sent to track pending bytes.
+     *
+     * @param numBytes the number of bytes about to be sent
+     */
+    protected void onSendingBytes(int numBytes) {
+        numSentBytesQueued.addAndGet(numBytes);
+    }
+
+    /**
+     * Called when sending fails to rollback the pending bytes count.
+     *
+     * @param numBytes the number of bytes to rollback
+     */
+    protected void rollbackSendingBytes(int numBytes) {
+        numSentBytesQueued.addAndGet(-numBytes);
+    }
+
+    /**
+     * Called when bytes have been successfully sent to the remote endpoint.
+     *
+     * @param numBytes the number of bytes that were sent
+     */
+    protected void onSentBytes(int numBytes) {
+        long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+        long newValue = oldValue - numBytes;
+        // Trigger onReady when transitioning from "not ready" to "ready"
+        if (oldValue >= ON_READY_THRESHOLD && newValue < ON_READY_THRESHOLD) {
+            listener.onReady();
+        }
+    }
+
+    /**
+     * Returns the number of bytes currently queued for sending.
+     * Visible for testing.
+     */
+    protected long getNumSentBytesQueued() {
+        return numSentBytesQueued.get();
+    }
+
     @Override
     public void request(int n) {
         deframer.request(n);
@@ -256,43 +307,23 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
         if (channel == null) {
             return false;
         }
-        return channel.isWritable();
+        return numSentBytesQueued.get() < ON_READY_THRESHOLD;
     }
 
     /**
      * Called when the channel writability changes.
-     * This method should be invoked by the transport handler when 
channelWritabilityChanged is triggered.
-     * It synchronously notifies the listener (TripleClientCall) which is 
responsible for
-     * asynchronously triggering all necessary callbacks through its executor.
      */
     protected void onWritabilityChanged() {
-        notifyOnReady(false);
+        if (isReady()) {
+            listener.onReady();
+        }
     }
 
     /**
      * Called by InitOnReadyQueueCommand to trigger the initial onReady 
notification.
      */
     public void triggerInitialOnReady() {
-        notifyOnReady(true);
-    }
-
-    /**
-     * notify listener when stream becomes ready
-     *
-     * @param forceNotify if true, always trigger onReady (for initial 
notification);
-     *                    if false, only trigger when state changes from "not 
ready" to "ready"
-     */
-    private synchronized void notifyOnReady(boolean forceNotify) {
-        boolean wasReady = lastReadyState;
-        boolean isNowReady = isReady();
-        lastReadyState = isNowReady;
-
-        // Trigger onReady if:
-        // 1. forceNotify is true (initial notification, spurious is OK), or
-        // 2. state changes from "not ready" to "ready"
-        if (forceNotify || (!wasReady && isNowReady)) {
-            listener.onReady();
-        }
+        listener.onReady();
     }
 
     class ClientTransportListener extends AbstractH2TransportListener 
implements H2TransportListener {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStreamByteCountingTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStreamByteCountingTest.java
new file mode 100644
index 0000000000..d814575d14
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStreamByteCountingTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.stream;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for byte counting backpressure mechanism in 
AbstractTripleClientStream.
+ * This test class mirrors the logic used in AbstractTripleClientStream to 
verify
+ * the correctness of the byte counting mechanism.
+ */
+@Timeout(10)
+class AbstractTripleClientStreamByteCountingTest {
+
+    private static final long ON_READY_THRESHOLD = 32 * 1024; // 32KB
+
+    /**
+     * Test isReady returns true when below threshold.
+     */
+    @Test
+    void testIsReadyWhenBelowThreshold() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+
+        assertTrue(counter.isReady());
+
+        counter.onSendingBytes(1000);
+        assertTrue(counter.isReady());
+
+        counter.onSendingBytes((int) ON_READY_THRESHOLD - 1001);
+        assertTrue(counter.isReady());
+    }
+
+    /**
+     * Test isReady returns false when at or above threshold.
+     */
+    @Test
+    void testIsReadyWhenAtOrAboveThreshold() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+
+        counter.onSendingBytes((int) ON_READY_THRESHOLD);
+        assertFalse(counter.isReady());
+
+        counter.onSendingBytes(1000);
+        assertFalse(counter.isReady());
+    }
+
+    /**
+     * Test onReady is triggered when transitioning from not-ready to ready.
+     */
+    @Test
+    void testOnReadyTriggeredOnTransition() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+        // Send bytes to exceed threshold
+        counter.onSendingBytes((int) ON_READY_THRESHOLD + 1000);
+        assertFalse(counter.isReady());
+        assertEquals(0, onReadyCount.get());
+
+        // Complete sending - should trigger onReady when crossing threshold
+        counter.onSentBytes((int) ON_READY_THRESHOLD + 1000);
+        assertTrue(counter.isReady());
+        assertEquals(1, onReadyCount.get());
+    }
+
+    /**
+     * Test onReady is NOT triggered when staying below threshold.
+     */
+    @Test
+    void testOnReadyNotTriggeredWhenStayingBelowThreshold() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+        // Send small amount
+        counter.onSendingBytes(1000);
+        counter.onSentBytes(1000);
+        assertEquals(0, onReadyCount.get());
+
+        // Send another small amount
+        counter.onSendingBytes(2000);
+        counter.onSentBytes(2000);
+        assertEquals(0, onReadyCount.get());
+    }
+
+    /**
+     * Test multiple transitions trigger onReady each time.
+     */
+    @Test
+    void testMultipleTransitions() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+        // First cycle
+        counter.onSendingBytes((int) ON_READY_THRESHOLD + 1000);
+        counter.onSentBytes((int) ON_READY_THRESHOLD + 1000);
+        assertEquals(1, onReadyCount.get());
+
+        // Second cycle
+        counter.onSendingBytes((int) ON_READY_THRESHOLD + 2000);
+        counter.onSentBytes((int) ON_READY_THRESHOLD + 2000);
+        assertEquals(2, onReadyCount.get());
+
+        // Third cycle
+        counter.onSendingBytes((int) ON_READY_THRESHOLD + 3000);
+        counter.onSentBytes((int) ON_READY_THRESHOLD + 3000);
+        assertEquals(3, onReadyCount.get());
+    }
+
+    /**
+     * Test concurrent sends only trigger onReady once for single transition.
+     */
+    @Test
+    void testConcurrentSendsOnlyTriggerOnReadyOnce() throws 
InterruptedException {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+        // Exceed threshold
+        counter.onSendingBytes((int) ON_READY_THRESHOLD + 10000);
+
+        // Simulate concurrent completions
+        int threadCount = 10;
+        int bytesPerThread = ((int) ON_READY_THRESHOLD + 10000) / threadCount;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    startLatch.await();
+                    counter.onSentBytes(bytesPerThread);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        doneLatch.await(5, TimeUnit.SECONDS);
+        executor.shutdown();
+
+        // Only one thread should trigger onReady
+        assertEquals(1, onReadyCount.get());
+        assertTrue(counter.isReady());
+    }
+
+    /**
+     * Test initial state is ready.
+     */
+    @Test
+    void testInitialStateIsReady() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        assertTrue(counter.isReady());
+        assertEquals(0, counter.getNumSentBytesQueued());
+    }
+
+    /**
+     * Test rollback does not trigger onReady.
+     */
+    @Test
+    void testRollbackDoesNotTriggerOnReady() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+        // Exceed threshold
+        counter.onSendingBytes((int) ON_READY_THRESHOLD + 1000);
+
+        // Rollback (simulating send failure)
+        counter.rollbackSendingBytes((int) ON_READY_THRESHOLD + 1000);
+
+        // Should not trigger onReady
+        assertTrue(counter.isReady());
+        assertEquals(0, onReadyCount.get());
+    }
+
+    /**
+     * Test exact threshold boundary.
+     */
+    @Test
+    void testExactThresholdBoundary() {
+        ClientStreamByteCounter counter = new ClientStreamByteCounter();
+        AtomicInteger onReadyCount = new AtomicInteger(0);
+        counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+        // At exactly threshold - not ready
+        counter.onSendingBytes((int) ON_READY_THRESHOLD);
+        assertFalse(counter.isReady());
+
+        // Send 1 byte to go below threshold
+        counter.onSentBytes(1);
+        assertTrue(counter.isReady());
+        assertEquals(1, onReadyCount.get());
+    }
+
+    /**
+     * Simulates the byte counting logic from AbstractTripleClientStream for 
testing.
+     */
+    private static class ClientStreamByteCounter {
+        private final AtomicLong numSentBytesQueued = new AtomicLong(0);
+        private Runnable onReadyCallback;
+
+        public boolean isReady() {
+            return numSentBytesQueued.get() < ON_READY_THRESHOLD;
+        }
+
+        public void setOnReadyCallback(Runnable callback) {
+            this.onReadyCallback = callback;
+        }
+
+        public void onSendingBytes(int numBytes) {
+            numSentBytesQueued.addAndGet(numBytes);
+        }
+
+        public void rollbackSendingBytes(int numBytes) {
+            numSentBytesQueued.addAndGet(-numBytes);
+        }
+
+        public void onSentBytes(int numBytes) {
+            long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+            long newValue = oldValue - numBytes;
+            if (oldValue >= ON_READY_THRESHOLD && newValue < 
ON_READY_THRESHOLD) {
+                if (onReadyCallback != null) {
+                    onReadyCallback.run();
+                }
+            }
+        }
+
+        public long getNumSentBytesQueued() {
+            return numSentBytesQueued.get();
+        }
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
index 18fd78158a..a328575f42 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
@@ -40,4 +40,12 @@ public class MockHttp2OutputMessage implements 
Http2OutputMessage {
     public boolean isEndStream() {
         return endStream;
     }
+
+    @Override
+    public int messageSize() {
+        if (outputStream instanceof ByteArrayOutputStream) {
+            return ((ByteArrayOutputStream) outputStream).size();
+        }
+        return 0;
+    }
 }


Reply via email to