This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new abddf21a6 fix(Python-client):add thrift client timeout support (#2304)
abddf21a6 is described below
commit abddf21a6293d8580a592bab75621868de0fbc23
Author: Jun 11 <[email protected]>
AuthorDate: Mon Jan 5 20:59:15 2026 +0800
fix(Python-client):add thrift client timeout support (#2304)
This PR fixes a critical issue in the Python client: when the server-side
`rpc_request_dropped_before_execution_when_timeout` parameter is enabled,
Thrift requests from the Python client were dropped by the server because
the `client_timeout` field was not properly set in the Thrift header.
---
python-client/pypegasus/operate/packet.py | 3 ++-
python-client/pypegasus/pgclient.py | 6 +++---
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/python-client/pypegasus/operate/packet.py
b/python-client/pypegasus/operate/packet.py
index fd5000dd4..d49edacf9 100644
--- a/python-client/pypegasus/operate/packet.py
+++ b/python-client/pypegasus/operate/packet.py
@@ -74,7 +74,8 @@ class ClientOperator(object):
self.request = request
self.response = None
- def prepare_thrift_header(self, body_length):
+ def prepare_thrift_header(self, body_length, timeout):
+ self.header.client_timeout = max(0, timeout)
self.header.body_length = body_length
self.header.thread_hash =
tools.dsn_gpid_to_thread_hash(self.header.app_id, self.header.partition_index)
return self.header.to_bytes()
diff --git a/python-client/pypegasus/pgclient.py
b/python-client/pypegasus/pgclient.py
index 9960f44c2..a897b345a 100644
--- a/python-client/pypegasus/pgclient.py
+++ b/python-client/pypegasus/pgclient.py
@@ -107,7 +107,7 @@ class BaseSession(object):
self._requests[seqid] = dr
# ds(deferred send) will wait dr(deferred receive)
- ds = defer.maybeDeferred(self.send_req, op, seqid)
+ ds = defer.maybeDeferred(self.send_req, op, seqid, timeout)
ds.addCallbacks(
callback=self.cb_send,
callbackArgs=(seqid,),
@@ -116,13 +116,13 @@ class BaseSession(object):
ds.addTimeout(timeout/1000.0, reactor, self.on_timeout)
return ds
- def send_req(self, op, seqid):
+ def send_req(self, op, seqid, timeout):
oprot = self._oprot_factory.getProtocol(self._transport)
oprot.trans.seek(ThriftHeader.HEADER_LENGTH) # skip
header
op.send_data(oprot, seqid)
body_length = oprot.trans.tell() - ThriftHeader.HEADER_LENGTH
oprot.trans.seek(0) # back
to header
- oprot.trans.write(op.prepare_thrift_header(body_length))
+ oprot.trans.write(op.prepare_thrift_header(body_length, timeout))
oprot.trans.flush()
def recv_ACK(self, iprot, mtype, rseqid, errno, result_type, parser):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]