This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new abddf21a6 fix(Python-client):add thrift client timeout support (#2304)
abddf21a6 is described below

commit abddf21a6293d8580a592bab75621868de0fbc23
Author: Jun 11 <[email protected]>
AuthorDate: Mon Jan 5 20:59:15 2026 +0800

    fix(Python-client):add thrift client timeout support (#2304)
    
    This PR fixes a critical issue where Python client Thrift requests were
    being dropped by the server when the
    `rpc_request_dropped_before_execution_when_timeout` parameter is enabled,
    due to the `client_timeout` field not being properly set in the Thrift
    header.
---
 python-client/pypegasus/operate/packet.py | 3 ++-
 python-client/pypegasus/pgclient.py       | 6 +++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/python-client/pypegasus/operate/packet.py b/python-client/pypegasus/operate/packet.py
index fd5000dd4..d49edacf9 100644
--- a/python-client/pypegasus/operate/packet.py
+++ b/python-client/pypegasus/operate/packet.py
@@ -74,7 +74,8 @@ class ClientOperator(object):
         self.request = request
         self.response = None
 
-    def prepare_thrift_header(self, body_length):
+    def prepare_thrift_header(self, body_length, timeout):
+        self.header.client_timeout = max(0, timeout)
         self.header.body_length = body_length
         self.header.thread_hash = 
tools.dsn_gpid_to_thread_hash(self.header.app_id, self.header.partition_index)
         return self.header.to_bytes()
diff --git a/python-client/pypegasus/pgclient.py b/python-client/pypegasus/pgclient.py
index 9960f44c2..a897b345a 100644
--- a/python-client/pypegasus/pgclient.py
+++ b/python-client/pypegasus/pgclient.py
@@ -107,7 +107,7 @@ class BaseSession(object):
         self._requests[seqid] = dr
 
         # ds(deferred send) will wait dr(deferred receive)
-        ds = defer.maybeDeferred(self.send_req, op, seqid)
+        ds = defer.maybeDeferred(self.send_req, op, seqid, timeout)
         ds.addCallbacks(
             callback=self.cb_send,
             callbackArgs=(seqid,),
@@ -116,13 +116,13 @@ class BaseSession(object):
         ds.addTimeout(timeout/1000.0, reactor, self.on_timeout)
         return ds
 
-    def send_req(self, op, seqid):
+    def send_req(self, op, seqid, timeout):
         oprot = self._oprot_factory.getProtocol(self._transport)
         oprot.trans.seek(ThriftHeader.HEADER_LENGTH)                    # skip 
header
         op.send_data(oprot, seqid)
         body_length = oprot.trans.tell() - ThriftHeader.HEADER_LENGTH
         oprot.trans.seek(0)                                             # back 
to header
-        oprot.trans.write(op.prepare_thrift_header(body_length))
+        oprot.trans.write(op.prepare_thrift_header(body_length, timeout))
         oprot.trans.flush()
 
     def recv_ACK(self, iprot, mtype, rseqid, errno, result_type, parser):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to