This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f2c580b  Fix pending requests failed with ResultConnectError when 
disconnecting (#322)
f2c580b is described below

commit f2c580bbd092d9f441a9dba1e0e6bdb71f1f3392
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Oct 7 00:19:35 2023 +0800

    Fix pending requests failed with ResultConnectError when disconnecting 
(#322)
    
    ### Motivation
    
    When there are multiple pending requests in the same `ClientConnection`,
    if one request failed with a retryable error, e.g. the
    `ServiceUnitNotReady` error when finding the owner broker of a topic,
    the socket will be closed in `checkServerError` and `close()` will be
    called subsequently in `handleRead` (`err` is `eof` or
    `operation_failed`). However, the first parameter of `close` defaults to
    `ResultConnectError`, which is not retryable, so the other pending
    requests fail with that error instead of being retried.
    
    ### Modifications
    
    If the connection is disconnected by the client, pass
    `ResultDisconnected` to `close` and treat it as retryable.
    
    `closeSocket` is replaced with `close(ResultDisconnected)` so that the
    connection is never left in a state where the socket is closed but the
    TLS stream is not.
---
 lib/ClientConnection.cc  | 57 +++++++++++++++++++++++-------------------------
 lib/ClientConnection.h   | 11 +++++++++-
 lib/ConsumerImpl.cc      |  3 ++-
 lib/HandlerBase.cc       |  5 +++--
 lib/ProducerImpl.cc      |  3 ++-
 lib/ResultUtils.h        | 29 ++++++++++++++++++++++++
 lib/RetryableOperation.h |  3 ++-
 7 files changed, 75 insertions(+), 36 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index f32ed81..6d18780 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -31,6 +31,7 @@
 #include "OpSendMsg.h"
 #include "ProducerImpl.h"
 #include "PulsarApi.pb.h"
+#include "ResultUtils.h"
 #include "Url.h"
 #include "auth/InitialAuthData.h"
 #include "checksum/ChecksumProvider.h"
@@ -205,7 +206,7 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
                     ctx.load_verify_file(trustCertFilePath);
                 } else {
                     LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
-                    close();
+                    close(ResultAuthenticationError, false);
                     return;
                 }
             } else {
@@ -215,7 +216,7 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
 
         if (!authentication_) {
             LOG_ERROR("Invalid authentication plugin");
-            close();
+            close(ResultAuthenticationError, false);
             return;
         }
 
@@ -229,12 +230,12 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
             tlsPrivateKey = authData->getTlsPrivateKey();
             if (!file_exists(tlsCertificates)) {
                 LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
-                close();
+                close(ResultAuthenticationError, false);
                 return;
             }
             if (!file_exists(tlsCertificates)) {
                 LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
-                close();
+                close(ResultAuthenticationError, false);
                 return;
             }
             ctx.use_private_key_file(tlsPrivateKey, 
boost::asio::ssl::context::pem);
@@ -660,7 +661,7 @@ void ClientConnection::handleRead(const 
boost::system::error_code& err, size_t b
         } else {
             LOG_ERROR(cnxString_ << "Read operation failed: " << 
err.message());
         }
-        close();
+        close(ResultDisconnected);
     } else if (bytesTransferred < minReadSize) {
         // Read the remaining part, use a slice of buffer to write on the next
         // region
@@ -718,7 +719,7 @@ void ClientConnection::processIncomingBuffer() {
         proto::BaseCommand incomingCmd;
         if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
             LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
-            close();
+            close(ResultDisconnected);
             return;
         }
 
@@ -742,7 +743,7 @@ void ClientConnection::processIncomingBuffer() {
                                          << 
incomingCmd.message().message_id().ledgerid() << ", entry id "
                                          << 
incomingCmd.message().message_id().entryid()
                                          << "] Error parsing broker entry 
metadata");
-                    close();
+                    close(ResultDisconnected);
                     return;
                 }
                 incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + 
brokerEntryMetadataSize);
@@ -760,7 +761,7 @@ void ClientConnection::processIncomingBuffer() {
                                      << 
incomingCmd.message().message_id().ledgerid()  //
                                      << ", entry id " << 
incomingCmd.message().message_id().entryid()
                                      << "] Error parsing message metadata");
