This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 69c772b7c9 refactor: enhance backpressure handling with byte tracking
(#16028)
69c772b7c9 is described below
commit 69c772b7c929fa3c6a71e2bda1afee5554c477c6
Author: earthchen <[email protected]>
AuthorDate: Thu Jan 22 16:33:51 2026 +0800
refactor: enhance backpressure handling with byte tracking (#16028)
* refactor: enhance backpressure handling with byte tracking in HTTP/2
streams
* Update
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
Co-authored-by: Copilot <[email protected]>
* fix
* refactor: expose methods for byte tracking in AbstractTripleClientStream
and Http2ServerChannelObserver
* refactor: enhance error handling and byte tracking in HTTP/2 stream
observers
* refactor: improve closing logic and state management in
LengthFieldStreamingDecoder
---------
Co-authored-by: Copilot <[email protected]>
---
.../http12/AbstractServerHttpChannelObserver.java | 20 +-
.../dubbo/remoting/http12/HttpOutputMessage.java | 12 +
.../remoting/http12/h1/Http1OutputMessage.java | 8 +
.../http12/h2/Http2OutputMessageFrame.java | 8 +
.../http12/h2/Http2ServerChannelObserver.java | 134 ++++++++++-
.../message/LengthFieldStreamingDecoder.java | 58 ++++-
...Http2ServerChannelObserverByteCountingTest.java | 260 ++++++++++++++++++++
.../http2/GenericHttp2ServerTransportListener.java | 7 +-
.../tri/stream/AbstractTripleClientStream.java | 89 ++++---
...AbstractTripleClientStreamByteCountingTest.java | 265 +++++++++++++++++++++
.../protocol/tri/test/MockHttp2OutputMessage.java | 8 +
11 files changed, 815 insertions(+), 54 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index 3e4194e542..b4f0aaca83 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.rpc.RpcContext;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -146,7 +147,11 @@ public abstract class AbstractServerHttpChannelObserver<H
extends HttpChannel> i
if (!headerSent) {
sendMetadata(buildMetadata(statusCode, data, null,
HttpOutputMessage.EMPTY_MESSAGE));
}
- sendMessage(buildMessage(statusCode, data));
+ sendMessage(buildMessage(statusCode, data)).whenComplete((unused,
throwable) -> {
+ if (throwable != null) {
+ LOGGER.error(INTERNAL_ERROR, "", "", "Failed to send message
on channel " + httpChannel, throwable);
+ }
+ });
}
protected final int resolveStatusCode(Object data) {
@@ -241,12 +246,13 @@ public abstract class AbstractServerHttpChannelObserver<H
extends HttpChannel> i
return getHttpChannel().newOutputMessage();
}
- protected final void sendMessage(HttpOutputMessage message) throws
Throwable {
+ protected CompletableFuture<Void> sendMessage(HttpOutputMessage message)
throws Throwable {
if (message == null) {
- return;
+ return CompletableFuture.completedFuture(null);
}
- getHttpChannel().writeMessage(message);
+ CompletableFuture<Void> future =
getHttpChannel().writeMessage(message);
postOutputMessage(message);
+ return future;
}
protected void preOutputMessage(HttpOutputMessage message) throws
Throwable {}
@@ -274,7 +280,11 @@ public abstract class AbstractServerHttpChannelObserver<H
extends HttpChannel> i
if (!headerSent) {
sendMetadata(buildMetadata(statusCode, data, throwable,
HttpOutputMessage.EMPTY_MESSAGE));
}
- sendMessage(buildMessage(statusCode, data));
+ sendMessage(buildMessage(statusCode, data)).whenComplete((unused, t)
-> {
+ if (t != null) {
+ LOGGER.error(INTERNAL_ERROR, "", "", "Failed to send error
message on channel " + httpChannel, t);
+ }
+ });
}
protected final int resolveErrorStatusCode(Throwable throwable) {
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
index ac64685bd0..a3a589d8e1 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpOutputMessage.java
@@ -30,10 +30,22 @@ public interface HttpOutputMessage extends AutoCloseable {
public OutputStream getBody() {
return INPUT_STREAM;
}
+
+ @Override
+ public int messageSize() {
+ return 0;
+ }
};
OutputStream getBody();
+ /**
+ * Returns the size of the message body in bytes.
+ *
+ * @return the size of the message body, or 0 if unknown
+ */
+ int messageSize();
+
@Override
default void close() throws IOException {
getBody().close();
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
index e325743449..538556f29c 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1OutputMessage.java
@@ -43,4 +43,12 @@ public final class Http1OutputMessage implements
HttpOutputMessage {
}
outputStream.close();
}
+
+ @Override
+ public int messageSize() {
+ if (outputStream instanceof ByteBufOutputStream) {
+ return ((ByteBufOutputStream)
outputStream).buffer().readableBytes();
+ }
+ return 0;
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
index 6a005893fc..8fe0aa4ccc 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2OutputMessageFrame.java
@@ -49,4 +49,12 @@ public final class Http2OutputMessageFrame implements
Http2OutputMessage {
public boolean isEndStream() {
return endStream;
}
+
+ @Override
+ public int messageSize() {
+ if (body instanceof ByteBufOutputStream) {
+ return ((ByteBufOutputStream) body).buffer().readableBytes();
+ }
+ return 0;
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index 0fbf9bab0c..d413e25d13 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -24,39 +24,83 @@ import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.remoting.http12.netty4.NettyHttpHeaders;
import org.apache.dubbo.rpc.CancellationContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
import io.netty.handler.codec.http2.DefaultHttp2Headers;
/**
* HTTP/2 server-side stream observer with flow control and backpressure
support.
- * Implements {@link ServerCallStreamObserver} following gRPC's pattern for
backpressure.
+ * <p>
+ * Backpressure is implemented using a byte-counting strategy. Outbound
messages are
+ * tracked in {@code numSentBytesQueued}, which represents the approximate
number of
+ * bytes that have been queued but not yet acknowledged as sent.
+ * <p>
+ * The {@code ON_READY_THRESHOLD} (32KB) defines when this observer is
considered "ready":
+ * <ul>
+ * <li>{@link #isReady()} returns {@code true} when {@code
numSentBytesQueued < ON_READY_THRESHOLD}</li>
+ * <li>When the queued byte count drops from at or above the threshold to
below it,
+ * the registered {@code onReadyHandler} is invoked to signal that
more data can be sent</li>
+ * </ul>
*/
public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserver<H2StreamChannel>
implements FlowControlStreamObserver<Object>,
Http2CancelableStreamObserver<Object>,
ServerCallStreamObserver<Object> {
+ /**
+ * Number of bytes currently queued, waiting to be sent.
+ * When this falls below ON_READY_THRESHOLD, onReady will be triggered.
+ */
+ private final AtomicLong numSentBytesQueued = new AtomicLong(0);
+
+ /**
+ * The threshold below which isReady() returns true (32KB).
+ */
+ protected static final long ON_READY_THRESHOLD = 32 * 1024;
+
private CancellationContext cancellationContext;
private StreamingDecoder streamingDecoder;
private boolean autoRequestN = true;
- private Runnable onReadyHandler;
+ private volatile Runnable onReadyHandler;
+
+ private volatile Executor executor = Runnable::run;
public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
super(h2StreamChannel);
}
+ /**
+ * Sets the executor for async dispatch of callbacks.
+ */
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
/**
* Returns whether the stream is ready for writing.
- * If false, the caller should avoid calling onNext to prevent blocking or
excessive buffering.
+ * <p>
+ * Ready state is determined by byte counting: returns {@code true} when
the number
+ * of queued bytes is below the threshold (32KB). If {@code false}, the
caller should
+ * avoid calling {@code onNext} to prevent excessive buffering.
+ *
+ * @return {@code true} if the stream is ready for more data, {@code
false} otherwise
*/
public boolean isReady() {
- return getHttpChannel().isReady();
+ H2StreamChannel channel = getHttpChannel();
+ if (channel == null) {
+ return false;
+ }
+ return numSentBytesQueued.get() < ON_READY_THRESHOLD;
}
/**
@@ -67,13 +111,15 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
}
/**
- * Called when the channel writability changes.
- * Triggers the onReadyHandler if the channel is now writable.
+ * Called by the transport layer when the underlying channel's writability
changes.
+ * <p>
+ * This serves as an additional trigger point for notifying the {@code
onReadyHandler}
+ * when the channel becomes writable again. The actual ready state is
still determined
+ * by the byte counting mechanism in {@link #isReady()}.
*/
public void onWritabilityChanged() {
- Runnable handler = this.onReadyHandler;
- if (handler != null && isReady()) {
- handler.run();
+ if (isReady()) {
+ notifyOnReady();
}
}
@@ -81,6 +127,76 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
this.streamingDecoder = streamingDecoder;
}
+ /**
+ * Override to add byte counting for backpressure support.
+ */
+ @Override
+ protected CompletableFuture<Void> sendMessage(HttpOutputMessage message)
throws Throwable {
+ if (message == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ int messageSize = message.messageSize();
+ onSendingBytes(messageSize);
+
+ CompletableFuture<Void> future = super.sendMessage(message);
+
+ future.whenComplete((v, t) -> {
+ if (t == null) {
+ onSentBytes(messageSize);
+ } else {
+ rollbackSendingBytes(messageSize);
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Called before bytes are sent to track pending bytes.
+ */
+ protected void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ */
+ protected void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ */
+ protected void onSentBytes(int numBytes) {
+ long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+ long newValue = oldValue - numBytes;
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (oldValue >= ON_READY_THRESHOLD && newValue < ON_READY_THRESHOLD) {
+ notifyOnReady();
+ }
+ }
+
+ /**
+ * Returns the number of bytes currently queued for sending.
+ * Visible for testing.
+ */
+ protected long getNumSentBytesQueued() {
+ return numSentBytesQueued.get();
+ }
+
+ /**
+ * Notify the onReadyHandler that the stream is ready for writing.
+ */
+ protected void notifyOnReady() {
+ Runnable handler = this.onReadyHandler;
+ if (handler == null) {
+ return;
+ }
+ executor.execute(handler);
+ }
+
@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
HttpHeaders headers = new NettyHttpHeaders<>(new
DefaultHttp2Headers(false, 8));
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index 282398d181..d1130375d7 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -80,16 +80,55 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
@Override
public final void request(int numMessages) {
+ if (isClosed()) {
+ return;
+ }
pendingDeliveries += numMessages;
deliver();
}
+ /**
+ * Marks the decoder for closing. The decoder will actually close when all
+ * requested messages have been delivered and no more data is available
(stalled).
+ */
@Override
public final void close() {
+ if (isClosed()) {
+ return;
+ }
+ if (isStalled()) {
+ // No more data available, close immediately
+ doClose();
+ return;
+ }
+ // Mark for closing, will close when stalled
closing = true;
deliver();
}
+ /**
+ * Actually close the decoder and notify the listener.
+ */
+ private void doClose() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ accumulate.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ listener.onClose();
+ }
+
+ /**
+ * Returns true if the decoder has been closed.
+ */
+ private boolean isClosed() {
+ return closed;
+ }
+
@Override
public final void onStreamClosed() {
if (closed) {
@@ -137,12 +176,12 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
throw new AssertionError("Invalid state: " + state);
}
}
- if (closing) {
- if (!closed) {
- closed = true;
- accumulate.close();
- listener.onClose();
- }
+ // only close when stalled (no more data available).
+ // This ensures that when disableAutoRequest() is used and more
messages are
+ // still buffered, the stream won't close prematurely. The
application needs
+ // to call request() to receive remaining messages.
+ if (closing && isStalled()) {
+ doClose();
}
} catch (IOException e) {
throw new DecodeException(e);
@@ -151,6 +190,13 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
}
}
+ /**
+ * Returns true if there's no more data available to process.
+ */
+ private boolean isStalled() {
+ return accumulate.available() == 0;
+ }
+
private void processHeader() throws IOException {
byte[] offsetData = new byte[lengthFieldOffset];
int ignore = accumulate.read(offsetData);
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserverByteCountingTest.java
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserverByteCountingTest.java
new file mode 100644
index 0000000000..117e454a0f
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserverByteCountingTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.h2;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for byte counting backpressure mechanism in
Http2ServerChannelObserver.
+ */
+@Timeout(10)
+class Http2ServerChannelObserverByteCountingTest {
+
+ /**
+ * Test isReady returns true when below threshold.
+ */
+ @Test
+ void testIsReadyWhenBelowThreshold() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+
+ assertTrue(observer.isReady());
+
+ observer.onSendingBytes(1000);
+ assertTrue(observer.isReady());
+
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD - 1001);
+ assertTrue(observer.isReady());
+ }
+
+ /**
+ * Test isReady returns false when at or above threshold.
+ */
+ @Test
+ void testIsReadyWhenAtOrAboveThreshold() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD);
+ assertFalse(observer.isReady());
+
+ observer.onSendingBytes(1000);
+ assertFalse(observer.isReady());
+ }
+
+ /**
+ * Test onReady is triggered when transitioning from not-ready to ready.
+ */
+ @Test
+ void testOnReadyTriggeredOnTransition() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+ // Send bytes to exceed threshold
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+ assertFalse(observer.isReady());
+ assertEquals(0, onReadyCount.get());
+
+ // Complete sending - should trigger onReady when crossing threshold
+ observer.onSentBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+ assertTrue(observer.isReady());
+ assertEquals(1, onReadyCount.get());
+ }
+
+ /**
+ * Test onReady is NOT triggered when staying below threshold.
+ */
+ @Test
+ void testOnReadyNotTriggeredWhenStayingBelowThreshold() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+ // Send small amount
+ observer.onSendingBytes(1000);
+ observer.onSentBytes(1000);
+ assertEquals(0, onReadyCount.get());
+
+ // Send another small amount
+ observer.onSendingBytes(2000);
+ observer.onSentBytes(2000);
+ assertEquals(0, onReadyCount.get());
+ }
+
+ /**
+ * Test multiple transitions trigger onReady each time.
+ */
+ @Test
+ void testMultipleTransitions() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+ // First cycle
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+ observer.onSentBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+ assertEquals(1, onReadyCount.get());
+
+ // Second cycle
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 2000);
+ observer.onSentBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 2000);
+ assertEquals(2, onReadyCount.get());
+
+ // Third cycle
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 3000);
+ observer.onSentBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 3000);
+ assertEquals(3, onReadyCount.get());
+ }
+
+ /**
+ * Test concurrent sends only trigger onReady once for single transition.
+ */
+ @Test
+ void testConcurrentSendsOnlyTriggerOnReadyOnce() throws
InterruptedException {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+ // Exceed threshold
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 10000);
+
+ // Simulate concurrent completions
+ int threadCount = 10;
+ int bytesPerThread = ((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 10000) / threadCount;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ observer.onSentBytes(bytesPerThread);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ doneLatch.await(5, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ // Only one thread should trigger onReady
+ assertEquals(1, onReadyCount.get());
+ assertTrue(observer.isReady());
+ }
+
+ /**
+ * Test initial state is ready.
+ */
+ @Test
+ void testInitialStateIsReady() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ assertTrue(observer.isReady());
+ assertEquals(0, observer.getNumSentBytesQueued());
+ }
+
+ /**
+ * Test rollback does not trigger onReady.
+ */
+ @Test
+ void testRollbackDoesNotTriggerOnReady() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+ // Exceed threshold
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+
+ // Rollback (simulating send failure)
+ observer.rollbackSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD + 1000);
+
+ // Should not trigger onReady
+ assertTrue(observer.isReady());
+ assertEquals(0, onReadyCount.get());
+ }
+
+ /**
+ * Test exact threshold boundary.
+ */
+ @Test
+ void testExactThresholdBoundary() {
+ TestableHttp2ServerChannelObserver observer = createObserver();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ observer.setOnReadyHandler(onReadyCount::incrementAndGet);
+
+ // At exactly threshold - not ready
+ observer.onSendingBytes((int)
Http2ServerChannelObserver.ON_READY_THRESHOLD);
+ assertFalse(observer.isReady());
+
+ // Send 1 byte to go below threshold
+ observer.onSentBytes(1);
+ assertTrue(observer.isReady());
+ assertEquals(1, onReadyCount.get());
+ }
+
+ // ==================== Helper Methods ====================
+
+ private TestableHttp2ServerChannelObserver createObserver() {
+ H2StreamChannel mockChannel = mock(H2StreamChannel.class);
+ return new TestableHttp2ServerChannelObserver(mockChannel);
+ }
+
+ /**
+ * Testable subclass that exposes protected methods for testing.
+ */
+ private static class TestableHttp2ServerChannelObserver extends
Http2ServerChannelObserver {
+
+ public TestableHttp2ServerChannelObserver(H2StreamChannel
h2StreamChannel) {
+ super(h2StreamChannel);
+ }
+
+ @Override
+ public void onSendingBytes(int numBytes) {
+ super.onSendingBytes(numBytes);
+ }
+
+ @Override
+ public void rollbackSendingBytes(int numBytes) {
+ super.rollbackSendingBytes(numBytes);
+ }
+
+ @Override
+ public void onSentBytes(int numBytes) {
+ super.onSentBytes(numBytes);
+ }
+
+ @Override
+ public long getNumSentBytesQueued() {
+ return super.getNumSentBytesQueued();
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 03d6807393..64ca2ab699 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -179,6 +179,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
@Override
protected void onMetadataCompletion(Http2Header metadata) {
responseObserver.setResponseEncoder(getContext().getHttpMessageEncoder());
+ responseObserver.setExecutor(getExecutor());
responseObserver.request(1);
if (metadata.isEndStream()) {
getStreamingDecoder().close();
@@ -233,11 +234,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
@Override
public void onWritabilityChanged() {
- if (getExecutor() == null) {
- responseObserver.onWritabilityChanged();
- } else {
- getExecutor().execute(responseObserver::onWritabilityChanged);
- }
+ responseObserver.onWritabilityChanged();
}
private static final class Http2StreamingDecodeListener implements
ListeningDecoder.Listener {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index dfbf1cb007..339517e714 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
@@ -87,9 +88,15 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
private boolean isReturnTriException = false;
/**
- * Tracks the last known ready state to detect when the state changes from
"not ready" to "ready".
+ * Number of bytes currently queued, waiting to be sent.
+ * When this falls below ON_READY_THRESHOLD, onReady will be triggered.
*/
- private volatile boolean lastReadyState = false;
+ private final AtomicLong numSentBytesQueued = new AtomicLong(0);
+
+ /**
+ * The threshold below which isReady() returns true (32KB).
+ */
+ protected static final long ON_READY_THRESHOLD = 32 * 1024;
protected AbstractTripleClientStream(
FrameworkModel frameworkModel,
@@ -194,20 +201,64 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
if (!checkResult.isSuccess()) {
return checkResult;
}
+
+ final int messageSize = message.length;
+ onSendingBytes(messageSize);
+
final DataQueueCommand cmd =
DataQueueCommand.create(streamChannelFuture, message, false, compressFlag);
return this.writeQueue.enqueueFuture(cmd,
parent.eventLoop()).addListener(future -> {
if (!future.isSuccess()) {
+ rollbackSendingBytes(messageSize);
cancelByLocal(TriRpcStatus.INTERNAL
.withDescription("Client write message failed")
.withCause(future.cause()));
transportException(future.cause());
} else {
- // After successful write, check if we need to trigger onReady
- notifyOnReady(false);
+ onSentBytes(messageSize);
}
});
}
+ /**
+ * Called before bytes are sent to track pending bytes.
+ *
+ * @param numBytes the number of bytes about to be sent
+ */
+ protected void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ *
+ * @param numBytes the number of bytes to rollback
+ */
+ protected void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ *
+ * @param numBytes the number of bytes that were sent
+ */
+ protected void onSentBytes(int numBytes) {
+ long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+ long newValue = oldValue - numBytes;
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (oldValue >= ON_READY_THRESHOLD && newValue < ON_READY_THRESHOLD) {
+ listener.onReady();
+ }
+ }
+
+ /**
+ * Returns the number of bytes currently queued for sending.
+ * Visible for testing.
+ */
+ protected long getNumSentBytesQueued() {
+ return numSentBytesQueued.get();
+ }
+
@Override
public void request(int n) {
deframer.request(n);
@@ -256,43 +307,23 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
if (channel == null) {
return false;
}
- return channel.isWritable();
+ return numSentBytesQueued.get() < ON_READY_THRESHOLD;
}
/**
* Called when the channel writability changes.
- * This method should be invoked by the transport handler when
channelWritabilityChanged is triggered.
- * It synchronously notifies the listener (TripleClientCall) which is
responsible for
- * asynchronously triggering all necessary callbacks through its executor.
*/
protected void onWritabilityChanged() {
- notifyOnReady(false);
+ if (isReady()) {
+ listener.onReady();
+ }
}
/**
* Called by InitOnReadyQueueCommand to trigger the initial onReady
notification.
*/
public void triggerInitialOnReady() {
- notifyOnReady(true);
- }
-
- /**
- * notify listener when stream becomes ready
- *
- * @param forceNotify if true, always trigger onReady (for initial
notification);
- * if false, only trigger when state changes from "not
ready" to "ready"
- */
- private synchronized void notifyOnReady(boolean forceNotify) {
- boolean wasReady = lastReadyState;
- boolean isNowReady = isReady();
- lastReadyState = isNowReady;
-
- // Trigger onReady if:
- // 1. forceNotify is true (initial notification, spurious is OK), or
- // 2. state changes from "not ready" to "ready"
- if (forceNotify || (!wasReady && isNowReady)) {
- listener.onReady();
- }
+ listener.onReady();
}
class ClientTransportListener extends AbstractH2TransportListener
implements H2TransportListener {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStreamByteCountingTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStreamByteCountingTest.java
new file mode 100644
index 0000000000..d814575d14
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStreamByteCountingTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.stream;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for byte counting backpressure mechanism in
AbstractTripleClientStream.
+ * This test class mirrors the logic used in AbstractTripleClientStream to
verify
+ * the correctness of the byte counting mechanism.
+ */
+@Timeout(10)
+class AbstractTripleClientStreamByteCountingTest {
+
+ private static final long ON_READY_THRESHOLD = 32 * 1024; // 32KB
+
+ /**
+ * Test isReady returns true when below threshold.
+ */
+ @Test
+ void testIsReadyWhenBelowThreshold() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+
+ assertTrue(counter.isReady());
+
+ counter.onSendingBytes(1000);
+ assertTrue(counter.isReady());
+
+ counter.onSendingBytes((int) ON_READY_THRESHOLD - 1001);
+ assertTrue(counter.isReady());
+ }
+
+ /**
+ * Test isReady returns false when at or above threshold.
+ */
+ @Test
+ void testIsReadyWhenAtOrAboveThreshold() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+
+ counter.onSendingBytes((int) ON_READY_THRESHOLD);
+ assertFalse(counter.isReady());
+
+ counter.onSendingBytes(1000);
+ assertFalse(counter.isReady());
+ }
+
+ /**
+ * Test onReady is triggered when transitioning from not-ready to ready.
+ */
+ @Test
+ void testOnReadyTriggeredOnTransition() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+ // Send bytes to exceed threshold
+ counter.onSendingBytes((int) ON_READY_THRESHOLD + 1000);
+ assertFalse(counter.isReady());
+ assertEquals(0, onReadyCount.get());
+
+ // Complete sending - should trigger onReady when crossing threshold
+ counter.onSentBytes((int) ON_READY_THRESHOLD + 1000);
+ assertTrue(counter.isReady());
+ assertEquals(1, onReadyCount.get());
+ }
+
+ /**
+ * Test onReady is NOT triggered when staying below threshold.
+ */
+ @Test
+ void testOnReadyNotTriggeredWhenStayingBelowThreshold() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+ // Send small amount
+ counter.onSendingBytes(1000);
+ counter.onSentBytes(1000);
+ assertEquals(0, onReadyCount.get());
+
+ // Send another small amount
+ counter.onSendingBytes(2000);
+ counter.onSentBytes(2000);
+ assertEquals(0, onReadyCount.get());
+ }
+
+ /**
+ * Test multiple transitions trigger onReady each time.
+ */
+ @Test
+ void testMultipleTransitions() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+ // First cycle
+ counter.onSendingBytes((int) ON_READY_THRESHOLD + 1000);
+ counter.onSentBytes((int) ON_READY_THRESHOLD + 1000);
+ assertEquals(1, onReadyCount.get());
+
+ // Second cycle
+ counter.onSendingBytes((int) ON_READY_THRESHOLD + 2000);
+ counter.onSentBytes((int) ON_READY_THRESHOLD + 2000);
+ assertEquals(2, onReadyCount.get());
+
+ // Third cycle
+ counter.onSendingBytes((int) ON_READY_THRESHOLD + 3000);
+ counter.onSentBytes((int) ON_READY_THRESHOLD + 3000);
+ assertEquals(3, onReadyCount.get());
+ }
+
+ /**
+ * Test concurrent sends only trigger onReady once for single transition.
+ */
+ @Test
+ void testConcurrentSendsOnlyTriggerOnReadyOnce() throws
InterruptedException {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+ // Exceed threshold
+ counter.onSendingBytes((int) ON_READY_THRESHOLD + 10000);
+
+ // Simulate concurrent completions
+ int threadCount = 10;
+ int bytesPerThread = ((int) ON_READY_THRESHOLD + 10000) / threadCount;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ counter.onSentBytes(bytesPerThread);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ doneLatch.await(5, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ // Only one thread should trigger onReady
+ assertEquals(1, onReadyCount.get());
+ assertTrue(counter.isReady());
+ }
+
+ /**
+ * Test initial state is ready.
+ */
+ @Test
+ void testInitialStateIsReady() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ assertTrue(counter.isReady());
+ assertEquals(0, counter.getNumSentBytesQueued());
+ }
+
+ /**
+ * Test rollback does not trigger onReady.
+ */
+ @Test
+ void testRollbackDoesNotTriggerOnReady() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+ // Exceed threshold
+ counter.onSendingBytes((int) ON_READY_THRESHOLD + 1000);
+
+ // Rollback (simulating send failure)
+ counter.rollbackSendingBytes((int) ON_READY_THRESHOLD + 1000);
+
+ // Should not trigger onReady
+ assertTrue(counter.isReady());
+ assertEquals(0, onReadyCount.get());
+ }
+
+ /**
+ * Test exact threshold boundary.
+ */
+ @Test
+ void testExactThresholdBoundary() {
+ ClientStreamByteCounter counter = new ClientStreamByteCounter();
+ AtomicInteger onReadyCount = new AtomicInteger(0);
+ counter.setOnReadyCallback(onReadyCount::incrementAndGet);
+
+ // At exactly threshold - not ready
+ counter.onSendingBytes((int) ON_READY_THRESHOLD);
+ assertFalse(counter.isReady());
+
+ // Send 1 byte to go below threshold
+ counter.onSentBytes(1);
+ assertTrue(counter.isReady());
+ assertEquals(1, onReadyCount.get());
+ }
+
+ /**
+ * Simulates the byte counting logic from AbstractTripleClientStream for
testing.
+ */
+ private static class ClientStreamByteCounter {
+ private final AtomicLong numSentBytesQueued = new AtomicLong(0);
+ private Runnable onReadyCallback;
+
+ public boolean isReady() {
+ return numSentBytesQueued.get() < ON_READY_THRESHOLD;
+ }
+
+ public void setOnReadyCallback(Runnable callback) {
+ this.onReadyCallback = callback;
+ }
+
+ public void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ public void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ public void onSentBytes(int numBytes) {
+ long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+ long newValue = oldValue - numBytes;
+ if (oldValue >= ON_READY_THRESHOLD && newValue <
ON_READY_THRESHOLD) {
+ if (onReadyCallback != null) {
+ onReadyCallback.run();
+ }
+ }
+ }
+
+ public long getNumSentBytesQueued() {
+ return numSentBytesQueued.get();
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
index 18fd78158a..a328575f42 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/MockHttp2OutputMessage.java
@@ -40,4 +40,12 @@ public class MockHttp2OutputMessage implements
Http2OutputMessage {
public boolean isEndStream() {
return endStream;
}
+
+ @Override
+ public int messageSize() {
+ if (outputStream instanceof ByteArrayOutputStream) {
+ return ((ByteArrayOutputStream) outputStream).size();
+ }
+ return 0;
+ }
}