This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 13416ea03d0d36f0e87954a0450994bf3b46fec9
Author: Yunze Xu
AuthorDate: Thu Sep 2 22:07:11 2021 +0800

Fix incorrect connect timeout (#11889)

Fixes #10721

### Motivation

Currently the connection timer is stopped as soon as the TCP connection is established (that is, when the state becomes `TcpConnected`). However, the connect phase should also include sending the `CommandConnect` request and successfully receiving the `CommandConnected` response from the broker.

For example, in the case described in #10721, if the Pulsar broker receives a SIGSTOP signal and is suspended, the TCP connection can still be established, because the operating system completes the TCP handshake on the broker's listening socket, but no response will ever arrive from the stopped broker. [...]

### Modifications

Stop the connection timer only after the `ClientConnection`'s state becomes `Ready`, which means the client has successfully received the `CommandConnected` response. This is also consistent with the Java client's implementation, see https://github.com/apache/pulsar/blob/235e678a56d0284e68b45e46706b6237d7c6d5f9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L329-L330

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This scenario is hard to simulate in a unit test, but it can easily be reproduced in a local environment:

1. Run a Pulsar standalone and send it a `SIGSTOP` signal with `pkill -SIGSTOP -f pulsar`.
2. Run any C++/Python client with this fix and connect to the standalone.
```
>>> import pulsar
>>> c = pulsar.Client('pulsar://localhost:6650')
>>> c.create_producer('xxx')
2021-09-02 11:54:38.828 INFO [0x10d72ae00] ClientConnection:181 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2021-09-02 11:54:38.828 INFO [0x10d72ae00] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2021-09-02 11:54:38.969 INFO [0x700003136000] ClientConnection:367 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connected to broker
2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientConnection:532 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connection was not established in 10000 ms, close the socket
2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientConnection:572 | [127.0.0.1:62784 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-09-02 11:54:48.973 INFO [0x700003136000] ClientConnection:1495 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connection closed
2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientImpl:188 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/xxx -- ConnectError
Traceback (most recent call last):
  # ...
_pulsar.ConnectError: Pulsar error: ConnectError
```

We can see that the TCP connection succeeds, but after 10 seconds (the default connect timeout) without a `CommandConnected` response the client closes the socket and fails with `ConnectError`.
(cherry picked from commit cdd0f41f90ee0a8a03201adf759af056b065a547)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 43f358f..3705bc2 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -277,6 +277,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
     }
 
     state_ = Ready;
+    connectTimeoutTask_->stop();
     serverProtocolVersion_ = cmdConnected.protocol_version();
     connectPromise_.setValue(shared_from_this());
 
@@ -377,7 +378,6 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
         }
         state_ = TcpConnected;
-        connectTimeoutTask_->stop();
         boost::system::error_code error;
         socket_->set_option(tcp::no_delay(true), error);
 
@@ -536,7 +536,7 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
 
     auto self = shared_from_this();
     connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
-        if (state_ != TcpConnected) {
+        if (state_ != Ready) {
             LOG_ERROR(cnxString_ << "Connection was not established in "
                                  << connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
             PeriodicTask::ErrorCode err;
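The behavioral difference in this diff can be illustrated with a minimal, self-contained sketch. It assumes a simplified, hypothetical model: `ConnectionModel`, `onTcpConnected`, `onCommandConnected`, `onConnectTimeout` and the `timerRunning_` flag are illustrative names, not the Pulsar C++ client's API, and the flag merely stands in for `connectTimeoutTask_`. The `legacy` switch models the code before this change, where the timer was stopped on `TcpConnected`.

```cpp
#include <cassert>

// Hypothetical, simplified connection states (the real client has more).
enum class State { Pending, TcpConnected, Ready, Disconnected };

// Hypothetical model of ClientConnection's connect-timeout handling.
class ConnectionModel {
   public:
    // legacy = true models the behavior before this change.
    explicit ConnectionModel(bool legacy = false) : legacy_(legacy) {}

    // Analogue of handleTcpConnected(): the TCP handshake has completed.
    void onTcpConnected() {
        state_ = State::TcpConnected;
        if (legacy_) {
            timerRunning_ = false;  // old code: timer stopped before the Pulsar handshake
        }
    }

    // Analogue of handlePulsarConnected(): CommandConnected was received.
    void onCommandConnected() {
        assert(state_ == State::TcpConnected);  // only follows the TCP handshake
        state_ = State::Ready;
        timerRunning_ = false;  // new code: timer stopped only once Ready
    }

    // Analogue of the timeout callback; returns true if it closes the connection.
    bool onConnectTimeout() {
        if (!timerRunning_) {
            return false;  // the timer was stopped, so the timeout never fires
        }
        // Old check: state_ != TcpConnected; new check: state_ != Ready.
        const State established = legacy_ ? State::TcpConnected : State::Ready;
        if (state_ != established) {
            state_ = State::Disconnected;  // "Connection was not established in ... ms"
            return true;
        }
        return false;
    }

    State state() const { return state_; }

   private:
    bool legacy_;
    State state_ = State::Pending;
    bool timerRunning_ = true;  // stands in for connectTimeoutTask_
};
```

In the SIGSTOP scenario, `onTcpConnected()` is called but `onCommandConnected()` never is. With `legacy = true` the model stays in `TcpConnected` indefinitely because the timer was already stopped; with the default model the timeout fires and the connection is closed, matching the `ConnectError` seen in the log above.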
