This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 48f3a62134 Tri backpressure (#15967)
48f3a62134 is described below

commit 48f3a62134bad498dddff7c213e79a46711f9532
Author: earthchen <[email protected]>
AuthorDate: Thu Jan 8 17:06:48 2026 +0800

    Tri backpressure (#15967)
    
    * support disableAutoInboundFlowControl api
    
    * support back press
    
    * fix
    
    * fix
    
    * Update 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * fix
    
    * fix
    
    * fix test
    
    * add isReady and setOnReadyHandler api
    
    * fix
    
    * fix
    
    * fix Backpressure it
    
    * fix
    
    * fix
    
    * ClientCallStreamObserver & ServerCallStreamObserver
    
    * add license
    
    * mvn spotless:apply
    
    * fix dubbo plugin
    
    * fix dubbo plugin spotless
    
    * fix
    
    * fix ut
    
    * Add gRPC-compatible APIs and fix the integration tests
    
    * fix
    
    * fix
    
    * remove default
    
    * remove default
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: Rain Yu <[email protected]>
    Co-authored-by: Wang Chengming <[email protected]>
---
 .../dubbo/common/stream}/CallStreamObserver.java   | 55 +++++++++---
 .../common/stream/ClientCallStreamObserver.java    | 99 ++++++++++++++++++++++
 .../common/stream/ClientResponseObserver.java      | 91 ++++++++++++++++++++
 .../common/stream/ServerCallStreamObserver.java    | 85 +++++++++++++++++++
 .../mutiny/AbstractTripleMutinyPublisher.java      |  4 +-
 .../mutiny/AbstractTripleMutinySubscriber.java     |  2 +-
 .../dubbo/mutiny/ClientTripleMutinyPublisher.java  |  2 +-
 .../dubbo/mutiny/ServerTripleMutinyPublisher.java  |  2 +-
 .../dubbo/mutiny/ServerTripleMutinySubscriber.java |  2 +-
 .../dubbo/mutiny/calls/MutinyClientCalls.java      |  2 +-
 .../dubbo/mutiny/calls/MutinyServerCalls.java      |  2 +-
 .../mutiny/handler/ManyToManyMethodHandler.java    |  2 +-
 .../mutiny/handler/ManyToOneMethodHandler.java     |  2 +-
 .../apache/dubbo/mutiny/MutinyClientCallsTest.java |  2 +-
 .../apache/dubbo/mutiny/MutinyServerCallsTest.java |  2 +-
 .../dubbo/mutiny/TripleMutinyPublisherTest.java    |  2 +-
 .../dubbo/mutiny/TripleMutinySubscriberTest.java   |  2 +-
 .../reactive/AbstractTripleReactorPublisher.java   | 21 ++++-
 .../reactive/AbstractTripleReactorSubscriber.java  |  4 +-
 .../reactive/ClientTripleReactorPublisher.java     | 23 +++--
 .../reactive/ServerTripleReactorPublisher.java     |  2 +-
 .../reactive/ServerTripleReactorSubscriber.java    |  2 +-
 .../dubbo/reactive/calls/ReactorClientCalls.java   |  2 +-
 .../dubbo/reactive/calls/ReactorServerCalls.java   |  2 +-
 .../reactive/handler/ManyToManyMethodHandler.java  |  2 +-
 .../reactive/handler/ManyToOneMethodHandler.java   |  2 +-
 .../dubbo/reactive/CreateObserverAdapter.java      |  8 +-
 .../http12/h2/Http2ServerChannelObserver.java      | 14 ++-
 .../rpc/protocol/tri/ClientStreamObserver.java     |  9 +-
 .../rpc/protocol/tri/ServerStreamObserver.java     |  9 +-
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      | 67 ++++++++++++---
 .../dubbo/rpc/protocol/tri/call/ClientCall.java    |  6 +-
 .../rpc/protocol/tri/call/TripleClientCall.java    | 17 ++--
 .../tri/command/InitOnReadyQueueCommand.java       | 67 +++++++++++++++
 .../tri/h12/http2/Http2ServerStreamObserver.java   |  3 +-
 .../tri/h12/http2/Http2TripleClientStream.java     |  2 +-
 .../protocol/tri/h3/Http3TripleClientStream.java   |  2 +-
 .../tri/observer/ClientCallToObserverAdapter.java  |  4 +-
 .../tri/stream/AbstractTripleClientStream.java     | 17 +++-
 .../dubbo/rpc/protocol/tri/TripleInvokerTest.java  |  5 +-
 .../rpc/protocol/tri/call/BackpressureTest.java    |  5 +-
 .../tri/stream/TripleClientStreamTest.java         |  8 +-
 42 files changed, 574 insertions(+), 87 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
similarity index 66%
rename from 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
rename to 
dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
index 46da513406..a08dd4c4b2 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
@@ -14,24 +14,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.rpc.protocol.tri.observer;
-
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+package org.apache.dubbo.common.stream;
 
 /**
  * An extension of {@link StreamObserver} that provides additional 
functionality for flow control
- * and backpressure. This interface mirrors gRPC's {@code CallStreamObserver} 
for compatibility.
+ * and backpressure. This interface mirrors gRPC's {@code 
io.grpc.stub.CallStreamObserver} for compatibility.
+ *
+ * <p>This is the base interface for both client-side ({@link 
ClientCallStreamObserver}) and
+ * server-side ({@link ServerCallStreamObserver}) flow control observers. It 
provides two types
+ * of flow control:
+ *
+ * <h3>Send-Side Backpressure (Outbound Flow Control)</h3>
+ * <p>Controls the rate at which data is sent to avoid overwhelming the 
receiver:
+ * <ul>
+ *   <li>{@link #isReady()} - Check if the stream can accept more data without 
blocking</li>
+ *   <li>{@link #setOnReadyHandler(Runnable)} - Register a callback for when 
the stream becomes writable</li>
+ * </ul>
  *
- * <p>Key features:
+ * <h3>Receive-Side Backpressure (Inbound Flow Control)</h3>
+ * <p>Controls the rate at which data is received to avoid being overwhelmed:
  * <ul>
- *   <li>{@link #isReady()} - Check if the stream is ready to accept more 
messages</li>
- *   <li>{@link #setOnReadyHandler(Runnable)} - Set a callback for when the 
stream becomes ready</li>
- *   <li>{@link #request(int)} - Request more messages from the peer (for 
inbound flow control)</li>
- *   <li>{@link #disableAutoFlowControl()} - Switch to manual flow control 
mode</li>
+ *   <li>{@link #disableAutoFlowControl()} - Switch from automatic to manual 
message requesting</li>
+ *   <li>{@link #request(int)} - Explicitly request a specific number of 
messages from the sender</li>
  * </ul>
  *
+ * <h3>Typical Usage Pattern</h3>
+ * <pre>{@code
+ * // Send-side backpressure example
+ * callStreamObserver.setOnReadyHandler(() -> {
+ *     while (callStreamObserver.isReady() && hasMoreData()) {
+ *         callStreamObserver.onNext(getNextData());
+ *     }
+ *     if (!hasMoreData()) {
+ *         callStreamObserver.onCompleted();
+ *     }
+ * });
+ *
+ * // Receive-side backpressure example (in beforeStart or similar)
+ * callStreamObserver.disableAutoFlowControl();
+ * callStreamObserver.request(10); // Request initial batch
+ *
+ * // Then in onNext()
+ * public void onNext(T value) {
+ *     process(value);
+ *     callStreamObserver.request(1); // Request next message after processing
+ * }
+ * }</pre>
+ *
  * @param <T> the type of value passed to the stream
+ * @see ClientCallStreamObserver
+ * @see ServerCallStreamObserver
+ * @see ClientResponseObserver
  */
 public interface CallStreamObserver<T> extends StreamObserver<T> {
 
@@ -84,8 +117,6 @@ public interface CallStreamObserver<T> extends 
StreamObserver<T> {
      * <p>
      * For stream set compression needs to determine whether the metadata has 
been sent, and carry
      * on corresponding processing
-     *
-     * @param compression {@link Compressor}
      */
     void setCompression(String compression);
 
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientCallStreamObserver.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientCallStreamObserver.java
new file mode 100644
index 0000000000..b19dff44a1
--- /dev/null
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientCallStreamObserver.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.stream;
+
+/**
+ * A client-side extension of {@link CallStreamObserver} that provides 
additional functionality
+ * for controlling the outbound request stream. This interface mirrors gRPC's
+ * {@code io.grpc.stub.ClientCallStreamObserver} for compatibility.
+ *
+ * <p>This interface is typically obtained through {@link 
ClientResponseObserver#beforeStart(ClientCallStreamObserver)}
+ * and provides methods for:
+ * <ul>
+ *   <li>Controlling send-side backpressure via {@link #isReady()} and {@link 
#setOnReadyHandler(Runnable)}</li>
+ *   <li>Controlling receive-side backpressure via {@link 
#disableAutoRequestWithInitial(int)} and {@link #request(int)}</li>
+ * </ul>
+ *
+ * <h3>Send-Side Backpressure (Controlling Outgoing Data Rate)</h3>
+ * <pre>{@code
+ * @Override
+ * public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
+ *     requestStream.disableAutoFlowControl();
+ *     requestStream.setOnReadyHandler(() -> {
+ *         while (requestStream.isReady() && hasMoreData()) {
+ *             requestStream.onNext(getNextRequest());
+ *         }
+ *     });
+ * }
+ * }</pre>
+ *
+ * <h3>Receive-Side Backpressure (Controlling Incoming Data Rate)</h3>
+ * <pre>{@code
+ * @Override
+ * public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
+ *     // Request only 10 messages initially
+ *     requestStream.disableAutoRequestWithInitial(10);
+ * }
+ *
+ * @Override
+ * public void onNext(Response response) {
+ *     process(response);
+ *     // Request more after processing
+ *     requestStream.request(1);
+ * }
+ * }</pre>
+ *
+ * @param <ReqT> the type of messages sent to the server (request type)
+ * @see CallStreamObserver
+ * @see ClientResponseObserver
+ */
+public interface ClientCallStreamObserver<ReqT> extends 
CallStreamObserver<ReqT> {
+
+    /**
+     * Disables automatic inbound flow control and sets the initial number of 
messages
+     * to request from the server.
+     *
+     * <p>By default, the runtime automatically requests messages from the 
server as they
+     * are consumed. Calling this method switches to manual flow control mode, 
where the
+     * client must explicitly call {@link #request(int)} to receive more 
messages.
+     *
+     * <p>This method <strong>must</strong> be called within
+     * {@link ClientResponseObserver#beforeStart(ClientCallStreamObserver)} 
before the
+     * stream starts, otherwise it has no effect.
+     *
+     * <p><strong>Usage:</strong>
+     * <pre>{@code
+     * @Override
+     * public void beforeStart(ClientCallStreamObserver<Request> 
requestStream) {
+     *     // Start with 5 messages, then request more in onNext()
+     *     requestStream.disableAutoRequestWithInitial(5);
+     * }
+     *
+     * @Override
+     * public void onNext(Response response) {
+     *     process(response);
+     *     requestStream.request(1); // Request one more message
+     * }
+     * }</pre>
+     *
+     * @param request the initial number of messages to request from the 
server.
+     *                A value of 0 means no messages will be delivered until 
{@link #request(int)} is called.
+     * @see #request(int)
+     * @see #disableAutoFlowControl()
+     */
+    void disableAutoRequestWithInitial(int request);
+}
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientResponseObserver.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientResponseObserver.java
new file mode 100644
index 0000000000..1c6c3e7d37
--- /dev/null
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientResponseObserver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.stream;
+
+/**
+ * A client-side {@link StreamObserver} that provides a callback to receive a 
reference to the
+ * outbound request stream observer before the call starts. This interface 
mirrors gRPC's
+ * {@code io.grpc.stub.ClientResponseObserver} for compatibility.
+ *
+ * <p>This interface is used for advanced flow control scenarios where the 
client needs to:
+ * <ul>
+ *   <li>Configure flow control settings before the stream starts</li>
+ *   <li>Set up an {@link CallStreamObserver#setOnReadyHandler(Runnable) 
onReadyHandler} for send-side backpressure</li>
+ *   <li>Control the rate of receiving messages using {@link 
ClientCallStreamObserver#disableAutoRequestWithInitial(int)}</li>
+ * </ul>
+ *
+ * <h3>Example Usage</h3>
+ * <pre>{@code
+ * // Client streaming with backpressure
+ * ClientResponseObserver<DataChunk, Response> responseObserver =
+ *         new ClientResponseObserver<DataChunk, Response>() {
+ *     @Override
+ *     public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+ *         // Disable auto flow control for manual send control
+ *         requestStream.disableAutoFlowControl();
+ *
+ *         // Set up onReadyHandler for send-side backpressure
+ *         requestStream.setOnReadyHandler(() -> {
+ *             while (requestStream.isReady() && hasMoreData()) {
+ *                 requestStream.onNext(getNextChunk());
+ *             }
+ *         });
+ *     }
+ *
+ *     @Override
+ *     public void onNext(Response response) { ... }
+ *
+ *     @Override
+ *     public void onError(Throwable t) { ... }
+ *
+ *     @Override
+ *     public void onCompleted() { ... }
+ * };
+ *
+ * service.clientStream(responseObserver);
+ * }</pre>
+ *
+ * @param <ReqT> the type of messages sent to the server (request type)
+ * @param <RespT> the type of messages received from the server (response type)
+ * @see ClientCallStreamObserver
+ * @see CallStreamObserver
+ */
+public interface ClientResponseObserver<ReqT, RespT> extends 
StreamObserver<RespT> {
+
+    /**
+     * Called by the runtime prior to the start of a call to provide a 
reference to the
+     * {@link ClientCallStreamObserver} for the outbound request stream.
+     *
+     * <p>This callback is invoked <strong>before</strong> the underlying 
stream is created,
+     * allowing the client to configure flow control settings that take effect 
from the
+     * beginning of the call.
+     *
+     * <p><strong>Allowed operations in this callback:</strong>
+     * <ul>
+     *   <li>{@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} - 
Set handler for send-side backpressure</li>
+     *   <li>{@link 
ClientCallStreamObserver#disableAutoRequestWithInitial(int)} - Configure 
receive-side backpressure</li>
+     *   <li>{@link CallStreamObserver#disableAutoFlowControl()} - Disable 
automatic flow control</li>
+     * </ul>
+     *
+     * <p><strong>Note:</strong> Do not call {@link 
StreamObserver#onNext(Object)} or
+     * {@link StreamObserver#onCompleted()} within this callback. Data should 
only be sent
+     * after the stream is ready (via the {@code onReadyHandler}).
+     *
+     * @param requestStream the {@link ClientCallStreamObserver} for sending 
requests to the server
+     */
+    void beforeStart(final ClientCallStreamObserver<ReqT> requestStream);
+}
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ServerCallStreamObserver.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ServerCallStreamObserver.java
new file mode 100644
index 0000000000..a80b38cf58
--- /dev/null
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ServerCallStreamObserver.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.stream;
+
+/**
+ * A server-side extension of {@link CallStreamObserver} that provides flow 
control capabilities
+ * for outbound response streams. This interface mirrors gRPC's
+ * {@code io.grpc.stub.ServerCallStreamObserver} for compatibility.
+ *
+ * <p>On the server side, this interface is obtained by casting the {@link 
StreamObserver}
+ * parameter passed to streaming RPC methods. It allows the server to:
+ * <ul>
+ *   <li>Check if the response stream is ready using {@link #isReady()}</li>
+ *   <li>Set a callback for when the stream becomes ready using {@link 
#setOnReadyHandler(Runnable)}</li>
+ *   <li>Control inbound flow from the client using {@link #request(int)} and 
{@link #disableAutoFlowControl()}</li>
+ * </ul>
+ *
+ * <h3>Server-Side Send Backpressure Example</h3>
+ * <pre>{@code
+ * @Override
+ * public void serverStream(Request request, StreamObserver<Response> 
responseObserver) {
+ *     ServerCallStreamObserver<Response> serverObserver =
+ *             (ServerCallStreamObserver<Response>) responseObserver;
+ *
+ *     AtomicInteger sent = new AtomicInteger(0);
+ *     int totalCount = 100;
+ *
+ *     serverObserver.setOnReadyHandler(() -> {
+ *         while (serverObserver.isReady() && sent.get() < totalCount) {
+ *             int seq = sent.getAndIncrement();
+ *             serverObserver.onNext(createResponse(seq));
+ *         }
+ *         if (sent.get() >= totalCount) {
+ *             serverObserver.onCompleted();
+ *         }
+ *     });
+ * }
+ * }</pre>
+ *
+ * <h3>Server-Side Receive Backpressure Example (for Client/Bidi 
Streaming)</h3>
+ * <pre>{@code
+ * @Override
+ * public StreamObserver<Request> clientStream(StreamObserver<Response> 
responseObserver) {
+ *     ServerCallStreamObserver<Response> serverObserver =
+ *             (ServerCallStreamObserver<Response>) responseObserver;
+ *
+ *     // Control how many messages we receive from the client
+ *     serverObserver.disableAutoFlowControl();
+ *     serverObserver.request(5); // Start with 5 messages
+ *
+ *     return new StreamObserver<Request>() {
+ *         @Override
+ *         public void onNext(Request request) {
+ *             process(request);
+ *             serverObserver.request(1); // Request one more
+ *         }
+ *         // ... onError, onCompleted
+ *     };
+ * }
+ * }</pre>
+ *
+ * @param <RespT> the type of messages sent to the client (response type)
+ * @see CallStreamObserver
+ * @see StreamObserver
+ */
+public interface ServerCallStreamObserver<RespT> extends 
CallStreamObserver<RespT> {
+
+    default void disableAutoRequest() {
+        disableAutoFlowControl();
+    }
+}
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
index 5ca8676a1b..c3465bc9a8 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
@@ -16,15 +16,15 @@
  */
 package org.apache.dubbo.mutiny;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.concurrent.Flow;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 /**
- * The middle layer between {@link 
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Mutiny API. 
<p>
+ * The middle layer between {@link 
org.apache.dubbo.common.stream.CallStreamObserver} and Mutiny API. <p>
  * 1. passing the data received by CallStreamObserver to Mutiny consumer <br>
  * 2. passing the request of Mutiny API to CallStreamObserver
  */
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
index 643d84e32b..c19ee45d6e 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.mutiny;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 
 import java.util.concurrent.Flow;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
index 20fbe4aac7..c8788ddc2a 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.mutiny;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
 
 import java.util.function.Consumer;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
index eb126492ff..180f193e66 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.mutiny;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 
 /**
  * Used in ManyToOne and ManyToMany in server. <br>
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
index 50963db3a1..7e0f63eda2 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.mutiny;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
index 296017ebc9..c0da4124fb 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
@@ -16,12 +16,12 @@
  */
 package org.apache.dubbo.mutiny.calls;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.mutiny.ClientTripleMutinyPublisher;
 import org.apache.dubbo.mutiny.ClientTripleMutinySubscriber;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubInvocationUtil;
 
 import io.smallrye.mutiny.Multi;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
index 5364b26c3f..a64349a8bf 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
@@ -16,12 +16,12 @@
  */
 package org.apache.dubbo.mutiny.calls;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.mutiny.ServerTripleMutinyPublisher;
 import org.apache.dubbo.mutiny.ServerTripleMutinySubscriber;
 import org.apache.dubbo.rpc.StatusRpcException;
 import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
index 8915c25092..a355cf762c 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.mutiny.handler;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubMethodHandler;
 
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
index 634f47ccfb..33fd72c76e 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.mutiny.handler;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubMethodHandler;
 
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
index e4e840790a..041bd1cb7c 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.dubbo.mutiny;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.mutiny.calls.MutinyClientCalls;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubInvocationUtil;
 
 import java.time.Duration;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
index 58bb7b115c..1bfb0de8dd 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.mutiny;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
index d74726b720..8d35d71e93 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.mutiny;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
index 76402ad9d9..1c5cf3093c 100644
--- 
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
+++ 
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.mutiny;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 
 import java.util.concurrent.Flow;
 
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
index 70d7028a00..3940f7d05c 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
@@ -16,8 +16,8 @@
  */
 package org.apache.dubbo.reactive;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -27,7 +27,7 @@ import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
 /**
- * The middle layer between {@link 
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive 
API. <p>
+ * The middle layer between {@link CallStreamObserver} and Reactive API. <p>
  * 1. passing the data received by CallStreamObserver to Reactive consumer <br>
  * 2. passing the request of Reactive API to CallStreamObserver
  */
@@ -71,8 +71,20 @@ public abstract class AbstractTripleReactorPublisher<T> 
extends CancelableStream
         if (subscription != null && this.subscription == null && 
HAS_SUBSCRIPTION.compareAndSet(false, true)) {
             this.subscription = subscription;
             subscription.disableAutoFlowControl();
+
+            // Set up onReadyHandler to trigger onSubscribe callback when 
stream becomes ready.
+            // This is called AFTER call.start() via InitOnReadyQueueCommand, 
ensuring the stream
+            // is created before any data is sent
+            // is triggered by onReady, not by onStart (which requires server 
headers).
             if (onSubscribe != null) {
-                onSubscribe.accept(subscription);
+                subscription.setOnReadyHandler(() -> {
+                    // Only execute the callback once (on first onReady)
+                    Consumer<CallStreamObserver<?>> callback = onSubscribe;
+                    if (callback != null && subscription.isReady()) {
+                        onSubscribe = null; // Clear to prevent re-execution
+                        callback.accept(subscription);
+                    }
+                });
             }
             return;
         }
@@ -148,6 +160,9 @@ public abstract class AbstractTripleReactorPublisher<T> 
extends CancelableStream
         synchronized (this) {
             if (!canRequest) {
                 canRequest = true;
+                // Request buffered messages from the server.
+                // Note: onSubscribe callback is now triggered by 
onReadyHandler (set in onSubscribe()),
+                // not here, because onReady is triggered earlier than onStart.
                 long count = requested;
                 subscription.request(count >= Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) count);
             }
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
index b3d6fa058e..c911f903f3 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.reactive;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -26,7 +26,7 @@ import reactor.core.CoreSubscriber;
 import reactor.util.annotation.NonNull;
 
 /**
- * The middle layer between {@link 
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive 
API. <br>
+ * The middle layer between {@link CallStreamObserver} and Reactive API. <br>
  * Passing the data from Reactive producer to CallStreamObserver.
  */
 public abstract class AbstractTripleReactorSubscriber<T> implements 
Subscriber<T>, CoreSubscriber<T> {
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
index 2ae9022804..dffcb16a1f 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
@@ -16,8 +16,9 @@
  */
 package org.apache.dubbo.reactive;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+import org.apache.dubbo.common.stream.CallStreamObserver;
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
+import org.apache.dubbo.common.stream.ClientResponseObserver;
 
 import java.util.function.Consumer;
 
@@ -26,8 +27,13 @@ import java.util.function.Consumer;
  * It is a Publisher for user subscriber to subscribe. <br>
  * It is a StreamObserver for responseStream. <br>
  * It is a Subscription for user subscriber to request and pass request to 
requestStream.
+ * <p>
+ * Implements {@link ClientResponseObserver} following gRPC's pattern where
+ * {@link #beforeStart(ClientCallStreamObserver)} is called before the stream 
starts,
+ * allowing configuration of flow control before any messages are sent.
  */
-public class ClientTripleReactorPublisher<T> extends 
AbstractTripleReactorPublisher<T> {
+public class ClientTripleReactorPublisher<T> extends 
AbstractTripleReactorPublisher<T>
+        implements ClientResponseObserver<Object, T> {
 
     public ClientTripleReactorPublisher() {}
 
@@ -35,8 +41,15 @@ public class ClientTripleReactorPublisher<T> extends 
AbstractTripleReactorPublis
         super(onSubscribe, shutdownHook);
     }
 
+    /**
+     * Called by the runtime prior to the start of a call to provide a 
reference to the
+     * {@link ClientCallStreamObserver} for the outbound stream.
+     * <p>
+     * Following gRPC's pattern, this method is called BEFORE {@code 
call.start()},
+     * allowing configuration of onReadyHandler and flow control settings.
+     */
     @Override
-    public void beforeStart(ClientCallToObserverAdapter<T> 
clientCallToObserverAdapter) {
-        super.onSubscribe(clientCallToObserverAdapter);
+    public void beforeStart(ClientCallStreamObserver<Object> requestStream) {
+        super.onSubscribe(requestStream);
     }
 }
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
index c5b9336741..7d21174f38 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.reactive;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
 
 /**
  * Used in ManyToOne and ManyToMany in server. <br>
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
index 3a4a9729a0..fdb8945bfe 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.reactive;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
index dfe55003f3..243adc63ff 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
@@ -16,12 +16,12 @@
  */
 package org.apache.dubbo.reactive.calls;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.reactive.ClientTripleReactorPublisher;
 import org.apache.dubbo.reactive.ClientTripleReactorSubscriber;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubInvocationUtil;
 
 import reactor.core.publisher.Flux;
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index 73613f3d8c..eb65e40f5d 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -16,12 +16,12 @@
  */
 package org.apache.dubbo.reactive.calls;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
 import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
 import org.apache.dubbo.rpc.StatusRpcException;
 import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
index 6c14436249..87fe23fb5c 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.reactive.handler;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.reactive.calls.ReactorServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubMethodHandler;
 
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
index f82a6a614b..441717d138 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
@@ -16,9 +16,9 @@
  */
 package org.apache.dubbo.reactive.handler;
 
+import org.apache.dubbo.common.stream.CallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.reactive.calls.ReactorServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubMethodHandler;
 
 import java.util.concurrent.CompletableFuture;
diff --git 
a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
 
b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
index 3f303b5f3f..fd6a76a017 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.reactive;
 
-import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -28,7 +28,7 @@ import static org.mockito.Mockito.doAnswer;
 
 public class CreateObserverAdapter {
 
-    private ServerStreamObserver<String> responseObserver;
+    private ServerCallStreamObserver<String> responseObserver;
     private AtomicInteger nextCounter;
     private AtomicInteger completeCounter;
     private AtomicInteger errorCounter;
@@ -39,7 +39,7 @@ public class CreateObserverAdapter {
         completeCounter = new AtomicInteger();
         errorCounter = new AtomicInteger();
 
-        responseObserver = Mockito.mock(ServerStreamObserver.class);
+        responseObserver = Mockito.mock(ServerCallStreamObserver.class);
         doAnswer(o -> 
nextCounter.incrementAndGet()).when(responseObserver).onNext(anyString());
         doAnswer(o -> 
completeCounter.incrementAndGet()).when(responseObserver).onCompleted();
         doAnswer(o -> 
errorCounter.incrementAndGet()).when(responseObserver).onError(any(Throwable.class));
@@ -57,7 +57,7 @@ public class CreateObserverAdapter {
         return errorCounter;
     }
 
-    public ServerStreamObserver<String> getResponseObserver() {
+    public ServerCallStreamObserver<String> getResponseObserver() {
         return this.responseObserver;
     }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index fb82a6c2e6..0fbf9bab0c 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.remoting.http12.h2;
 
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
 import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver;
 import org.apache.dubbo.remoting.http12.ErrorCodeHolder;
 import org.apache.dubbo.remoting.http12.FlowControlStreamObserver;
@@ -29,8 +30,14 @@ import org.apache.dubbo.rpc.CancellationContext;
 
 import io.netty.handler.codec.http2.DefaultHttp2Headers;
 
+/**
+ * HTTP/2 server-side stream observer with flow control and backpressure 
support.
+ * Implements {@link ServerCallStreamObserver} following gRPC's pattern for 
backpressure.
+ */
 public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserver<H2StreamChannel>
-        implements FlowControlStreamObserver<Object>, 
Http2CancelableStreamObserver<Object> {
+        implements FlowControlStreamObserver<Object>,
+                Http2CancelableStreamObserver<Object>,
+                ServerCallStreamObserver<Object> {
 
     private CancellationContext cancellationContext;
 
@@ -118,6 +125,11 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
         streamingDecoder.request(count);
     }
 
+    @Override
+    public void setCompression(String compression) {
+        // not supported yet
+    }
+
     @Override
     public void disableAutoFlowControl() {
         autoRequestN = false;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
index 180fc9bf9a..1c3a8a3aee 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
@@ -16,10 +16,15 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
-public interface ClientStreamObserver<T> extends CallStreamObserver<T> {
+/**
+ * @param <T>
+ * @deprecated use {@link ClientCallStreamObserver}
+ */
+@Deprecated
+public interface ClientStreamObserver<T> extends ClientCallStreamObserver<T> {
 
     /**
      * Swaps to manual flow control where no message will be delivered to 
{@link
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
index 6dad6aaec4..7911bbbb8c 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
@@ -16,6 +16,11 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
 
-public interface ServerStreamObserver<T> extends CallStreamObserver<T> {}
+/**
+ * @deprecated use {@link ServerCallStreamObserver} instead
+ * @param <T>
+ */
+@Deprecated
+public interface ServerStreamObserver<T> extends ServerCallStreamObserver<T> {}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index dba35a20a3..dce7f422d2 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.stream.ClientResponseObserver;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
@@ -243,26 +244,62 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         return result;
     }
 
+    /**
+     * Start a streaming call
+     * <p>
+     * The call sequence is:
+     * <pre>
+     * 1. Create adapter and listener
+     * 2. Call beforeStart() if observer is ClientResponseObserver or 
CancelableStreamObserver
+     *   (allows configuring onReadyHandler before stream starts)
+     * 3. Start the call (creates stream, may trigger initial onReady)
+     * </pre>
+     *
+     * <p>The two interfaces serve different purposes:
+     * <ul>
+     *   <li>{@link ClientResponseObserver} - gRPC-compatible interface with 
just beforeStart()
+     *   <li>{@link CancelableStreamObserver} - Dubbo's extended interface 
with cancellation and startRequest()
+     * </ul>
+     * An observer can implement both interfaces (e.g., 
ClientTripleReactorPublisher).
+     */
+    @SuppressWarnings("unchecked")
     StreamObserver<Object> streamCall(
             ClientCall call, RequestMetadata metadata, StreamObserver<Object> 
responseObserver) {
+        ClientCallToObserverAdapter<Object> adapter = new 
ClientCallToObserverAdapter<>(call, true);
+
+        // Create listener and associate with adapter
         ObserverToClientCallListenerAdapter listener = new 
ObserverToClientCallListenerAdapter(responseObserver);
-        StreamObserver<Object> streamObserver = call.start(metadata, listener);
+        listener.setRequestAdapter(adapter);
 
-        // Set the request adapter on the listener for onReady() to access 
onReadyHandler
-        if (streamObserver instanceof ClientCallToObserverAdapter) {
-            listener.setRequestAdapter((ClientCallToObserverAdapter<Object>) 
streamObserver);
+        // Handle CancelableStreamObserver first (for cancellation context and 
startRequest)
+        // This must be done regardless of whether it also implements 
ClientResponseObserver
+        if (responseObserver instanceof CancelableStreamObserver) {
+            CancelableStreamObserver<Object> cancelableObserver = 
(CancelableStreamObserver<Object>) responseObserver;
+            // Set up cancellation context
+            CancellationContext context = new CancellationContext();
+            cancelableObserver.setCancellationContext(context);
+            context.addListener(ctx -> call.cancelByLocal(new 
IllegalStateException("Canceled by app")));
+            // Set up startRequest to be called when stream is established 
(onStart)
+            listener.setOnStartConsumer(dummy -> 
cancelableObserver.startRequest());
         }
 
-        if (responseObserver instanceof CancelableStreamObserver) {
-            final CancellationContext context = new CancellationContext();
-            CancelableStreamObserver<Object> cancelableStreamObserver =
-                    (CancelableStreamObserver<Object>) responseObserver;
-            cancelableStreamObserver.setCancellationContext(context);
-            context.addListener(context1 -> call.cancelByLocal(new 
IllegalStateException("Canceled by app")));
-            listener.setOnStartConsumer(dummy -> 
cancelableStreamObserver.startRequest());
-            
cancelableStreamObserver.beforeStart((ClientCallToObserverAdapter<Object>) 
streamObserver);
+        // Now call beforeStart() - use ClientResponseObserver if available 
(gRPC-compatible),
+        // otherwise fall back to CancelableStreamObserver.beforeStart()
+        if (responseObserver instanceof ClientResponseObserver) {
+            // gRPC-compatible interface - beforeStart takes 
ClientCallStreamObserver
+            ClientResponseObserver<Object, Object> clientResponseObserver =
+                    (ClientResponseObserver<Object, Object>) responseObserver;
+            clientResponseObserver.beforeStart(adapter);
+        } else if (responseObserver instanceof CancelableStreamObserver) {
+            // Legacy Dubbo interface - beforeStart takes 
ClientCallToObserverAdapter
+            CancelableStreamObserver<Object> cancelableObserver = 
(CancelableStreamObserver<Object>) responseObserver;
+            cancelableObserver.beforeStart(adapter);
         }
-        return streamObserver;
+
+        // Start the call - creates stream and may trigger initial onReady
+        call.start(metadata, listener);
+
+        return adapter;
     }
 
     AsyncRpcResult invokeUnary(
@@ -303,7 +340,9 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         result.setExecutor(callbackExecutor);
         ClientCall.Listener callListener = new UnaryClientCallListener(future);
 
-        final StreamObserver<Object> requestObserver = call.start(request, 
callListener);
+        // Create adapter for unary call (streamingResponse=false)
+        ClientCallToObserverAdapter<Object> requestObserver = new 
ClientCallToObserverAdapter<>(call, false);
+        call.start(request, callListener);
         requestObserver.onNext(pureArgument);
         requestObserver.onCompleted();
         return result;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index 785b63140a..070f07f233 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -16,7 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.call;
 
-import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
 
@@ -105,11 +104,12 @@ public interface ClientCall {
     void sendMessage(Object message);
 
     /**
+     * Start the call with the given metadata and response listener.
+     *
      * @param metadata         request metadata
      * @param responseListener the listener to receive response
-     * @return the stream observer representing the request sink
      */
-    StreamObserver<Object> start(RequestMetadata metadata, Listener 
responseListener);
+    void start(RequestMetadata metadata, Listener responseListener);
 
     /**
      * @return true if this call is auto request
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 46f2cb0987..12e56e2ee0 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -18,14 +18,12 @@ package org.apache.dubbo.rpc.protocol.tri.call;
 
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
-import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
 import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
 import org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory;
 import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
@@ -78,6 +76,9 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
         if (done) {
             return false;
         }
+        if (stream == null) {
+            return false;
+        }
         return stream.isReady();
     }
 
@@ -292,16 +293,18 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
     }
 
     @Override
-    public StreamObserver<Object> start(RequestMetadata metadata, 
ClientCall.Listener responseListener) {
+    public void start(RequestMetadata metadata, ClientCall.Listener 
responseListener) {
+        // Set listener BEFORE creating stream, so onReady() can access it
+        this.requestMetadata = metadata;
+        this.listener = responseListener;
+        this.streamingResponse = responseListener.streamingResponse();
+
         ClientStream stream;
         for (ClientStreamFactory factory : 
frameworkModel.getActivateExtensions(ClientStreamFactory.class)) {
             stream = factory.createClientStream(connectionClient, 
frameworkModel, executor, this, writeQueue);
             if (stream != null) {
-                this.requestMetadata = metadata;
-                this.listener = responseListener;
                 this.stream = stream;
-                this.streamingResponse = responseListener.streamingResponse();
-                return new ClientCallToObserverAdapter<>(this, 
responseListener.streamingResponse());
+                return;
             }
         }
         throw new IllegalStateException("No available ClientStreamFactory");
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
new file mode 100644
index 0000000000..61505532d0
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
+import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Command to trigger initial onReady after the stream channel is created.
+ * This is necessary because onReady is only triggered by 
channelWritabilityChanged,
+ * which won't fire if the channel is always writable from creation.
+ *
+ * <p>This command should be enqueued immediately after 
CreateStreamQueueCommand.
+ * Since the WriteQueue executes commands in order within the EventLoop,
+ * this command will run after the stream channel has been created.
+ */
+public class InitOnReadyQueueCommand extends QueuedCommand {
+
+    private final TripleStreamChannelFuture streamChannelFuture;
+
+    private final ClientStream.Listener listener;
+
+    private InitOnReadyQueueCommand(TripleStreamChannelFuture 
streamChannelFuture, ClientStream.Listener listener) {
+        this.streamChannelFuture = streamChannelFuture;
+        this.listener = listener;
+        this.promise(streamChannelFuture.getParentChannel().newPromise());
+        this.channel(streamChannelFuture.getParentChannel());
+    }
+
+    public static InitOnReadyQueueCommand create(
+            TripleStreamChannelFuture streamChannelFuture, 
ClientStream.Listener listener) {
+        return new InitOnReadyQueueCommand(streamChannelFuture, listener);
+    }
+
+    @Override
+    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+        // NOOP - this command does not send any data
+    }
+
+    @Override
+    public void run(Channel channel) {
+        // Work in I/O thread, after CreateStreamQueueCommand has completed
+        Channel streamChannel = streamChannelFuture.getNow();
+        if (streamChannel != null && streamChannel.isWritable()) {
+            // Trigger initial onReady to allow application to start sending.
+            listener.onReady();
+        }
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
index ea959f2634..4894dff07a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.h12.http2;
 
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
 import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
@@ -31,7 +32,7 @@ import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
 import java.util.Map;
 
 public class Http2ServerStreamObserver extends Http2ServerChannelObserver
-        implements ServerStreamObserver<Object>, AttachmentHolder {
+        implements ServerStreamObserver<Object>, 
ServerCallStreamObserver<Object>, AttachmentHolder {
 
     private final FrameworkModel frameworkModel;
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
index f677e92229..87f358bade 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
@@ -73,7 +73,7 @@ public final class Http2TripleClientStream extends 
AbstractTripleClientStream {
     }
 
     @Override
-    protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
+    protected TripleStreamChannelFuture initStreamChannel0(Channel parent) {
         Http2StreamChannelBootstrap bootstrap = new 
Http2StreamChannelBootstrap(parent);
         // Disable Netty's automatic stream flow control to enable manual flow 
control
         bootstrap.option(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, 
false);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
index 8fe8b41723..0ffd784c66 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
@@ -58,7 +58,7 @@ public final class Http3TripleClientStream extends 
AbstractTripleClientStream {
     }
 
     @Override
-    protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
+    protected TripleStreamChannelFuture initStreamChannel0(Channel parent) {
         Http3RequestStreamInitializer initializer = new 
Http3RequestStreamInitializer() {
             @Override
             protected void initRequestStream(QuicStreamChannel ch) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
index f832677ecc..6dc692d1f3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
@@ -16,11 +16,13 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.observer;
 
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.ClientStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.call.ClientCall;
 
-public class ClientCallToObserverAdapter<T> extends 
CancelableStreamObserver<T> implements ClientStreamObserver<T> {
+public class ClientCallToObserverAdapter<T> extends CancelableStreamObserver<T>
+        implements ClientStreamObserver<T>, ClientCallStreamObserver<T> {
 
     private final ClientCall call;
     private final boolean streamingResponse;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index 7bbd0239b0..d377a774c0 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -30,6 +30,7 @@ import 
org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.EndStreamQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.InitOnReadyQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
 import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
@@ -111,7 +112,21 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
         this.streamChannelFuture = initStreamChannel(parent);
     }
 
-    protected abstract TripleStreamChannelFuture initStreamChannel(Channel 
parent);
+    private TripleStreamChannelFuture initStreamChannel(Channel parent) {
+        TripleStreamChannelFuture tripleStreamChannelFuture = 
initStreamChannel0(parent);
+        /**
+         * Enqueue InitOnReadyQueueCommand after the stream creation command.
+         * Since WriteQueue executes commands in order within the EventLoop,
+         * this command will run after the stream channel has been created by 
CreateStreamQueueCommand.
+         *
+         * This is necessary because onReady is only triggered by 
channelWritabilityChanged,
+         * which won't fire if the channel is always writable from creation.
+         */
+        
writeQueue.enqueue(InitOnReadyQueueCommand.create(tripleStreamChannelFuture, 
listener));
+        return tripleStreamChannelFuture;
+    }
+
+    protected abstract TripleStreamChannelFuture initStreamChannel0(Channel 
parent);
 
     /**
      * Get the stream channel future for flow control.
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index a468d9d815..941153fb2b 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
 
 class TripleInvokerTest {
@@ -59,8 +60,8 @@ class TripleInvokerTest {
                 .createExecutorIfAbsent(url);
         TripleClientCall call = Mockito.mock(TripleClientCall.class);
         StreamObserver streamObserver = Mockito.mock(StreamObserver.class);
-        when(call.start(any(RequestMetadata.class), 
any(ClientCall.Listener.class)))
-                .thenReturn(streamObserver);
+        // start() now returns void, just verify it's called
+        doNothing().when(call).start(any(RequestMetadata.class), 
any(ClientCall.Listener.class));
         RpcInvocation invocation = new RpcInvocation();
         invocation.setMethodName("test");
         invocation.setArguments(new Object[] {streamObserver, streamObserver});
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
index a67c90f333..4f8ed4c477 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
@@ -323,9 +323,8 @@ class BackpressureTest {
         public void sendMessage(Object message) {}
 
         @Override
-        public StreamObserver<Object> start(
-                org.apache.dubbo.rpc.protocol.tri.RequestMetadata metadata, 
Listener responseListener) {
-            return null;
+        public void start(org.apache.dubbo.rpc.protocol.tri.RequestMetadata 
metadata, Listener responseListener) {
+            // No-op for mock
         }
 
         @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
index fa71ca1fa9..c8722e4072 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
@@ -32,6 +32,7 @@ import 
org.apache.dubbo.rpc.protocol.tri.command.CreateStreamQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.EndStreamQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.InitOnReadyQueueCommand;
 import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
 import org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2TripleClientStream;
@@ -88,6 +89,7 @@ class TripleClientStreamTest {
                 listener,
                 http2StreamChannel);
         verify(writeQueue).enqueue(any(CreateStreamQueueCommand.class));
+        verify(writeQueue).enqueue(any(InitOnReadyQueueCommand.class));
 
         final RequestMetadata requestMetadata = new RequestMetadata();
         requestMetadata.method = methodDescriptor;
@@ -100,11 +102,13 @@ class TripleClientStreamTest {
         requestMetadata.version = url.getVersion();
         stream.sendHeader(requestMetadata.toHeaders());
         verify(writeQueue).enqueueFuture(any(HeaderQueueCommand.class), 
any(Executor.class));
-        // no other commands
-        verify(writeQueue).enqueue(any(QueuedCommand.class));
+        // enqueue should have been called twice: CreateStreamQueueCommand and 
InitOnReadyQueueCommand
+        verify(writeQueue, times(2)).enqueue(any(QueuedCommand.class));
         stream.sendMessage(new byte[0], 0);
         verify(writeQueue).enqueueFuture(any(DataQueueCommand.class), 
any(Executor.class));
         verify(writeQueue, times(2)).enqueueFuture(any(QueuedCommand.class), 
any(Executor.class));
+        // After sendHeader and sendMessage, enqueue should have been called 
twice:
+        // once for CreateStreamQueueCommand and once for 
InitOnReadyQueueCommand
         stream.halfClose();
         verify(writeQueue).enqueueFuture(any(EndStreamQueueCommand.class), 
any(Executor.class));
         verify(writeQueue, times(3)).enqueueFuture(any(QueuedCommand.class), 
any(Executor.class));

Reply via email to