This is an automated email from the ASF dual-hosted git repository.
rainyu pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 893cd2343d Implement stream initialization in ClientStream and
AbstractTripleClientStream (#16014)
893cd2343d is described below
commit 893cd2343da69b962bc287d1d1926f8f642a456c
Author: earthchen <[email protected]>
AuthorDate: Mon Jan 19 15:10:46 2026 +0800
Implement stream initialization in ClientStream and
AbstractTripleClientStream (#16014)
---
.../apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java | 1 +
.../rpc/protocol/tri/stream/AbstractTripleClientStream.java | 10 +++++++---
.../org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java | 5 +++++
.../dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java | 1 +
4 files changed, 14 insertions(+), 3 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index c4de49e0f1..b76d9a00c9 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -307,6 +307,7 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
for (ClientStreamFactory factory :
frameworkModel.getActivateExtensions(ClientStreamFactory.class)) {
stream = factory.createClientStream(connectionClient,
frameworkModel, executor, this, writeQueue);
if (stream != null) {
+ stream.initStream();
this.stream = stream;
return;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index 6c40a3f9a4..dfbf1cb007 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -80,7 +80,7 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
protected final TripleWriteQueue writeQueue;
private Deframer deframer;
private final Channel parent;
- private final TripleStreamChannelFuture streamChannelFuture;
+ private TripleStreamChannelFuture streamChannelFuture;
private boolean halfClosed;
private boolean rst;
@@ -101,7 +101,6 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
this.parent = http2StreamChannel.parent();
this.listener = listener;
this.writeQueue = writeQueue;
- this.streamChannelFuture = initStreamChannel(http2StreamChannel);
}
protected AbstractTripleClientStream(
@@ -114,11 +113,16 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
this.parent = parent;
this.listener = listener;
this.writeQueue = writeQueue;
- this.streamChannelFuture = initStreamChannel(parent);
+ }
+
+ @Override
+ public void initStream() {
+ initStreamChannel(this.parent);
}
private TripleStreamChannelFuture initStreamChannel(Channel parent) {
TripleStreamChannelFuture tripleStreamChannelFuture =
initStreamChannel0(parent);
+ this.streamChannelFuture = tripleStreamChannelFuture;
/**
* Enqueue InitOnReadyQueueCommand after the stream creation command.
* Since WriteQueue executes commands in order within the EventLoop,
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
index feb2037e33..b417a05440 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
@@ -67,6 +67,11 @@ public interface ClientStream extends Stream {
default void onReady() {}
}
+ /**
+ * Initialize the stream
+ */
+ void initStream();
+
/**
* Send message to remote peer.
*
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
index c8722e4072..623a149745 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
@@ -88,6 +88,7 @@ class TripleClientStreamTest {
writeQueue,
listener,
http2StreamChannel);
+ stream.initStream();
verify(writeQueue).enqueue(any(CreateStreamQueueCommand.class));
verify(writeQueue).enqueue(any(InitOnReadyQueueCommand.class));