This is an automated email from the ASF dual-hosted git repository.

rainyu pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 893cd2343d Implement stream initialization in ClientStream and 
AbstractTripleClientStream (#16014)
893cd2343d is described below

commit 893cd2343da69b962bc287d1d1926f8f642a456c
Author: earthchen <[email protected]>
AuthorDate: Mon Jan 19 15:10:46 2026 +0800

    Implement stream initialization in ClientStream and 
AbstractTripleClientStream (#16014)
---
 .../apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java   |  1 +
 .../rpc/protocol/tri/stream/AbstractTripleClientStream.java    | 10 +++++++---
 .../org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java |  5 +++++
 .../dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java  |  1 +
 4 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index c4de49e0f1..b76d9a00c9 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -307,6 +307,7 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
         for (ClientStreamFactory factory : 
frameworkModel.getActivateExtensions(ClientStreamFactory.class)) {
             stream = factory.createClientStream(connectionClient, 
frameworkModel, executor, this, writeQueue);
             if (stream != null) {
+                stream.initStream();
                 this.stream = stream;
                 return;
             }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index 6c40a3f9a4..dfbf1cb007 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -80,7 +80,7 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
     protected final TripleWriteQueue writeQueue;
     private Deframer deframer;
     private final Channel parent;
-    private final TripleStreamChannelFuture streamChannelFuture;
+    private TripleStreamChannelFuture streamChannelFuture;
     private boolean halfClosed;
     private boolean rst;
 
@@ -101,7 +101,6 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
         this.parent = http2StreamChannel.parent();
         this.listener = listener;
         this.writeQueue = writeQueue;
-        this.streamChannelFuture = initStreamChannel(http2StreamChannel);
     }
 
     protected AbstractTripleClientStream(
@@ -114,11 +113,16 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
         this.parent = parent;
         this.listener = listener;
         this.writeQueue = writeQueue;
-        this.streamChannelFuture = initStreamChannel(parent);
+    }
+
+    @Override
+    public void initStream() {
+        initStreamChannel(this.parent);
     }
 
     private TripleStreamChannelFuture initStreamChannel(Channel parent) {
         TripleStreamChannelFuture tripleStreamChannelFuture = 
initStreamChannel0(parent);
+        this.streamChannelFuture = tripleStreamChannelFuture;
         /**
          * Enqueue InitOnReadyQueueCommand after the stream creation command.
          * Since WriteQueue executes commands in order within the EventLoop,
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
index feb2037e33..b417a05440 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
@@ -67,6 +67,11 @@ public interface ClientStream extends Stream {
         default void onReady() {}
     }
 
+    /**
+     * Initialize the stream
+     */
+    void initStream();
+
     /**
      * Send message to remote peer.
      *
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
index c8722e4072..623a149745 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
@@ -88,6 +88,7 @@ class TripleClientStreamTest {
                 writeQueue,
                 listener,
                 http2StreamChannel);
+        stream.initStream();
         verify(writeQueue).enqueue(any(CreateStreamQueueCommand.class));
         verify(writeQueue).enqueue(any(InitOnReadyQueueCommand.class));
 

Reply via email to