This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f2c580b Fix pending requests failed with ResultConnectError when
disconnecting (#322)
f2c580b is described below
commit f2c580bbd092d9f441a9dba1e0e6bdb71f1f3392
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Oct 7 00:19:35 2023 +0800
Fix pending requests failed with ResultConnectError when disconnecting
(#322)
### Motivation
When there are multiple pending requests in the same `ClientConnection`
and one of them fails with a retryable error, e.g. the
`ServiceUnitNotReady` error when finding the owner broker of a topic,
the socket is closed in `checkServerError`, and `close()` is then
called in `handleRead` (where `err` is `eof` or `operation_failed`).
However, the default value of the first parameter of `close` is
`ResultConnectError`, which is not retryable. As a result, all the other
pending requests on that connection fail with `ResultConnectError`
instead of being retried.
### Modifications
If the connection is disconnected by the client, pass
`ResultDisconnected` to `close`, and treat `ResultDisconnected` as
retryable (via the new `isResultRetryable` helper in `ResultUtils.h`).
`closeSocket()` is replaced with `close(ResultDisconnected)` to avoid
leaving the connection in a state where the socket is closed but the
TLS stream is not.
---
lib/ClientConnection.cc | 57 +++++++++++++++++++++++-------------------------
lib/ClientConnection.h | 11 +++++++++-
lib/ConsumerImpl.cc | 3 ++-
lib/HandlerBase.cc | 5 +++--
lib/ProducerImpl.cc | 3 ++-
lib/ResultUtils.h | 29 ++++++++++++++++++++++++
lib/RetryableOperation.h | 3 ++-
7 files changed, 75 insertions(+), 36 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index f32ed81..6d18780 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -31,6 +31,7 @@
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
+#include "ResultUtils.h"
#include "Url.h"
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"
@@ -205,7 +206,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
ctx.load_verify_file(trustCertFilePath);
} else {
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
- close();
+ close(ResultAuthenticationError, false);
return;
}
} else {
@@ -215,7 +216,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
if (!authentication_) {
LOG_ERROR("Invalid authentication plugin");
- close();
+ close(ResultAuthenticationError, false);
return;
}
@@ -229,12 +230,12 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
tlsPrivateKey = authData->getTlsPrivateKey();
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
- close();
+ close(ResultAuthenticationError, false);
return;
}
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
- close();
+ close(ResultAuthenticationError, false);
return;
}
ctx.use_private_key_file(tlsPrivateKey,
boost::asio::ssl::context::pem);
@@ -660,7 +661,7 @@ void ClientConnection::handleRead(const
boost::system::error_code& err, size_t b
} else {
LOG_ERROR(cnxString_ << "Read operation failed: " <<
err.message());
}
- close();
+ close(ResultDisconnected);
} else if (bytesTransferred < minReadSize) {
// Read the remaining part, use a slice of buffer to write on the next
// region
@@ -718,7 +719,7 @@ void ClientConnection::processIncomingBuffer() {
proto::BaseCommand incomingCmd;
if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
- close();
+ close(ResultDisconnected);
return;
}
@@ -742,7 +743,7 @@ void ClientConnection::processIncomingBuffer() {
<<
incomingCmd.message().message_id().ledgerid() << ", entry id "
<<
incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry
metadata");
- close();
+ close(ResultDisconnected);
return;
}
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 +
brokerEntryMetadataSize);
@@ -760,7 +761,7 @@ void ClientConnection::processIncomingBuffer() {
<<
incomingCmd.message().message_id().ledgerid() //
<< ", entry id " <<
incomingCmd.message().message_id().entryid()
<< "] Error parsing message metadata");
- close();
+ close(ResultDisconnected);
return;
}
@@ -991,7 +992,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand&
incomingCmd) {
default:
LOG_WARN(cnxString_ << "Received invalid message from
server");
- close();
+ close(ResultDisconnected);
break;
}
}
@@ -1133,7 +1134,7 @@ void ClientConnection::sendMessage(const
std::shared_ptr<SendArguments>& args) {
void ClientConnection::handleSend(const boost::system::error_code& err, const
SharedBuffer&) {
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err
<< " " << err.message());
- close();
+ close(ResultDisconnected);
} else {
sendPendingCommands();
}
@@ -1142,7 +1143,7 @@ void ClientConnection::handleSend(const
boost::system::error_code& err, const Sh
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
if (err) {
LOG_WARN(cnxString_ << "Could not send pair message on connection: "
<< err << " " << err.message());
- close();
+ close(ResultDisconnected);
} else {
sendPendingCommands();
}
@@ -1247,7 +1248,7 @@ void ClientConnection::handleKeepAliveTimeout() {
if (havePendingPingRequest_) {
LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive
timeout");
- close();
+ close(ResultDisconnected);
} else {
// Send keep alive probe to peer
LOG_DEBUG(cnxString_ << "Sending ping message");
@@ -1287,7 +1288,14 @@ void ClientConnection::close(Result result, bool detach)
{
}
state_ = Disconnected;
- closeSocket();
+ if (socket_) {
+ boost::system::error_code err;
+ socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
+ socket_->close(err);
+ if (err) {
+ LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
+ }
+ }
if (tlsSocket_) {
boost::system::error_code err;
tlsSocket_->lowest_layer().close(err);
@@ -1326,7 +1334,7 @@ void ClientConnection::close(Result result, bool detach) {
}
lock.unlock();
- if (result != ResultDisconnected && result != ResultRetryable) {
+ if (!isResultRetryable(result)) {
LOG_ERROR(cnxString_ << "Connection closed with " << result);
} else {
LOG_INFO(cnxString_ << "Connection disconnected");
@@ -1473,26 +1481,15 @@ Future<Result, SchemaInfo>
ClientConnection::newGetSchema(const std::string& top
return promise.getFuture();
}
-void ClientConnection::closeSocket() {
- boost::system::error_code err;
- if (socket_) {
- socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
- socket_->close(err);
- if (err) {
- LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
- }
- }
-}
-
void ClientConnection::checkServerError(ServerError error) {
switch (error) {
case proto::ServerError::ServiceNotReady:
- closeSocket();
+ close(ResultDisconnected);
break;
case proto::ServerError::TooManyRequests:
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
// https://github.com/apache/pulsar/pull/274
- closeSocket();
+ close(ResultDisconnected);
break;
default:
break;
@@ -1518,7 +1515,7 @@ void ClientConnection::handleSendReceipt(const
proto::CommandSendReceipt& sendRe
if (!producer->ackReceived(sequenceId, messageId)) {
// If the producer fails to process the ack, we need to close
the connection
// to give it a chance to recover from there
- close();
+ close(ResultDisconnected);
}
}
} else {
@@ -1542,12 +1539,12 @@ void ClientConnection::handleSendError(const
proto::CommandSendError& error) {
if (!producer->removeCorruptMessage(sequenceId)) {
// If the producer fails to remove corrupt msg, we need to
close the
// connection to give it a chance to recover from there
- close();
+ close(ResultDisconnected);
}
}
}
} else {
- close();
+ close(ResultDisconnected);
}
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index e74a3ae..30ea8d8 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -142,6 +142,16 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*/
void tcpConnectAsync();
+ /**
+ * Close the connection.
+ *
+ * @param result all pending futures will complete with this result
+ * @param detach remove it from the pool if it's true
+ *
+ * `detach` should only be false when:
+ * 1. Before the connection is put into the pool, i.e. during the
construction.
+ * 2. When the connection pool is closed
+ */
void close(Result result = ResultConnectError, bool detach = true);
bool isClosed() const;
@@ -392,7 +402,6 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
ConnectionPool& pool_;
friend class PulsarFriend;
- void closeSocket();
void checkServerError(ServerError error);
void handleSendReceipt(const proto::CommandSendReceipt&);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index c9d2452..d82cf78 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -42,6 +42,7 @@
#include "MessagesImpl.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
+#include "ResultUtils.h"
#include "TimeUtils.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
@@ -319,7 +320,7 @@ void ConsumerImpl::handleCreateConsumer(const
ClientConnectionPtr& cnx, Result r
} else {
// Consumer was not yet created, retry to connect to broker if
it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
- if (result == ResultRetryable) {
+ if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating consumer: "
<< strResult(result));
scheduleReconnection();
} else {
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 163c779..14f5601 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -22,6 +22,7 @@
#include "ClientImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
+#include "ResultUtils.h"
#include "TimeUtils.h"
DECLARE_LOG_OBJECT()
@@ -110,7 +111,7 @@ void HandlerBase::handleDisconnection(Result result, const
ClientConnectionPtr&
resetCnx();
- if (result == ResultRetryable) {
+ if (isResultRetryable(result)) {
scheduleReconnection();
return;
}
@@ -165,7 +166,7 @@ void HandlerBase::handleTimeout(const
boost::system::error_code& ec) {
}
Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime
startTimestamp) const {
- if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >=
operationTimeut_)) {
+ if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >=
operationTimeut_)) {
return ResultTimeout;
} else {
return result;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 6b3f5cb..3166c19 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -36,6 +36,7 @@
#include "OpSendMsg.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
+#include "ResultUtils.h"
#include "Semaphore.h"
#include "TimeUtils.h"
#include "TopicName.h"
@@ -272,7 +273,7 @@ void ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result r
} else {
// Producer was not yet created, retry to connect to broker if
it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
- if (result == ResultRetryable) {
+ if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating producer: "
<< strResult(result));
scheduleReconnection();
} else {
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
new file mode 100644
index 0000000..b5ec6cd
--- /dev/null
+++ b/lib/ResultUtils.h
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Result.h>
+
+namespace pulsar {
+
+inline bool isResultRetryable(Result result) {
+ return result == ResultRetryable || result == ResultDisconnected;
+}
+
+} // namespace pulsar
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index 8bebb5b..d026e42 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -29,6 +29,7 @@
#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"
+#include "ResultUtils.h"
namespace pulsar {
@@ -95,7 +96,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
promise_.setValue(value);
return;
}
- if (result != ResultRetryable) {
+ if (!isResultRetryable(result)) {
promise_.setFailed(result);
return;
}