This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 48f3a62134 Tri backpressure (#15967)
48f3a62134 is described below
commit 48f3a62134bad498dddff7c213e79a46711f9532
Author: earthchen <[email protected]>
AuthorDate: Thu Jan 8 17:06:48 2026 +0800
Tri backpressure (#15967)
* support disableAutoInboundFlowControl api
* support back press
* fix
* fix
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
Co-authored-by: Copilot <[email protected]>
* fix
* fix
* fix test
* add isReady and setOnReadyHandler api
* fix
* fix
* fix Backpressure it
* fix
* fix
* ClientCallStreamObserver & ServerCallStreamObserver
* add license
* mvn spotless:apply
* fix dubbo plugin
* fix dubbo plugin spotless
* fix
* fix ut
* Add gRPC-compatible APIs and fix the integration tests
* fix
* fix
* remove default
* remove default
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Rain Yu <[email protected]>
Co-authored-by: Wang Chengming <[email protected]>
---
.../dubbo/common/stream}/CallStreamObserver.java | 55 +++++++++---
.../common/stream/ClientCallStreamObserver.java | 99 ++++++++++++++++++++++
.../common/stream/ClientResponseObserver.java | 91 ++++++++++++++++++++
.../common/stream/ServerCallStreamObserver.java | 85 +++++++++++++++++++
.../mutiny/AbstractTripleMutinyPublisher.java | 4 +-
.../mutiny/AbstractTripleMutinySubscriber.java | 2 +-
.../dubbo/mutiny/ClientTripleMutinyPublisher.java | 2 +-
.../dubbo/mutiny/ServerTripleMutinyPublisher.java | 2 +-
.../dubbo/mutiny/ServerTripleMutinySubscriber.java | 2 +-
.../dubbo/mutiny/calls/MutinyClientCalls.java | 2 +-
.../dubbo/mutiny/calls/MutinyServerCalls.java | 2 +-
.../mutiny/handler/ManyToManyMethodHandler.java | 2 +-
.../mutiny/handler/ManyToOneMethodHandler.java | 2 +-
.../apache/dubbo/mutiny/MutinyClientCallsTest.java | 2 +-
.../apache/dubbo/mutiny/MutinyServerCallsTest.java | 2 +-
.../dubbo/mutiny/TripleMutinyPublisherTest.java | 2 +-
.../dubbo/mutiny/TripleMutinySubscriberTest.java | 2 +-
.../reactive/AbstractTripleReactorPublisher.java | 21 ++++-
.../reactive/AbstractTripleReactorSubscriber.java | 4 +-
.../reactive/ClientTripleReactorPublisher.java | 23 +++--
.../reactive/ServerTripleReactorPublisher.java | 2 +-
.../reactive/ServerTripleReactorSubscriber.java | 2 +-
.../dubbo/reactive/calls/ReactorClientCalls.java | 2 +-
.../dubbo/reactive/calls/ReactorServerCalls.java | 2 +-
.../reactive/handler/ManyToManyMethodHandler.java | 2 +-
.../reactive/handler/ManyToOneMethodHandler.java | 2 +-
.../dubbo/reactive/CreateObserverAdapter.java | 8 +-
.../http12/h2/Http2ServerChannelObserver.java | 14 ++-
.../rpc/protocol/tri/ClientStreamObserver.java | 9 +-
.../rpc/protocol/tri/ServerStreamObserver.java | 9 +-
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 67 ++++++++++++---
.../dubbo/rpc/protocol/tri/call/ClientCall.java | 6 +-
.../rpc/protocol/tri/call/TripleClientCall.java | 17 ++--
.../tri/command/InitOnReadyQueueCommand.java | 67 +++++++++++++++
.../tri/h12/http2/Http2ServerStreamObserver.java | 3 +-
.../tri/h12/http2/Http2TripleClientStream.java | 2 +-
.../protocol/tri/h3/Http3TripleClientStream.java | 2 +-
.../tri/observer/ClientCallToObserverAdapter.java | 4 +-
.../tri/stream/AbstractTripleClientStream.java | 17 +++-
.../dubbo/rpc/protocol/tri/TripleInvokerTest.java | 5 +-
.../rpc/protocol/tri/call/BackpressureTest.java | 5 +-
.../tri/stream/TripleClientStreamTest.java | 8 +-
42 files changed, 574 insertions(+), 87 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
similarity index 66%
rename from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
rename to
dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
index 46da513406..a08dd4c4b2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
@@ -14,24 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.tri.observer;
-
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+package org.apache.dubbo.common.stream;
/**
* An extension of {@link StreamObserver} that provides additional
functionality for flow control
- * and backpressure. This interface mirrors gRPC's {@code CallStreamObserver}
for compatibility.
+ * and backpressure. This interface mirrors gRPC's {@code
io.grpc.stub.CallStreamObserver} for compatibility.
+ *
+ * <p>This is the base interface for both client-side ({@link
ClientCallStreamObserver}) and
+ * server-side ({@link ServerCallStreamObserver}) flow control observers. It
provides two types
+ * of flow control:
+ *
+ * <h3>Send-Side Backpressure (Outbound Flow Control)</h3>
+ * <p>Controls the rate at which data is sent to avoid overwhelming the
receiver:
+ * <ul>
+ * <li>{@link #isReady()} - Check if the stream can accept more data without
blocking</li>
+ * <li>{@link #setOnReadyHandler(Runnable)} - Register a callback for when
the stream becomes writable</li>
+ * </ul>
*
- * <p>Key features:
+ * <h3>Receive-Side Backpressure (Inbound Flow Control)</h3>
+ * <p>Controls the rate at which data is received to avoid being overwhelmed:
* <ul>
- * <li>{@link #isReady()} - Check if the stream is ready to accept more
messages</li>
- * <li>{@link #setOnReadyHandler(Runnable)} - Set a callback for when the
stream becomes ready</li>
- * <li>{@link #request(int)} - Request more messages from the peer (for
inbound flow control)</li>
- * <li>{@link #disableAutoFlowControl()} - Switch to manual flow control
mode</li>
+ * <li>{@link #disableAutoFlowControl()} - Switch from automatic to manual
message requesting</li>
+ * <li>{@link #request(int)} - Explicitly request a specific number of
messages from the sender</li>
* </ul>
*
+ * <h3>Typical Usage Pattern</h3>
+ * <pre>{@code
+ * // Send-side backpressure example
+ * callStreamObserver.setOnReadyHandler(() -> {
+ * while (callStreamObserver.isReady() && hasMoreData()) {
+ * callStreamObserver.onNext(getNextData());
+ * }
+ * if (!hasMoreData()) {
+ * callStreamObserver.onCompleted();
+ * }
+ * });
+ *
+ * // Receive-side backpressure example (in beforeStart or similar)
+ * callStreamObserver.disableAutoFlowControl();
+ * callStreamObserver.request(10); // Request initial batch
+ *
+ * // Then in onNext()
+ * public void onNext(T value) {
+ * process(value);
+ * callStreamObserver.request(1); // Request next message after processing
+ * }
+ * }</pre>
+ *
* @param <T> the type of value passed to the stream
+ * @see ClientCallStreamObserver
+ * @see ServerCallStreamObserver
+ * @see ClientResponseObserver
*/
public interface CallStreamObserver<T> extends StreamObserver<T> {
@@ -84,8 +117,6 @@ public interface CallStreamObserver<T> extends
StreamObserver<T> {
* <p>
* For stream set compression needs to determine whether the metadata has
been sent, and carry
* on corresponding processing
- *
- * @param compression {@link Compressor}
*/
void setCompression(String compression);
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientCallStreamObserver.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientCallStreamObserver.java
new file mode 100644
index 0000000000..b19dff44a1
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientCallStreamObserver.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.stream;
+
+/**
+ * A client-side extension of {@link CallStreamObserver} that provides
additional functionality
+ * for controlling the outbound request stream. This interface mirrors gRPC's
+ * {@code io.grpc.stub.ClientCallStreamObserver} for compatibility.
+ *
+ * <p>This interface is typically obtained through {@link
ClientResponseObserver#beforeStart(ClientCallStreamObserver)}
+ * and provides methods for:
+ * <ul>
+ * <li>Controlling send-side backpressure via {@link #isReady()} and {@link
#setOnReadyHandler(Runnable)}</li>
+ * <li>Controlling receive-side backpressure via {@link
#disableAutoRequestWithInitial(int)} and {@link #request(int)}</li>
+ * </ul>
+ *
+ * <h3>Send-Side Backpressure (Controlling Outgoing Data Rate)</h3>
+ * <pre>{@code
+ * @Override
+ * public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
+ * requestStream.disableAutoFlowControl();
+ * requestStream.setOnReadyHandler(() -> {
+ * while (requestStream.isReady() && hasMoreData()) {
+ * requestStream.onNext(getNextRequest());
+ * }
+ * });
+ * }
+ * }</pre>
+ *
+ * <h3>Receive-Side Backpressure (Controlling Incoming Data Rate)</h3>
+ * <pre>{@code
+ * @Override
+ * public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
+ * // Request only 10 messages initially
+ * requestStream.disableAutoRequestWithInitial(10);
+ * }
+ *
+ * @Override
+ * public void onNext(Response response) {
+ * process(response);
+ * // Request more after processing
+ * requestStream.request(1);
+ * }
+ * }</pre>
+ *
+ * @param <ReqT> the type of messages sent to the server (request type)
+ * @see CallStreamObserver
+ * @see ClientResponseObserver
+ */
+public interface ClientCallStreamObserver<ReqT> extends
CallStreamObserver<ReqT> {
+
+ /**
+ * Disables automatic inbound flow control and sets the initial number of
messages
+ * to request from the server.
+ *
+ * <p>By default, the runtime automatically requests messages from the
server as they
+ * are consumed. Calling this method switches to manual flow control mode,
where the
+ * client must explicitly call {@link #request(int)} to receive more
messages.
+ *
+ * <p>This method <strong>must</strong> be called within
+ * {@link ClientResponseObserver#beforeStart(ClientCallStreamObserver)}
before the
+ * stream starts, otherwise it has no effect.
+ *
+ * <p><strong>Usage:</strong>
+ * <pre>{@code
+ * @Override
+ * public void beforeStart(ClientCallStreamObserver<Request>
requestStream) {
+ * // Start with 5 messages, then request more in onNext()
+ * requestStream.disableAutoRequestWithInitial(5);
+ * }
+ *
+ * @Override
+ * public void onNext(Response response) {
+ * process(response);
+ * requestStream.request(1); // Request one more message
+ * }
+ * }</pre>
+ *
+ * @param request the initial number of messages to request from the
server.
+ * A value of 0 means no messages will be delivered until
{@link #request(int)} is called.
+ * @see #request(int)
+ * @see #disableAutoFlowControl()
+ */
+ void disableAutoRequestWithInitial(int request);
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientResponseObserver.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientResponseObserver.java
new file mode 100644
index 0000000000..1c6c3e7d37
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ClientResponseObserver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.stream;
+
+/**
+ * A client-side {@link StreamObserver} that provides a callback to receive a
reference to the
+ * outbound request stream observer before the call starts. This interface
mirrors gRPC's
+ * {@code io.grpc.stub.ClientResponseObserver} for compatibility.
+ *
+ * <p>This interface is used for advanced flow control scenarios where the
client needs to:
+ * <ul>
+ * <li>Configure flow control settings before the stream starts</li>
+ * <li>Set up an {@link CallStreamObserver#setOnReadyHandler(Runnable)
onReadyHandler} for send-side backpressure</li>
+ * <li>Control the rate of receiving messages using {@link
ClientCallStreamObserver#disableAutoRequestWithInitial(int)}</li>
+ * </ul>
+ *
+ * <h3>Example Usage</h3>
+ * <pre>{@code
+ * // Client streaming with backpressure
+ * ClientResponseObserver<DataChunk, Response> responseObserver =
+ * new ClientResponseObserver<DataChunk, Response>() {
+ * @Override
+ * public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ * // Disable auto flow control for manual send control
+ * requestStream.disableAutoFlowControl();
+ *
+ * // Set up onReadyHandler for send-side backpressure
+ * requestStream.setOnReadyHandler(() -> {
+ * while (requestStream.isReady() && hasMoreData()) {
+ * requestStream.onNext(getNextChunk());
+ * }
+ * });
+ * }
+ *
+ * @Override
+ * public void onNext(Response response) { ... }
+ *
+ * @Override
+ * public void onError(Throwable t) { ... }
+ *
+ * @Override
+ * public void onCompleted() { ... }
+ * };
+ *
+ * service.clientStream(responseObserver);
+ * }</pre>
+ *
+ * @param <ReqT> the type of messages sent to the server (request type)
+ * @param <RespT> the type of messages received from the server (response type)
+ * @see ClientCallStreamObserver
+ * @see CallStreamObserver
+ */
+public interface ClientResponseObserver<ReqT, RespT> extends
StreamObserver<RespT> {
+
+ /**
+ * Called by the runtime prior to the start of a call to provide a
reference to the
+ * {@link ClientCallStreamObserver} for the outbound request stream.
+ *
+ * <p>This callback is invoked <strong>before</strong> the underlying
stream is created,
+ * allowing the client to configure flow control settings that take effect
from the
+ * beginning of the call.
+ *
+ * <p><strong>Allowed operations in this callback:</strong>
+ * <ul>
+ * <li>{@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} -
Set handler for send-side backpressure</li>
+ * <li>{@link
ClientCallStreamObserver#disableAutoRequestWithInitial(int)} - Configure
receive-side backpressure</li>
+ * <li>{@link CallStreamObserver#disableAutoFlowControl()} - Disable
automatic flow control</li>
+ * </ul>
+ *
+ * <p><strong>Note:</strong> Do not call {@link
StreamObserver#onNext(Object)} or
+ * {@link StreamObserver#onCompleted()} within this callback. Data should
only be sent
+ * after the stream is ready (via the {@code onReadyHandler}).
+ *
+ * @param requestStream the {@link ClientCallStreamObserver} for sending
requests to the server
+ */
+ void beforeStart(final ClientCallStreamObserver<ReqT> requestStream);
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ServerCallStreamObserver.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ServerCallStreamObserver.java
new file mode 100644
index 0000000000..a80b38cf58
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/stream/ServerCallStreamObserver.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.stream;
+
+/**
+ * A server-side extension of {@link CallStreamObserver} that provides flow
control capabilities
+ * for outbound response streams. This interface mirrors gRPC's
+ * {@code io.grpc.stub.ServerCallStreamObserver} for compatibility.
+ *
+ * <p>On the server side, this interface is obtained by casting the {@link
StreamObserver}
+ * parameter passed to streaming RPC methods. It allows the server to:
+ * <ul>
+ * <li>Check if the response stream is ready using {@link #isReady()}</li>
+ * <li>Set a callback for when the stream becomes ready using {@link
#setOnReadyHandler(Runnable)}</li>
+ * <li>Control inbound flow from the client using {@link #request(int)} and
{@link #disableAutoFlowControl()}</li>
+ * </ul>
+ *
+ * <h3>Server-Side Send Backpressure Example</h3>
+ * <pre>{@code
+ * @Override
+ * public void serverStream(Request request, StreamObserver<Response>
responseObserver) {
+ * ServerCallStreamObserver<Response> serverObserver =
+ * (ServerCallStreamObserver<Response>) responseObserver;
+ *
+ * AtomicInteger sent = new AtomicInteger(0);
+ * int totalCount = 100;
+ *
+ * serverObserver.setOnReadyHandler(() -> {
+ * while (serverObserver.isReady() && sent.get() < totalCount) {
+ * int seq = sent.getAndIncrement();
+ * serverObserver.onNext(createResponse(seq));
+ * }
+ * if (sent.get() >= totalCount) {
+ * serverObserver.onCompleted();
+ * }
+ * });
+ * }
+ * }</pre>
+ *
+ * <h3>Server-Side Receive Backpressure Example (for Client/Bidi
Streaming)</h3>
+ * <pre>{@code
+ * @Override
+ * public StreamObserver<Request> clientStream(StreamObserver<Response>
responseObserver) {
+ * ServerCallStreamObserver<Response> serverObserver =
+ * (ServerCallStreamObserver<Response>) responseObserver;
+ *
+ * // Control how many messages we receive from the client
+ * serverObserver.disableAutoFlowControl();
+ * serverObserver.request(5); // Start with 5 messages
+ *
+ * return new StreamObserver<Request>() {
+ * @Override
+ * public void onNext(Request request) {
+ * process(request);
+ * serverObserver.request(1); // Request one more
+ * }
+ * // ... onError, onCompleted
+ * };
+ * }
+ * }</pre>
+ *
+ * @param <RespT> the type of messages sent to the client (response type)
+ * @see CallStreamObserver
+ * @see StreamObserver
+ */
+public interface ServerCallStreamObserver<RespT> extends
CallStreamObserver<RespT> {
+
+ default void disableAutoRequest() {
+ disableAutoFlowControl();
+ }
+}
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
index 5ca8676a1b..c3465bc9a8 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinyPublisher.java
@@ -16,15 +16,15 @@
*/
package org.apache.dubbo.mutiny;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
- * The middle layer between {@link
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Mutiny API.
<p>
+ * The middle layer between {@link
org.apache.dubbo.common.stream.CallStreamObserver} and Mutiny API. <p>
* 1. passing the data received by CallStreamObserver to Mutiny consumer <br>
* 2. passing the request of Mutiny API to CallStreamObserver
*/
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
index 643d84e32b..c19ee45d6e 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/AbstractTripleMutinySubscriber.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
index 20fbe4aac7..c8788ddc2a 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ClientTripleMutinyPublisher.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import java.util.function.Consumer;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
index eb126492ff..180f193e66 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinyPublisher.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
/**
* Used in ManyToOne and ManyToMany in server. <br>
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
index 50963db3a1..7e0f63eda2 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/ServerTripleMutinySubscriber.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.mutiny;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.ArrayList;
import java.util.List;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
index 296017ebc9..c0da4124fb 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyClientCalls.java
@@ -16,12 +16,12 @@
*/
package org.apache.dubbo.mutiny.calls;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.ClientTripleMutinyPublisher;
import org.apache.dubbo.mutiny.ClientTripleMutinySubscriber;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
import io.smallrye.mutiny.Multi;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
index 5364b26c3f..a64349a8bf 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/calls/MutinyServerCalls.java
@@ -16,12 +16,12 @@
*/
package org.apache.dubbo.mutiny.calls;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.ServerTripleMutinyPublisher;
import org.apache.dubbo.mutiny.ServerTripleMutinySubscriber;
import org.apache.dubbo.rpc.StatusRpcException;
import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
index 8915c25092..a355cf762c 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToManyMethodHandler.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.mutiny.handler;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubMethodHandler;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
index 634f47ccfb..33fd72c76e 100644
---
a/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
+++
b/dubbo-plugin/dubbo-mutiny/src/main/java/org/apache/dubbo/mutiny/handler/ManyToOneMethodHandler.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.mutiny.handler;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubMethodHandler;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
index e4e840790a..041bd1cb7c 100644
---
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
+++
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
@@ -16,11 +16,11 @@
*/
package org.apache.dubbo.mutiny;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.calls.MutinyClientCalls;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
import java.time.Duration;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
index 58bb7b115c..1bfb0de8dd 100644
---
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
+++
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyServerCallsTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.mutiny;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.mutiny.calls.MutinyServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
index d74726b720..8d35d71e93 100644
---
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
+++
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinyPublisherTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import java.util.ArrayList;
import java.util.List;
diff --git
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
index 76402ad9d9..1c5cf3093c 100644
---
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
+++
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/TripleMutinySubscriberTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import java.util.concurrent.Flow;
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
index 70d7028a00..3940f7d05c 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorPublisher.java
@@ -16,8 +16,8 @@
*/
package org.apache.dubbo.reactive;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -27,7 +27,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
- * The middle layer between {@link
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive
API. <p>
+ * The middle layer between {@link CallStreamObserver} and Reactive API. <p>
* 1. passing the data received by CallStreamObserver to Reactive consumer <br>
* 2. passing the request of Reactive API to CallStreamObserver
*/
@@ -71,8 +71,20 @@ public abstract class AbstractTripleReactorPublisher<T>
extends CancelableStream
if (subscription != null && this.subscription == null &&
HAS_SUBSCRIPTION.compareAndSet(false, true)) {
this.subscription = subscription;
subscription.disableAutoFlowControl();
+
+ // Set up onReadyHandler to trigger onSubscribe callback when
stream becomes ready.
+ // This is called AFTER call.start() via InitOnReadyQueueCommand,
ensuring the stream
+ // is created before any data is sent
+ // is triggered by onReady, not by onStart (which requires server
headers).
if (onSubscribe != null) {
- onSubscribe.accept(subscription);
+ subscription.setOnReadyHandler(() -> {
+ // Only execute the callback once (on first onReady)
+ Consumer<CallStreamObserver<?>> callback = onSubscribe;
+ if (callback != null && subscription.isReady()) {
+ onSubscribe = null; // Clear to prevent re-execution
+ callback.accept(subscription);
+ }
+ });
}
return;
}
@@ -148,6 +160,9 @@ public abstract class AbstractTripleReactorPublisher<T>
extends CancelableStream
synchronized (this) {
if (!canRequest) {
canRequest = true;
+ // Request buffered messages from the server.
+ // Note: onSubscribe callback is now triggered by
onReadyHandler (set in onSubscribe()),
+ // not here, because onReady is triggered earlier than onStart.
long count = requested;
subscription.request(count >= Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) count);
}
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
index b3d6fa058e..c911f903f3 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.reactive;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,7 +26,7 @@ import reactor.core.CoreSubscriber;
import reactor.util.annotation.NonNull;
/**
- * The middle layer between {@link
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive
API. <br>
+ * The middle layer between {@link CallStreamObserver} and Reactive API. <br>
* Passing the data from Reactive producer to CallStreamObserver.
*/
public abstract class AbstractTripleReactorSubscriber<T> implements
Subscriber<T>, CoreSubscriber<T> {
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
index 2ae9022804..dffcb16a1f 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ClientTripleReactorPublisher.java
@@ -16,8 +16,9 @@
*/
package org.apache.dubbo.reactive;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+import org.apache.dubbo.common.stream.CallStreamObserver;
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
+import org.apache.dubbo.common.stream.ClientResponseObserver;
import java.util.function.Consumer;
@@ -26,8 +27,13 @@ import java.util.function.Consumer;
* It is a Publisher for user subscriber to subscribe. <br>
* It is a StreamObserver for responseStream. <br>
* It is a Subscription for user subscriber to request and pass request to
requestStream.
+ * <p>
+ * Implements {@link ClientResponseObserver} following gRPC's pattern where
+ * {@link #beforeStart(ClientCallStreamObserver)} is called before the stream
starts,
+ * allowing configuration of flow control before any messages are sent.
*/
-public class ClientTripleReactorPublisher<T> extends
AbstractTripleReactorPublisher<T> {
+public class ClientTripleReactorPublisher<T> extends
AbstractTripleReactorPublisher<T>
+ implements ClientResponseObserver<Object, T> {
public ClientTripleReactorPublisher() {}
@@ -35,8 +41,15 @@ public class ClientTripleReactorPublisher<T> extends
AbstractTripleReactorPublis
super(onSubscribe, shutdownHook);
}
+ /**
+ * Called by the runtime prior to the start of a call to provide a
reference to the
+ * {@link ClientCallStreamObserver} for the outbound stream.
+ * <p>
+ * Following gRPC's pattern, this method is called BEFORE {@code
call.start()},
+ * allowing configuration of onReadyHandler and flow control settings.
+ */
@Override
- public void beforeStart(ClientCallToObserverAdapter<T>
clientCallToObserverAdapter) {
- super.onSubscribe(clientCallToObserverAdapter);
+ public void beforeStart(ClientCallStreamObserver<Object> requestStream) {
+ super.onSubscribe(requestStream);
}
}
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
index c5b9336741..7d21174f38 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorPublisher.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.reactive;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.CallStreamObserver;
/**
* Used in ManyToOne and ManyToMany in server. <br>
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
index 3a4a9729a0..fdb8945bfe 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.reactive;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.ArrayList;
import java.util.List;
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
index dfe55003f3..243adc63ff 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorClientCalls.java
@@ -16,12 +16,12 @@
*/
package org.apache.dubbo.reactive.calls;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ClientTripleReactorPublisher;
import org.apache.dubbo.reactive.ClientTripleReactorSubscriber;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
import reactor.core.publisher.Flux;
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index 73613f3d8c..eb65e40f5d 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -16,12 +16,12 @@
*/
package org.apache.dubbo.reactive.calls;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
import org.apache.dubbo.rpc.StatusRpcException;
import org.apache.dubbo.rpc.TriRpcStatus;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
index 6c14436249..87fe23fb5c 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToManyMethodHandler.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.reactive.handler;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.calls.ReactorServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubMethodHandler;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
index f82a6a614b..441717d138 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/ManyToOneMethodHandler.java
@@ -16,9 +16,9 @@
*/
package org.apache.dubbo.reactive.handler;
+import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.calls.ReactorServerCalls;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubMethodHandler;
import java.util.concurrent.CompletableFuture;
diff --git
a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
index 3f303b5f3f..fd6a76a017 100644
---
a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
+++
b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreateObserverAdapter.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.reactive;
-import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,7 +28,7 @@ import static org.mockito.Mockito.doAnswer;
public class CreateObserverAdapter {
- private ServerStreamObserver<String> responseObserver;
+ private ServerCallStreamObserver<String> responseObserver;
private AtomicInteger nextCounter;
private AtomicInteger completeCounter;
private AtomicInteger errorCounter;
@@ -39,7 +39,7 @@ public class CreateObserverAdapter {
completeCounter = new AtomicInteger();
errorCounter = new AtomicInteger();
- responseObserver = Mockito.mock(ServerStreamObserver.class);
+ responseObserver = Mockito.mock(ServerCallStreamObserver.class);
doAnswer(o ->
nextCounter.incrementAndGet()).when(responseObserver).onNext(anyString());
doAnswer(o ->
completeCounter.incrementAndGet()).when(responseObserver).onCompleted();
doAnswer(o ->
errorCounter.incrementAndGet()).when(responseObserver).onError(any(Throwable.class));
@@ -57,7 +57,7 @@ public class CreateObserverAdapter {
return errorCounter;
}
- public ServerStreamObserver<String> getResponseObserver() {
+ public ServerCallStreamObserver<String> getResponseObserver() {
return this.responseObserver;
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index fb82a6c2e6..0fbf9bab0c 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.remoting.http12.h2;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver;
import org.apache.dubbo.remoting.http12.ErrorCodeHolder;
import org.apache.dubbo.remoting.http12.FlowControlStreamObserver;
@@ -29,8 +30,14 @@ import org.apache.dubbo.rpc.CancellationContext;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
+/**
+ * HTTP/2 server-side stream observer with flow control and backpressure
support.
+ * Implements {@link ServerCallStreamObserver} following gRPC's pattern for
backpressure.
+ */
public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserver<H2StreamChannel>
- implements FlowControlStreamObserver<Object>,
Http2CancelableStreamObserver<Object> {
+ implements FlowControlStreamObserver<Object>,
+ Http2CancelableStreamObserver<Object>,
+ ServerCallStreamObserver<Object> {
private CancellationContext cancellationContext;
@@ -118,6 +125,11 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
streamingDecoder.request(count);
}
+ @Override
+ public void setCompression(String compression) {
+ // not supported yet
+ }
+
@Override
public void disableAutoFlowControl() {
autoRequestN = false;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
index 180fc9bf9a..1c3a8a3aee 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java
@@ -16,10 +16,15 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
-public interface ClientStreamObserver<T> extends CallStreamObserver<T> {
+/**
+ * @param <T>
+ * @deprecated use {@link ClientCallStreamObserver}
+ */
+@Deprecated
+public interface ClientStreamObserver<T> extends ClientCallStreamObserver<T> {
/**
* Swaps to manual flow control where no message will be delivered to
{@link
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
index 6dad6aaec4..7911bbbb8c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamObserver.java
@@ -16,6 +16,11 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
-public interface ServerStreamObserver<T> extends CallStreamObserver<T> {}
+/**
+ * @deprecated use {@link ServerCallStreamObserver} instead
+ * @param <T>
+ */
+@Deprecated
+public interface ServerStreamObserver<T> extends ServerCallStreamObserver<T> {}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index dba35a20a3..dce7f422d2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.stream.ClientResponseObserver;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
@@ -243,26 +244,62 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
return result;
}
+ /**
+ * Start a streaming call
+ * <p>
+ * The call sequence is:
+ * <pre>
+ * 1. Create adapter and listener
+ * 2. Call beforeStart() if observer is ClientResponseObserver or
CancelableStreamObserver
+ * (allows configuring onReadyHandler before stream starts)
+ * 3. Start the call (creates stream, may trigger initial onReady)
+ * </pre>
+ *
+ * <p>The two interfaces serve different purposes:
+ * <ul>
+ * <li>{@link ClientResponseObserver} - gRPC-compatible interface with
just beforeStart()
+ * <li>{@link CancelableStreamObserver} - Dubbo's extended interface
with cancellation and startRequest()
+ * </ul>
+ * An observer can implement both interfaces (e.g.,
ClientTripleReactorPublisher).
+ */
+ @SuppressWarnings("unchecked")
StreamObserver<Object> streamCall(
ClientCall call, RequestMetadata metadata, StreamObserver<Object>
responseObserver) {
+ ClientCallToObserverAdapter<Object> adapter = new
ClientCallToObserverAdapter<>(call, true);
+
+ // Create listener and associate with adapter
ObserverToClientCallListenerAdapter listener = new
ObserverToClientCallListenerAdapter(responseObserver);
- StreamObserver<Object> streamObserver = call.start(metadata, listener);
+ listener.setRequestAdapter(adapter);
- // Set the request adapter on the listener for onReady() to access
onReadyHandler
- if (streamObserver instanceof ClientCallToObserverAdapter) {
- listener.setRequestAdapter((ClientCallToObserverAdapter<Object>)
streamObserver);
+ // Handle CancelableStreamObserver first (for cancellation context and
startRequest)
+ // This must be done regardless of whether it also implements
ClientResponseObserver
+ if (responseObserver instanceof CancelableStreamObserver) {
+ CancelableStreamObserver<Object> cancelableObserver =
(CancelableStreamObserver<Object>) responseObserver;
+ // Set up cancellation context
+ CancellationContext context = new CancellationContext();
+ cancelableObserver.setCancellationContext(context);
+ context.addListener(ctx -> call.cancelByLocal(new
IllegalStateException("Canceled by app")));
+ // Set up startRequest to be called when stream is established
(onStart)
+ listener.setOnStartConsumer(dummy ->
cancelableObserver.startRequest());
}
- if (responseObserver instanceof CancelableStreamObserver) {
- final CancellationContext context = new CancellationContext();
- CancelableStreamObserver<Object> cancelableStreamObserver =
- (CancelableStreamObserver<Object>) responseObserver;
- cancelableStreamObserver.setCancellationContext(context);
- context.addListener(context1 -> call.cancelByLocal(new
IllegalStateException("Canceled by app")));
- listener.setOnStartConsumer(dummy ->
cancelableStreamObserver.startRequest());
-
cancelableStreamObserver.beforeStart((ClientCallToObserverAdapter<Object>)
streamObserver);
+ // Now call beforeStart() - use ClientResponseObserver if available
(gRPC-compatible),
+ // otherwise fall back to CancelableStreamObserver.beforeStart()
+ if (responseObserver instanceof ClientResponseObserver) {
+ // gRPC-compatible interface - beforeStart takes
ClientCallStreamObserver
+ ClientResponseObserver<Object, Object> clientResponseObserver =
+ (ClientResponseObserver<Object, Object>) responseObserver;
+ clientResponseObserver.beforeStart(adapter);
+ } else if (responseObserver instanceof CancelableStreamObserver) {
+ // Legacy Dubbo interface - beforeStart takes
ClientCallToObserverAdapter
+ CancelableStreamObserver<Object> cancelableObserver =
(CancelableStreamObserver<Object>) responseObserver;
+ cancelableObserver.beforeStart(adapter);
}
- return streamObserver;
+
+ // Start the call - creates stream and may trigger initial onReady
+ call.start(metadata, listener);
+
+ return adapter;
}
AsyncRpcResult invokeUnary(
@@ -303,7 +340,9 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
result.setExecutor(callbackExecutor);
ClientCall.Listener callListener = new UnaryClientCallListener(future);
- final StreamObserver<Object> requestObserver = call.start(request,
callListener);
+ // Create adapter for unary call (streamingResponse=false)
+ ClientCallToObserverAdapter<Object> requestObserver = new
ClientCallToObserverAdapter<>(call, false);
+ call.start(request, callListener);
requestObserver.onNext(pureArgument);
requestObserver.onCompleted();
return result;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index 785b63140a..070f07f233 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri.call;
-import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
@@ -105,11 +104,12 @@ public interface ClientCall {
void sendMessage(Object message);
/**
+ * Start the call with the given metadata and response listener.
+ *
* @param metadata request metadata
* @param responseListener the listener to receive response
- * @return the stream observer representing the request sink
*/
- StreamObserver<Object> start(RequestMetadata metadata, Listener
responseListener);
+ void start(RequestMetadata metadata, Listener responseListener);
/**
* @return true if this call is auto request
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 46f2cb0987..12e56e2ee0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -18,14 +18,12 @@ package org.apache.dubbo.rpc.protocol.tri.call;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
-import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
@@ -78,6 +76,9 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
if (done) {
return false;
}
+ if (stream == null) {
+ return false;
+ }
return stream.isReady();
}
@@ -292,16 +293,18 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
}
@Override
- public StreamObserver<Object> start(RequestMetadata metadata,
ClientCall.Listener responseListener) {
+ public void start(RequestMetadata metadata, ClientCall.Listener
responseListener) {
+ // Set listener BEFORE creating stream, so onReady() can access it
+ this.requestMetadata = metadata;
+ this.listener = responseListener;
+ this.streamingResponse = responseListener.streamingResponse();
+
ClientStream stream;
for (ClientStreamFactory factory :
frameworkModel.getActivateExtensions(ClientStreamFactory.class)) {
stream = factory.createClientStream(connectionClient,
frameworkModel, executor, this, writeQueue);
if (stream != null) {
- this.requestMetadata = metadata;
- this.listener = responseListener;
this.stream = stream;
- this.streamingResponse = responseListener.streamingResponse();
- return new ClientCallToObserverAdapter<>(this,
responseListener.streamingResponse());
+ return;
}
}
throw new IllegalStateException("No available ClientStreamFactory");
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
new file mode 100644
index 0000000000..61505532d0
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.command;
+
+import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
+import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Command to trigger initial onReady after the stream channel is created.
+ * This is necessary because onReady is only triggered by
channelWritabilityChanged,
+ * which won't fire if the channel is always writable from creation.
+ *
+ * <p>This command should be enqueued immediately after
CreateStreamQueueCommand.
+ * Since the WriteQueue executes commands in order within the EventLoop,
+ * this command will run after the stream channel has been created.
+ */
+public class InitOnReadyQueueCommand extends QueuedCommand {
+
+ private final TripleStreamChannelFuture streamChannelFuture;
+
+ private final ClientStream.Listener listener;
+
+ private InitOnReadyQueueCommand(TripleStreamChannelFuture
streamChannelFuture, ClientStream.Listener listener) {
+ this.streamChannelFuture = streamChannelFuture;
+ this.listener = listener;
+ this.promise(streamChannelFuture.getParentChannel().newPromise());
+ this.channel(streamChannelFuture.getParentChannel());
+ }
+
+ public static InitOnReadyQueueCommand create(
+ TripleStreamChannelFuture streamChannelFuture,
ClientStream.Listener listener) {
+ return new InitOnReadyQueueCommand(streamChannelFuture, listener);
+ }
+
+ @Override
+ public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
+ // NOOP - this command does not send any data
+ }
+
+ @Override
+ public void run(Channel channel) {
+ // Work in I/O thread, after CreateStreamQueueCommand has completed
+ Channel streamChannel = streamChannelFuture.getNow();
+ if (streamChannel != null && streamChannel.isWritable()) {
+ // Trigger initial onReady to allow application to start sending.
+ listener.onReady();
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
index ea959f2634..4894dff07a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerStreamObserver.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.tri.h12.http2;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
@@ -31,7 +32,7 @@ import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
import java.util.Map;
public class Http2ServerStreamObserver extends Http2ServerChannelObserver
- implements ServerStreamObserver<Object>, AttachmentHolder {
+ implements ServerStreamObserver<Object>,
ServerCallStreamObserver<Object>, AttachmentHolder {
private final FrameworkModel frameworkModel;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
index f677e92229..87f358bade 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java
@@ -73,7 +73,7 @@ public final class Http2TripleClientStream extends
AbstractTripleClientStream {
}
@Override
- protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
+ protected TripleStreamChannelFuture initStreamChannel0(Channel parent) {
Http2StreamChannelBootstrap bootstrap = new
Http2StreamChannelBootstrap(parent);
// Disable Netty's automatic stream flow control to enable manual flow
control
bootstrap.option(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL,
false);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
index 8fe8b41723..0ffd784c66 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleClientStream.java
@@ -58,7 +58,7 @@ public final class Http3TripleClientStream extends
AbstractTripleClientStream {
}
@Override
- protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
+ protected TripleStreamChannelFuture initStreamChannel0(Channel parent) {
Http3RequestStreamInitializer initializer = new
Http3RequestStreamInitializer() {
@Override
protected void initRequestStream(QuicStreamChannel ch) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
index f832677ecc..6dc692d1f3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java
@@ -16,11 +16,13 @@
*/
package org.apache.dubbo.rpc.protocol.tri.observer;
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.ClientStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.call.ClientCall;
-public class ClientCallToObserverAdapter<T> extends
CancelableStreamObserver<T> implements ClientStreamObserver<T> {
+public class ClientCallToObserverAdapter<T> extends CancelableStreamObserver<T>
+ implements ClientStreamObserver<T>, ClientCallStreamObserver<T> {
private final ClientCall call;
private final boolean streamingResponse;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index 7bbd0239b0..d377a774c0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -30,6 +30,7 @@ import
org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.EndStreamQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.InitOnReadyQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
@@ -111,7 +112,21 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
this.streamChannelFuture = initStreamChannel(parent);
}
- protected abstract TripleStreamChannelFuture initStreamChannel(Channel
parent);
+ private TripleStreamChannelFuture initStreamChannel(Channel parent) {
+ TripleStreamChannelFuture tripleStreamChannelFuture =
initStreamChannel0(parent);
+ /**
+ * Enqueue InitOnReadyQueueCommand after the stream creation command.
+ * Since WriteQueue executes commands in order within the EventLoop,
+ * this command will run after the stream channel has been created by
CreateStreamQueueCommand.
+ *
+ * This is necessary because onReady is only triggered by
channelWritabilityChanged,
+ * which won't fire if the channel is always writable from creation.
+ */
+
writeQueue.enqueue(InitOnReadyQueueCommand.create(tripleStreamChannelFuture,
listener));
+ return tripleStreamChannelFuture;
+ }
+
+ protected abstract TripleStreamChannelFuture initStreamChannel0(Channel
parent);
/**
* Get the stream channel future for flow control.
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index a468d9d815..941153fb2b 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
class TripleInvokerTest {
@@ -59,8 +60,8 @@ class TripleInvokerTest {
.createExecutorIfAbsent(url);
TripleClientCall call = Mockito.mock(TripleClientCall.class);
StreamObserver streamObserver = Mockito.mock(StreamObserver.class);
- when(call.start(any(RequestMetadata.class),
any(ClientCall.Listener.class)))
- .thenReturn(streamObserver);
+ // start() now returns void, just verify it's called
+ doNothing().when(call).start(any(RequestMetadata.class),
any(ClientCall.Listener.class));
RpcInvocation invocation = new RpcInvocation();
invocation.setMethodName("test");
invocation.setArguments(new Object[] {streamObserver, streamObserver});
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
index a67c90f333..4f8ed4c477 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/BackpressureTest.java
@@ -323,9 +323,8 @@ class BackpressureTest {
public void sendMessage(Object message) {}
@Override
- public StreamObserver<Object> start(
- org.apache.dubbo.rpc.protocol.tri.RequestMetadata metadata,
Listener responseListener) {
- return null;
+ public void start(org.apache.dubbo.rpc.protocol.tri.RequestMetadata
metadata, Listener responseListener) {
+ // No-op for mock
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
index fa71ca1fa9..c8722e4072 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
@@ -32,6 +32,7 @@ import
org.apache.dubbo.rpc.protocol.tri.command.CreateStreamQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.EndStreamQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
+import org.apache.dubbo.rpc.protocol.tri.command.InitOnReadyQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2TripleClientStream;
@@ -88,6 +89,7 @@ class TripleClientStreamTest {
listener,
http2StreamChannel);
verify(writeQueue).enqueue(any(CreateStreamQueueCommand.class));
+ verify(writeQueue).enqueue(any(InitOnReadyQueueCommand.class));
final RequestMetadata requestMetadata = new RequestMetadata();
requestMetadata.method = methodDescriptor;
@@ -100,11 +102,13 @@ class TripleClientStreamTest {
requestMetadata.version = url.getVersion();
stream.sendHeader(requestMetadata.toHeaders());
verify(writeQueue).enqueueFuture(any(HeaderQueueCommand.class),
any(Executor.class));
- // no other commands
- verify(writeQueue).enqueue(any(QueuedCommand.class));
+ // enqueue should have been called twice: CreateStreamQueueCommand and
InitOnReadyQueueCommand
+ verify(writeQueue, times(2)).enqueue(any(QueuedCommand.class));
stream.sendMessage(new byte[0], 0);
verify(writeQueue).enqueueFuture(any(DataQueueCommand.class),
any(Executor.class));
verify(writeQueue, times(2)).enqueueFuture(any(QueuedCommand.class),
any(Executor.class));
+ // After sendHeader and sendMessage, enqueue should have been called
twice:
+ // once for CreateStreamQueueCommand and once for
InitOnReadyQueueCommand
stream.halfClose();
verify(writeQueue).enqueueFuture(any(EndStreamQueueCommand.class),
any(Executor.class));
verify(writeQueue, times(3)).enqueueFuture(any(QueuedCommand.class),
any(Executor.class));