-                close();
+                close(ResultDisconnected);
                 return;
             }
 
@@ -991,7 +992,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
 
                 default:
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
-                    close();
+                    close(ResultDisconnected);
                     break;
             }
         }
@@ -1133,7 +1134,7 @@ void ClientConnection::sendMessage(const 
std::shared_ptr<SendArguments>& args) {
 void ClientConnection::handleSend(const boost::system::error_code& err, const 
SharedBuffer&) {
     if (err) {
         LOG_WARN(cnxString_ << "Could not send message on connection: " << err 
<< " " << err.message());
-        close();
+        close(ResultDisconnected);
     } else {
         sendPendingCommands();
     }
@@ -1142,7 +1143,7 @@ void ClientConnection::handleSend(const 
boost::system::error_code& err, const Sh
 void ClientConnection::handleSendPair(const boost::system::error_code& err) {
     if (err) {
         LOG_WARN(cnxString_ << "Could not send pair message on connection: " 
<< err << " " << err.message());
-        close();
+        close(ResultDisconnected);
     } else {
         sendPendingCommands();
     }
@@ -1247,7 +1248,7 @@ void ClientConnection::handleKeepAliveTimeout() {
 
     if (havePendingPingRequest_) {
         LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive 
timeout");
-        close();
+        close(ResultDisconnected);
     } else {
         // Send keep alive probe to peer
         LOG_DEBUG(cnxString_ << "Sending ping message");
@@ -1287,7 +1288,14 @@ void ClientConnection::close(Result result, bool detach) 
{
     }
     state_ = Disconnected;
 
-    closeSocket();
+    if (socket_) {
+        boost::system::error_code err;
+        socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
+        }
+    }
     if (tlsSocket_) {
         boost::system::error_code err;
         tlsSocket_->lowest_layer().close(err);
@@ -1326,7 +1334,7 @@ void ClientConnection::close(Result result, bool detach) {
     }
 
     lock.unlock();
-    if (result != ResultDisconnected && result != ResultRetryable) {
+    if (!isResultRetryable(result)) {
         LOG_ERROR(cnxString_ << "Connection closed with " << result);
     } else {
         LOG_INFO(cnxString_ << "Connection disconnected");
@@ -1473,26 +1481,15 @@ Future<Result, SchemaInfo> 
ClientConnection::newGetSchema(const std::string& top
     return promise.getFuture();
 }
 
-void ClientConnection::closeSocket() {
-    boost::system::error_code err;
-    if (socket_) {
-        socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
-        socket_->close(err);
-        if (err) {
-            LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
-        }
-    }
-}
-
 void ClientConnection::checkServerError(ServerError error) {
     switch (error) {
         case proto::ServerError::ServiceNotReady:
-            closeSocket();
+            close(ResultDisconnected);
             break;
         case proto::ServerError::TooManyRequests:
             // TODO: Implement maxNumberOfRejectedRequestPerConnection like
             // https://github.com/apache/pulsar/pull/274
-            closeSocket();
+            close(ResultDisconnected);
             break;
         default:
             break;
@@ -1518,7 +1515,7 @@ void ClientConnection::handleSendReceipt(const 
proto::CommandSendReceipt& sendRe
             if (!producer->ackReceived(sequenceId, messageId)) {
                 // If the producer fails to process the ack, we need to close 
the connection
                 // to give it a chance to recover from there
-                close();
+                close(ResultDisconnected);
             }
         }
     } else {
@@ -1542,12 +1539,12 @@ void ClientConnection::handleSendError(const 
proto::CommandSendError& error) {
                 if (!producer->removeCorruptMessage(sequenceId)) {
                     // If the producer fails to remove corrupt msg, we need to 
close the
                     // connection to give it a chance to recover from there
-                    close();
+                    close(ResultDisconnected);
                 }
             }
         }
     } else {
-        close();
+        close(ResultDisconnected);
     }
 }
 
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index e74a3ae..30ea8d8 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -142,6 +142,16 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      */
     void tcpConnectAsync();
 
+    /**
+     * Close the connection.
+     *
+     * @param result all pending futures will complete with this result
+     * @param detach remove it from the pool if it's true
+     *
+     * `detach` should only be false when:
+     * 1. Before the connection is put into the pool, i.e. during the 
construction.
+     * 2. When the connection pool is closed
+     */
     void close(Result result = ResultConnectError, bool detach = true);
 
     bool isClosed() const;
@@ -392,7 +402,6 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     ConnectionPool& pool_;
     friend class PulsarFriend;
 
-    void closeSocket();
     void checkServerError(ServerError error);
 
     void handleSendReceipt(const proto::CommandSendReceipt&);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index c9d2452..d82cf78 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -42,6 +42,7 @@
 #include "MessagesImpl.h"
 #include "ProducerConfigurationImpl.h"
 #include "PulsarApi.pb.h"
+#include "ResultUtils.h"
 #include "TimeUtils.h"
 #include "TopicName.h"
 #include "UnAckedMessageTrackerDisabled.h"
@@ -319,7 +320,7 @@ void ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result r
         } else {
             // Consumer was not yet created, retry to connect to broker if 
it's possible
             result = convertToTimeoutIfNecessary(result, creationTimestamp_);
-            if (result == ResultRetryable) {
+            if (isResultRetryable(result)) {
                 LOG_WARN(getName() << "Temporary error in creating consumer: " 
<< strResult(result));
                 scheduleReconnection();
             } else {
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 163c779..14f5601 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -22,6 +22,7 @@
 #include "ClientImpl.h"
 #include "ExecutorService.h"
 #include "LogUtils.h"
+#include "ResultUtils.h"
 #include "TimeUtils.h"
 
 DECLARE_LOG_OBJECT()
@@ -110,7 +111,7 @@ void HandlerBase::handleDisconnection(Result result, const 
ClientConnectionPtr&
 
     resetCnx();
 
-    if (result == ResultRetryable) {
+    if (isResultRetryable(result)) {
         scheduleReconnection();
         return;
     }
@@ -165,7 +166,7 @@ void HandlerBase::handleTimeout(const 
boost::system::error_code& ec) {
 }
 
 Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime 
startTimestamp) const {
-    if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= 
operationTimeut_)) {
+    if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >= 
operationTimeut_)) {
         return ResultTimeout;
     } else {
         return result;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 6b3f5cb..3166c19 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -36,6 +36,7 @@
 #include "OpSendMsg.h"
 #include "ProducerConfigurationImpl.h"
 #include "PulsarApi.pb.h"
+#include "ResultUtils.h"
 #include "Semaphore.h"
 #include "TimeUtils.h"
 #include "TopicName.h"
@@ -272,7 +273,7 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
         } else {
             // Producer was not yet created, retry to connect to broker if 
it's possible
             result = convertToTimeoutIfNecessary(result, creationTimestamp_);
-            if (result == ResultRetryable) {
+            if (isResultRetryable(result)) {
                 LOG_WARN(getName() << "Temporary error in creating producer: " 
<< strResult(result));
                 scheduleReconnection();
             } else {
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
new file mode 100644
index 0000000..b5ec6cd
--- /dev/null
+++ b/lib/ResultUtils.h
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Result.h>
+
+namespace pulsar {
+
+inline bool isResultRetryable(Result result) {
+    return result == ResultRetryable || result == ResultDisconnected;
+}
+
+}  // namespace pulsar
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index 8bebb5b..d026e42 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -29,6 +29,7 @@
 #include "ExecutorService.h"
 #include "Future.h"
 #include "LogUtils.h"
+#include "ResultUtils.h"
 
 namespace pulsar {
 
@@ -95,7 +96,7 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
                 promise_.setValue(value);
                 return;
             }
-            if (result != ResultRetryable) {
+            if (!isResultRetryable(result)) {
                 promise_.setFailed(result);
                 return;
             }

Reply via email to