This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 5aa764c07e fix tri backpressure race condition (#16004)
5aa764c07e is described below

commit 5aa764c07e917c89849e17b5602a06caf9aeaf3a
Author: earthchen <[email protected]>
AuthorDate: Fri Jan 16 16:47:10 2026 +0800

    fix tri backpressure race condition (#16004)
    
    * fix tri backpressure race condition
    
    * add business ready check
    
    * fix: enhance onReady trigger mechanism to prevent race conditions
    
    * fix: improve onReady notification mechanism to prevent race conditions
    
    * fix
    
    * mvn spotless:apply
    
    * fix: resolve race condition in onReady notification by using isReady method
    
    * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Revert "Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java"
    
    This reverts commit 8dd204e82b6150af32061a29fd8706b33c9aeb86.
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../rpc/protocol/tri/call/TripleClientCall.java    |  6 +++-
 .../tri/command/InitOnReadyQueueCommand.java       | 19 ++++++-----
 .../tri/stream/AbstractTripleClientStream.java     | 38 +++++++++++++++++++---
 3 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 12e56e2ee0..c4de49e0f1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -168,7 +168,11 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
         if (listener == null) {
             return;
         }
-        // ObserverToClientCallListenerAdapter.onReady() triggers the 
onReadyHandler
+        // ObserverToClientCallListenerAdapter.onReady() triggers the 
onReadyHandler.
+        // Note: We do NOT check isReady() here because of the async dispatch 
model.
+        // The handler is always called (following gRPC's "spurious 
notifications" semantics),
+        // and it should check isReady() internally via while(isReady()) { 
send(); }.
+        // Subsequent channelWritabilityChanged events will trigger onReady() 
again if needed.
         executor.execute(() -> {
             try {
                 listener.onReady();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
index 61505532d0..df7159c4f1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
@@ -16,7 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.command;
 
-import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
+import org.apache.dubbo.rpc.protocol.tri.stream.AbstractTripleClientStream;
 import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
 
 import io.netty.channel.Channel;
@@ -36,18 +36,18 @@ public class InitOnReadyQueueCommand extends QueuedCommand {
 
     private final TripleStreamChannelFuture streamChannelFuture;
 
-    private final ClientStream.Listener listener;
+    private final AbstractTripleClientStream stream;
 
-    private InitOnReadyQueueCommand(TripleStreamChannelFuture 
streamChannelFuture, ClientStream.Listener listener) {
+    private InitOnReadyQueueCommand(TripleStreamChannelFuture 
streamChannelFuture, AbstractTripleClientStream stream) {
         this.streamChannelFuture = streamChannelFuture;
-        this.listener = listener;
+        this.stream = stream;
         this.promise(streamChannelFuture.getParentChannel().newPromise());
         this.channel(streamChannelFuture.getParentChannel());
     }
 
     public static InitOnReadyQueueCommand create(
-            TripleStreamChannelFuture streamChannelFuture, 
ClientStream.Listener listener) {
-        return new InitOnReadyQueueCommand(streamChannelFuture, listener);
+            TripleStreamChannelFuture streamChannelFuture, 
AbstractTripleClientStream stream) {
+        return new InitOnReadyQueueCommand(streamChannelFuture, stream);
     }
 
     @Override
@@ -59,9 +59,10 @@ public class InitOnReadyQueueCommand extends QueuedCommand {
     public void run(Channel channel) {
         // Work in I/O thread, after CreateStreamQueueCommand has completed
         Channel streamChannel = streamChannelFuture.getNow();
-        if (streamChannel != null && streamChannel.isWritable()) {
-            // Trigger initial onReady to allow application to start sending.
-            listener.onReady();
+        if (streamChannel != null) {
+            // Trigger initial onReady through the stream
+            // update lastReadyState and notify the listener
+            stream.triggerInitialOnReady();
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index d377a774c0..6c40a3f9a4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -86,6 +86,11 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
 
     private boolean isReturnTriException = false;
 
+    /**
+     * Tracks the last known ready state to detect when the state changes from 
"not ready" to "ready".
+     */
+    private volatile boolean lastReadyState = false;
+
     protected AbstractTripleClientStream(
             FrameworkModel frameworkModel,
             Executor executor,
@@ -122,7 +127,7 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
          * This is necessary because onReady is only triggered by 
channelWritabilityChanged,
          * which won't fire if the channel is always writable from creation.
          */
-        
writeQueue.enqueue(InitOnReadyQueueCommand.create(tripleStreamChannelFuture, 
listener));
+        
writeQueue.enqueue(InitOnReadyQueueCommand.create(tripleStreamChannelFuture, 
this));
         return tripleStreamChannelFuture;
     }
 
@@ -192,6 +197,9 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
                         .withDescription("Client write message failed")
                         .withCause(future.cause()));
                 transportException(future.cause());
+            } else {
+                // After successful write, check if we need to trigger onReady
+                notifyOnReady(false);
             }
         });
     }
@@ -254,9 +262,31 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
      * asynchronously triggering all necessary callbacks through its executor.
      */
     protected void onWritabilityChanged() {
-        Channel channel = streamChannelFuture.getNow();
-        if (channel != null && channel.isWritable()) {
-            // Synchronously call listener.onReady(), which will use executor 
to run the callback
+        notifyOnReady(false);
+    }
+
+    /**
+     * Called by InitOnReadyQueueCommand to trigger the initial onReady 
notification.
+     */
+    public void triggerInitialOnReady() {
+        notifyOnReady(true);
+    }
+
+    /**
+     * notify listener when stream becomes ready
+     *
+     * @param forceNotify if true, always trigger onReady (for initial 
notification);
+     *                    if false, only trigger when state changes from "not 
ready" to "ready"
+     */
+    private synchronized void notifyOnReady(boolean forceNotify) {
+        boolean wasReady = lastReadyState;
+        boolean isNowReady = isReady();
+        lastReadyState = isNowReady;
+
+        // Trigger onReady if:
+        // 1. forceNotify is true (initial notification, spurious is OK), or
+        // 2. state changes from "not ready" to "ready"
+        if (forceNotify || (!wasReady && isNowReady)) {
             listener.onReady();
         }
     }

Reply via email to