This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e6b9d09  [refactor] Handle responses in methods instead of switch 
cases (#197)
e6b9d09 is described below

commit e6b9d091892da0d911e89b54b36f643cf8d4efa7
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Feb 20 22:07:43 2023 +0800

    [refactor] Handle responses in methods instead of switch cases (#197)
    
    ### Motivation
    
    The C++ Client handles responses in a huge switch block in
    `ClientConnection::handleIncomingCommand`, which has 553 lines
    currently. It's not good to maintain since we have some responses to
    handle in future.
    
    ### Modifications
    
    Add a series of `handleXXX` methods to handle responses in
    `ClientConnection`.
---
 lib/ClientConnection.cc | 952 ++++++++++++++++++++++++------------------------
 lib/ClientConnection.h  |  28 ++
 2 files changed, 503 insertions(+), 477 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index d908537..7abd295 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -767,6 +767,8 @@ bool ClientConnection::verifyChecksum(SharedBuffer& 
incomingBuffer_, uint32_t& r
 }
 
 void ClientConnection::handleActiveConsumerChange(const 
proto::CommandActiveConsumerChange& change) {
+    LOG_DEBUG(cnxString_ << "Received notification about active consumer 
change, consumer_id: "
+                         << change.consumer_id() << " isActive: " << 
change.is_active());
     Lock lock(mutex_);
     ConsumersMap::iterator it = consumers_.find(change.consumer_id());
     if (it != consumers_.end()) {
@@ -843,524 +845,80 @@ void 
ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
 
             // Handle normal commands
             switch (incomingCmd.type()) {
-                case BaseCommand::SEND_RECEIPT: {
-                    const auto& sendReceipt = incomingCmd.send_receipt();
-                    int producerId = sendReceipt.producer_id();
-                    uint64_t sequenceId = sendReceipt.sequence_id();
-                    const proto::MessageIdData& messageIdData = 
sendReceipt.message_id();
-                    auto messageId = toMessageId(messageIdData);
-
-                    LOG_DEBUG(cnxString_ << "Got receipt for producer: " << 
producerId
-                                         << " -- msg: " << sequenceId << "-- 
message id: " << messageId);
-
-                    Lock lock(mutex_);
-                    ProducersMap::iterator it = producers_.find(producerId);
-                    if (it != producers_.end()) {
-                        ProducerImplPtr producer = it->second.lock();
-                        lock.unlock();
-
-                        if (producer) {
-                            if (!producer->ackReceived(sequenceId, messageId)) 
{
-                                // If the producer fails to process the ack, 
we need to close the connection
-                                // to give it a chance to recover from there
-                                close();
-                            }
-                        }
-                    } else {
-                        LOG_ERROR(cnxString_ << "Got invalid producer Id in 
SendReceipt: "  //
-                                             << producerId << " -- msg: " << 
sequenceId);
-                    }
-
+                case BaseCommand::SEND_RECEIPT:
+                    handleSendReceipt(incomingCmd.send_receipt());
                     break;
-                }
 
-                case BaseCommand::SEND_ERROR: {
-                    const auto& error = incomingCmd.send_error();
-                    LOG_WARN(cnxString_ << "Received send error from server: " 
<< error.message());
-                    if (ChecksumError == error.error()) {
-                        long producerId = error.producer_id();
-                        long sequenceId = error.sequence_id();
-                        Lock lock(mutex_);
-                        ProducersMap::iterator it = 
producers_.find(producerId);
-                        if (it != producers_.end()) {
-                            ProducerImplPtr producer = it->second.lock();
-                            lock.unlock();
-
-                            if (producer) {
-                                if 
(!producer->removeCorruptMessage(sequenceId)) {
-                                    // If the producer fails to remove corrupt 
msg, we need to close the
-                                    // connection to give it a chance to 
recover from there
-                                    close();
-                                }
-                            }
-                        }
-                    } else {
-                        close();
-                    }
+                case BaseCommand::SEND_ERROR:
+                    handleSendError(incomingCmd.send_error());
                     break;
-                }
 
-                case BaseCommand::SUCCESS: {
-                    const auto& success = incomingCmd.success();
-                    LOG_DEBUG(cnxString_ << "Received success response from 
server. req_id: "
-                                         << success.request_id());
-
-                    Lock lock(mutex_);
-                    PendingRequestsMap::iterator it = 
pendingRequests_.find(success.request_id());
-                    if (it != pendingRequests_.end()) {
-                        PendingRequestData requestData = it->second;
-                        pendingRequests_.erase(it);
-                        lock.unlock();
-
-                        requestData.promise.setValue({});
-                        requestData.timer->cancel();
-                    }
+                case BaseCommand::SUCCESS:
+                    handleSuccess(incomingCmd.success());
                     break;
-                }
 
-                case BaseCommand::PARTITIONED_METADATA_RESPONSE: {
-                    const auto& partitionMetadataResponse = 
incomingCmd.partitionmetadataresponse();
-                    LOG_DEBUG(cnxString_ << "Received partition-metadata 
response from server. req_id: "
-                                         << 
partitionMetadataResponse.request_id());
-
-                    Lock lock(mutex_);
-                    PendingLookupRequestsMap::iterator it =
-                        
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
-                    if (it != pendingLookupRequests_.end()) {
-                        it->second.timer->cancel();
-                        LookupDataResultPromisePtr lookupDataPromise = 
it->second.promise;
-                        pendingLookupRequests_.erase(it);
-                        numOfPendingLookupRequest_--;
-                        lock.unlock();
-
-                        if (!partitionMetadataResponse.has_response() ||
-                            (partitionMetadataResponse.response() ==
-                             
proto::CommandPartitionedTopicMetadataResponse::Failed)) {
-                            if (partitionMetadataResponse.has_error()) {
-                                LOG_ERROR(cnxString_ << "Failed 
partition-metadata lookup req_id: "
-                                                     << 
partitionMetadataResponse.request_id()
-                                                     << " error: " << 
partitionMetadataResponse.error()
-                                                     << " msg: " << 
partitionMetadataResponse.message());
-                                
checkServerError(partitionMetadataResponse.error());
-                                
lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error(),
-                                                                       
partitionMetadataResponse.message()));
-                            } else {
-                                LOG_ERROR(cnxString_ << "Failed 
partition-metadata lookup req_id: "
-                                                     << 
partitionMetadataResponse.request_id()
-                                                     << " with empty response: 
");
-                                
lookupDataPromise->setFailed(ResultConnectError);
-                            }
-                        } else {
-                            LookupDataResultPtr lookupResultPtr = 
std::make_shared<LookupDataResult>();
-                            
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
-                            lookupDataPromise->setValue(lookupResultPtr);
-                        }
-
-                    } else {
-                        LOG_WARN("Received unknown request id from server: "
-                                 << partitionMetadataResponse.request_id());
-                    }
+                case BaseCommand::PARTITIONED_METADATA_RESPONSE:
+                    
handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse());
                     break;
-                }
 
-                case BaseCommand::CONSUMER_STATS_RESPONSE: {
-                    const auto& consumerStatsResponse = 
incomingCmd.consumerstatsresponse();
-                    LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - 
Received consumer stats "
-                                            "response from server. req_id: "
-                                         << 
consumerStatsResponse.request_id());
-                    Lock lock(mutex_);
-                    PendingConsumerStatsMap::iterator it =
-                        
pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
-                    if (it != pendingConsumerStatsMap_.end()) {
-                        Promise<Result, BrokerConsumerStatsImpl> 
consumerStatsPromise = it->second;
-                        pendingConsumerStatsMap_.erase(it);
-                        lock.unlock();
-
-                        if (consumerStatsResponse.has_error_code()) {
-                            if (consumerStatsResponse.has_error_message()) {
-                                LOG_ERROR(cnxString_ << " Failed to get 
consumer stats - "
-                                                     << 
consumerStatsResponse.error_message());
-                            }
-                            
consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code(),
-                                                                     
consumerStatsResponse.error_message()));
-                        } else {
-                            LOG_DEBUG(cnxString_ << "ConsumerStatsResponse 
command - Received consumer stats "
-                                                    "response from server. 
req_id: "
-                                                 << 
consumerStatsResponse.request_id() << " Stats: ");
-                            BrokerConsumerStatsImpl brokerStats(
-                                consumerStatsResponse.msgrateout(), 
consumerStatsResponse.msgthroughputout(),
-                                consumerStatsResponse.msgrateredeliver(),
-                                consumerStatsResponse.consumername(),
-                                consumerStatsResponse.availablepermits(),
-                                consumerStatsResponse.unackedmessages(),
-                                
consumerStatsResponse.blockedconsumeronunackedmsgs(),
-                                consumerStatsResponse.address(), 
consumerStatsResponse.connectedsince(),
-                                consumerStatsResponse.type(), 
consumerStatsResponse.msgrateexpired(),
-                                consumerStatsResponse.msgbacklog());
-                            consumerStatsPromise.setValue(brokerStats);
-                        }
-                    } else {
-                        LOG_WARN("ConsumerStatsResponse command - Received 
unknown request id from server: "
-                                 << consumerStatsResponse.request_id());
-                    }
+                case BaseCommand::CONSUMER_STATS_RESPONSE:
+                    
handleConsumerStatsResponse(incomingCmd.consumerstatsresponse());
                     break;
-                }
 
-                case BaseCommand::LOOKUP_RESPONSE: {
-                    const auto& lookupTopicResponse = 
incomingCmd.lookuptopicresponse();
-                    LOG_DEBUG(cnxString_ << "Received lookup response from 
server. req_id: "
-                                         << lookupTopicResponse.request_id());
-
-                    Lock lock(mutex_);
-                    PendingLookupRequestsMap::iterator it =
-                        
pendingLookupRequests_.find(lookupTopicResponse.request_id());
-                    if (it != pendingLookupRequests_.end()) {
-                        it->second.timer->cancel();
-                        LookupDataResultPromisePtr lookupDataPromise = 
it->second.promise;
-                        pendingLookupRequests_.erase(it);
-                        numOfPendingLookupRequest_--;
-                        lock.unlock();
-
-                        if (!lookupTopicResponse.has_response() ||
-                            (lookupTopicResponse.response() == 
proto::CommandLookupTopicResponse::Failed)) {
-                            if (lookupTopicResponse.has_error()) {
-                                LOG_ERROR(cnxString_
-                                          << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
-                                          << " error: " << 
lookupTopicResponse.error()
-                                          << " msg: " << 
lookupTopicResponse.message());
-                                checkServerError(lookupTopicResponse.error());
-                                lookupDataPromise->setFailed(
-                                    getResult(lookupTopicResponse.error(), 
lookupTopicResponse.message()));
-                            } else {
-                                LOG_ERROR(cnxString_
-                                          << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
-                                          << " with empty response: ");
-                                
lookupDataPromise->setFailed(ResultConnectError);
-                            }
-                        } else {
-                            LOG_DEBUG(cnxString_
-                                      << "Received lookup response from 
server. req_id: "
-                                      << lookupTopicResponse.request_id()  //
-                                      << " -- broker-url: " << 
lookupTopicResponse.brokerserviceurl()
-                                      << " -- broker-tls-url: "  //
-                                      << 
lookupTopicResponse.brokerserviceurltls()
-                                      << " authoritative: " << 
lookupTopicResponse.authoritative()  //
-                                      << " redirect: " << 
lookupTopicResponse.response());
-                            LookupDataResultPtr lookupResultPtr = 
std::make_shared<LookupDataResult>();
-
-                            if (tlsSocket_) {
-                                
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
-                            } else {
-                                
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
-                            }
-
-                            
lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
-                            
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
-                            
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
-                                                         
proto::CommandLookupTopicResponse::Redirect);
-                            lookupResultPtr->setShouldProxyThroughServiceUrl(
-                                
lookupTopicResponse.proxy_through_service_url());
-                            lookupDataPromise->setValue(lookupResultPtr);
-                        }
-
-                    } else {
-                        LOG_WARN(
-                            "Received unknown request id from server: " << 
lookupTopicResponse.request_id());
-                    }
+                case BaseCommand::LOOKUP_RESPONSE:
+                    
handleLookupTopicRespose(incomingCmd.lookuptopicresponse());
                     break;
-                }
 
-                case BaseCommand::PRODUCER_SUCCESS: {
-                    const auto& producerSuccess = 
incomingCmd.producer_success();
-                    LOG_DEBUG(cnxString_ << "Received success producer 
response from server. req_id: "
-                                         << producerSuccess.request_id()  //
-                                         << " -- producer name: " << 
producerSuccess.producer_name());
-
-                    Lock lock(mutex_);
-                    PendingRequestsMap::iterator it = 
pendingRequests_.find(producerSuccess.request_id());
-                    if (it != pendingRequests_.end()) {
-                        PendingRequestData requestData = it->second;
-                        if (!producerSuccess.producer_ready()) {
-                            LOG_INFO(cnxString_ << " Producer " << 
producerSuccess.producer_name()
-                                                << " has been queued up at 
broker. req_id: "
-                                                << 
producerSuccess.request_id());
-                            requestData.hasGotResponse->store(true);
-                            lock.unlock();
-                        } else {
-                            pendingRequests_.erase(it);
-                            lock.unlock();
-                            ResponseData data;
-                            data.producerName = 
producerSuccess.producer_name();
-                            data.lastSequenceId = 
producerSuccess.last_sequence_id();
-                            if (producerSuccess.has_schema_version()) {
-                                data.schemaVersion = 
producerSuccess.schema_version();
-                            }
-                            if (producerSuccess.has_topic_epoch()) {
-                                data.topicEpoch = 
boost::make_optional(producerSuccess.topic_epoch());
-                            } else {
-                                data.topicEpoch = boost::none;
-                            }
-                            requestData.promise.setValue(data);
-                            requestData.timer->cancel();
-                        }
-                    }
+                case BaseCommand::PRODUCER_SUCCESS:
+                    handleProducerSuccess(incomingCmd.producer_success());
                     break;
-                }
 
-                case BaseCommand::ERROR: {
-                    const auto& error = incomingCmd.error();
-                    Result result = getResult(error.error(), error.message());
-                    LOG_WARN(cnxString_ << "Received error response from 
server: " << result
-                                        << (error.has_message() ? (" (" + 
error.message() + ")") : "")
-                                        << " -- req_id: " << 
error.request_id());
-
-                    Lock lock(mutex_);
-
-                    PendingRequestsMap::iterator it = 
pendingRequests_.find(error.request_id());
-                    if (it != pendingRequests_.end()) {
-                        PendingRequestData requestData = it->second;
-                        pendingRequests_.erase(it);
-                        lock.unlock();
-
-                        requestData.promise.setFailed(result);
-                        requestData.timer->cancel();
-                    } else {
-                        PendingGetLastMessageIdRequestsMap::iterator it =
-                            
pendingGetLastMessageIdRequests_.find(error.request_id());
-                        if (it != pendingGetLastMessageIdRequests_.end()) {
-                            auto getLastMessageIdPromise = it->second;
-                            pendingGetLastMessageIdRequests_.erase(it);
-                            lock.unlock();
-
-                            getLastMessageIdPromise.setFailed(result);
-                        } else {
-                            PendingGetNamespaceTopicsMap::iterator it =
-                                
pendingGetNamespaceTopicsRequests_.find(error.request_id());
-                            if (it != 
pendingGetNamespaceTopicsRequests_.end()) {
-                                Promise<Result, NamespaceTopicsPtr> 
getNamespaceTopicsPromise = it->second;
-                                pendingGetNamespaceTopicsRequests_.erase(it);
-                                lock.unlock();
-
-                                getNamespaceTopicsPromise.setFailed(result);
-                            } else {
-                                lock.unlock();
-                            }
-                        }
-                    }
+                case BaseCommand::ERROR:
+                    handleError(incomingCmd.error());
                     break;
-                }
-
-                case BaseCommand::CLOSE_PRODUCER: {
-                    const auto& closeProducer = incomingCmd.close_producer();
-                    int producerId = closeProducer.producer_id();
-
-                    LOG_DEBUG("Broker notification of Closed producer: " << 
producerId);
-
-                    Lock lock(mutex_);
-                    ProducersMap::iterator it = producers_.find(producerId);
-                    if (it != producers_.end()) {
-                        ProducerImplPtr producer = it->second.lock();
-                        producers_.erase(it);
-                        lock.unlock();
-
-                        if (producer) {
-                            producer->disconnectProducer();
-                        }
-                    } else {
-                        LOG_ERROR(cnxString_ << "Got invalid producer Id in 
closeProducer command: "
-                                             << producerId);
-                    }
 
+                case BaseCommand::CLOSE_PRODUCER:
+                    handleCloseProducer(incomingCmd.close_producer());
                     break;
-                }
-
-                case BaseCommand::CLOSE_CONSUMER: {
-                    const auto& closeconsumer = incomingCmd.close_consumer();
-                    int consumerId = closeconsumer.consumer_id();
-
-                    LOG_DEBUG("Broker notification of Closed consumer: " << 
consumerId);
-
-                    Lock lock(mutex_);
-                    ConsumersMap::iterator it = consumers_.find(consumerId);
-                    if (it != consumers_.end()) {
-                        ConsumerImplPtr consumer = it->second.lock();
-                        consumers_.erase(it);
-                        lock.unlock();
-
-                        if (consumer) {
-                            consumer->disconnectConsumer();
-                        }
-                    } else {
-                        LOG_ERROR(cnxString_ << "Got invalid consumer Id in 
closeConsumer command: "
-                                             << consumerId);
-                    }
 
+                case BaseCommand::CLOSE_CONSUMER:
+                    handleCloseConsumer(incomingCmd.close_consumer());
                     break;
-                }
 
-                case BaseCommand::PING: {
+                case BaseCommand::PING:
                     // Respond to ping request
                     LOG_DEBUG(cnxString_ << "Replying to ping command");
                     sendCommand(Commands::newPong());
                     break;
-                }
 
-                case BaseCommand::PONG: {
+                case BaseCommand::PONG:
                     LOG_DEBUG(cnxString_ << "Received response to ping 
message");
                     break;
-                }
 
-                case BaseCommand::AUTH_CHALLENGE: {
-                    LOG_DEBUG(cnxString_ << "Received auth challenge from 
broker");
-
-                    Result result;
-                    SharedBuffer buffer = 
Commands::newAuthResponse(authentication_, result);
-                    if (result != ResultOk) {
-                        LOG_ERROR(cnxString_ << "Failed to send auth response: 
" << result);
-                        close(result);
-                        break;
-                    }
-                    asyncWrite(buffer.const_asio_buffer(),
-                               
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
-                                         std::placeholders::_1, buffer));
+                case BaseCommand::AUTH_CHALLENGE:
+                    handleAuthChallenge();
                     break;
-                }
 
-                case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
-                    const auto& change = incomingCmd.active_consumer_change();
-                    LOG_DEBUG(cnxString_
-                              << "Received notification about active consumer 
change, consumer_id: "
-                              << change.consumer_id() << " isActive: " << 
change.is_active());
-                    handleActiveConsumerChange(change);
+                case BaseCommand::ACTIVE_CONSUMER_CHANGE:
+                    
handleActiveConsumerChange(incomingCmd.active_consumer_change());
                     break;
-                }
 
-                case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE: {
-                    const auto& getLastMessageIdResponse = 
incomingCmd.getlastmessageidresponse();
-                    LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse 
from server. req_id: "
-                                         << 
getLastMessageIdResponse.request_id());
-
-                    Lock lock(mutex_);
-                    PendingGetLastMessageIdRequestsMap::iterator it =
-                        
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
-
-                    if (it != pendingGetLastMessageIdRequests_.end()) {
-                        auto getLastMessageIdPromise = it->second;
-                        pendingGetLastMessageIdRequests_.erase(it);
-                        lock.unlock();
-
-                        if 
(getLastMessageIdResponse.has_consumer_mark_delete_position()) {
-                            getLastMessageIdPromise.setValue(
-                                
{toMessageId(getLastMessageIdResponse.last_message_id()),
-                                 
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
-                        } else {
-                            getLastMessageIdPromise.setValue(
-                                
{toMessageId(getLastMessageIdResponse.last_message_id())});
-                        }
-                    } else {
-                        lock.unlock();
-                        LOG_WARN(
-                            "getLastMessageIdResponse command - Received 
unknown request id from server: "
-                            << getLastMessageIdResponse.request_id());
-                    }
+                case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
+                    
handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse());
                     break;
-                }
 
-                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
-                    const auto& response = 
incomingCmd.gettopicsofnamespaceresponse();
-
-                    LOG_DEBUG(cnxString_ << "Received 
GetTopicsOfNamespaceResponse from server. req_id: "
-                                         << response.request_id() << " 
topicsSize" << response.topics_size());
-
-                    Lock lock(mutex_);
-                    PendingGetNamespaceTopicsMap::iterator it =
-                        
pendingGetNamespaceTopicsRequests_.find(response.request_id());
-
-                    if (it != pendingGetNamespaceTopicsRequests_.end()) {
-                        Promise<Result, NamespaceTopicsPtr> getTopicsPromise = 
it->second;
-                        pendingGetNamespaceTopicsRequests_.erase(it);
-                        lock.unlock();
-
-                        int numTopics = response.topics_size();
-                        std::set<std::string> topicSet;
-                        // get all topics
-                        for (int i = 0; i < numTopics; i++) {
-                            // remove partition part
-                            const std::string& topicName = response.topics(i);
-                            int pos = topicName.find("-partition-");
-                            std::string filteredName = topicName.substr(0, 
pos);
-
-                            // filter duped topic name
-                            if (topicSet.find(filteredName) == topicSet.end()) 
{
-                                topicSet.insert(filteredName);
-                            }
-                        }
-
-                        NamespaceTopicsPtr topicsPtr =
-                            
std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
-
-                        getTopicsPromise.setValue(topicsPtr);
-                    } else {
-                        lock.unlock();
-                        LOG_WARN(
-                            "GetTopicsOfNamespaceResponse command - Received 
unknown request id from "
-                            "server: "
-                            << response.request_id());
-                    }
+                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
+                    
handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse());
                     break;
-                }
 
-                case BaseCommand::GET_SCHEMA_RESPONSE: {
-                    const auto& response = incomingCmd.getschemaresponse();
-                    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from 
server. req_id: "
-                                         << response.request_id());
-                    Lock lock(mutex_);
-                    auto it = 
pendingGetSchemaRequests_.find(response.request_id());
-                    if (it != pendingGetSchemaRequests_.end()) {
-                        Promise<Result, boost::optional<SchemaInfo>> 
getSchemaPromise = it->second;
-                        pendingGetSchemaRequests_.erase(it);
-                        lock.unlock();
-
-                        if (response.has_error_code()) {
-                            if (response.error_code() == proto::TopicNotFound) 
{
-                                getSchemaPromise.setValue(boost::none);
-                            } else {
-                                Result result = 
getResult(response.error_code(), response.error_message());
-                                LOG_WARN(cnxString_ << "Received error 
GetSchemaResponse from server "
-                                                    << result
-                                                    << 
(response.has_error_message()
-                                                            ? (" (" + 
response.error_message() + ")")
-                                                            : "")
-                                                    << " -- req_id: " << 
response.request_id());
-                                getSchemaPromise.setFailed(result);
-                            }
-                            return;
-                        }
-
-                        const auto& schema = response.schema();
-                        const auto& properMap = schema.properties();
-                        StringMap properties;
-                        for (auto kv = properMap.begin(); kv != 
properMap.end(); ++kv) {
-                            properties[kv->key()] = kv->value();
-                        }
-                        SchemaInfo 
schemaInfo(static_cast<SchemaType>(schema.type()), "",
-                                              schema.schema_data(), 
properties);
-                        getSchemaPromise.setValue(schemaInfo);
-                    } else {
-                        lock.unlock();
-                        LOG_WARN(
-                            "GetSchemaResponse command - Received unknown 
request id from "
-                            "server: "
-                            << response.request_id());
-                    }
+                case BaseCommand::GET_SCHEMA_RESPONSE:
+                    handleGetSchemaResponse(incomingCmd.getschemaresponse());
                     break;
-                }
 
-                default: {
+                default:
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
                     break;
-                }
             }
         }
     }
@@ -1798,4 +1356,444 @@ void ClientConnection::checkServerError(ServerError 
error) {
     }
 }
 
+void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& 
sendReceipt) {
+    int producerId = sendReceipt.producer_id();
+    uint64_t sequenceId = sendReceipt.sequence_id();
+    const proto::MessageIdData& messageIdData = sendReceipt.message_id();
+    auto messageId = toMessageId(messageIdData);
+
+    LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId << " -- 
msg: " << sequenceId
+                         << "-- message id: " << messageId);
+
+    Lock lock(mutex_);
+    auto it = producers_.find(producerId);
+    if (it != producers_.end()) {
+        ProducerImplPtr producer = it->second.lock();
+        lock.unlock();
+
+        if (producer) {
+            if (!producer->ackReceived(sequenceId, messageId)) {
+                // If the producer fails to process the ack, we need to close 
the connection
+                // to give it a chance to recover from there
+                close();
+            }
+        }
+    } else {
+        LOG_ERROR(cnxString_ << "Got invalid producer Id in SendReceipt: "  //
+                             << producerId << " -- msg: " << sequenceId);
+    }
+}
+
+void ClientConnection::handleSendError(const proto::CommandSendError& error) {
+    LOG_WARN(cnxString_ << "Received send error from server: " << 
error.message());
+    if (ChecksumError == error.error()) {
+        long producerId = error.producer_id();
+        long sequenceId = error.sequence_id();
+        Lock lock(mutex_);
+        auto it = producers_.find(producerId);
+        if (it != producers_.end()) {
+            ProducerImplPtr producer = it->second.lock();
+            lock.unlock();
+
+            if (producer) {
+                if (!producer->removeCorruptMessage(sequenceId)) {
+                    // If the producer fails to remove corrupt msg, we need to 
close the
+                    // connection to give it a chance to recover from there
+                    close();
+                }
+            }
+        }
+    } else {
+        close();
+    }
+}
+
+void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
+    LOG_DEBUG(cnxString_ << "Received success response from server. req_id: " 
<< success.request_id());
+
+    Lock lock(mutex_);
+    auto it = pendingRequests_.find(success.request_id());
+    if (it != pendingRequests_.end()) {
+        PendingRequestData requestData = it->second;
+        pendingRequests_.erase(it);
+        lock.unlock();
+
+        requestData.promise.setValue({});
+        requestData.timer->cancel();
+    }
+}
+
+void ClientConnection::handlePartitionedMetadataResponse(
+    const proto::CommandPartitionedTopicMetadataResponse& 
partitionMetadataResponse) {
+    LOG_DEBUG(cnxString_ << "Received partition-metadata response from server. 
req_id: "
+                         << partitionMetadataResponse.request_id());
+
+    Lock lock(mutex_);
+    auto it = 
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
+    if (it != pendingLookupRequests_.end()) {
+        it->second.timer->cancel();
+        LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
+        pendingLookupRequests_.erase(it);
+        numOfPendingLookupRequest_--;
+        lock.unlock();
+
+        if (!partitionMetadataResponse.has_response() ||
+            (partitionMetadataResponse.response() ==
+             proto::CommandPartitionedTopicMetadataResponse::Failed)) {
+            if (partitionMetadataResponse.has_error()) {
+                LOG_ERROR(cnxString_ << "Failed partition-metadata lookup 
req_id: "
+                                     << partitionMetadataResponse.request_id()
+                                     << " error: " << 
partitionMetadataResponse.error()
+                                     << " msg: " << 
partitionMetadataResponse.message());
+                checkServerError(partitionMetadataResponse.error());
+                lookupDataPromise->setFailed(
+                    getResult(partitionMetadataResponse.error(), 
partitionMetadataResponse.message()));
+            } else {
+                LOG_ERROR(cnxString_ << "Failed partition-metadata lookup 
req_id: "
+                                     << partitionMetadataResponse.request_id() 
<< " with empty response: ");
+                lookupDataPromise->setFailed(ResultConnectError);
+            }
+        } else {
+            LookupDataResultPtr lookupResultPtr = 
std::make_shared<LookupDataResult>();
+            
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
+            lookupDataPromise->setValue(lookupResultPtr);
+        }
+
+    } else {
+        LOG_WARN("Received unknown request id from server: " << 
partitionMetadataResponse.request_id());
+    }
+}
+
+void ClientConnection::handleConsumerStatsResponse(
+    const proto::CommandConsumerStatsResponse& consumerStatsResponse) {
+    LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer 
stats "
+                            "response from server. req_id: "
+                         << consumerStatsResponse.request_id());
+    Lock lock(mutex_);
+    auto it = 
pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
+    if (it != pendingConsumerStatsMap_.end()) {
+        Promise<Result, BrokerConsumerStatsImpl> consumerStatsPromise = 
it->second;
+        pendingConsumerStatsMap_.erase(it);
+        lock.unlock();
+
+        if (consumerStatsResponse.has_error_code()) {
+            if (consumerStatsResponse.has_error_message()) {
+                LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
+                                     << consumerStatsResponse.error_message());
+            }
+            consumerStatsPromise.setFailed(
+                getResult(consumerStatsResponse.error_code(), 
consumerStatsResponse.error_message()));
+        } else {
+            LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received 
consumer stats "
+                                    "response from server. req_id: "
+                                 << consumerStatsResponse.request_id() << " 
Stats: ");
+            BrokerConsumerStatsImpl brokerStats(
+                consumerStatsResponse.msgrateout(), 
consumerStatsResponse.msgthroughputout(),
+                consumerStatsResponse.msgrateredeliver(), 
consumerStatsResponse.consumername(),
+                consumerStatsResponse.availablepermits(), 
consumerStatsResponse.unackedmessages(),
+                consumerStatsResponse.blockedconsumeronunackedmsgs(), 
consumerStatsResponse.address(),
+                consumerStatsResponse.connectedsince(), 
consumerStatsResponse.type(),
+                consumerStatsResponse.msgrateexpired(), 
consumerStatsResponse.msgbacklog());
+            consumerStatsPromise.setValue(brokerStats);
+        }
+    } else {
+        LOG_WARN("ConsumerStatsResponse command - Received unknown request id 
from server: "
+                 << consumerStatsResponse.request_id());
+    }
+}
+
+void ClientConnection::handleLookupTopicRespose(
+    const proto::CommandLookupTopicResponse& lookupTopicResponse) {
+    LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
+                         << lookupTopicResponse.request_id());
+
+    Lock lock(mutex_);
+    auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
+    if (it != pendingLookupRequests_.end()) {
+        it->second.timer->cancel();
+        LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
+        pendingLookupRequests_.erase(it);
+        numOfPendingLookupRequest_--;
+        lock.unlock();
+
+        if (!lookupTopicResponse.has_response() ||
+            (lookupTopicResponse.response() == 
proto::CommandLookupTopicResponse::Failed)) {
+            if (lookupTopicResponse.has_error()) {
+                LOG_ERROR(cnxString_ << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
+                                     << " error: " << 
lookupTopicResponse.error()
+                                     << " msg: " << 
lookupTopicResponse.message());
+                checkServerError(lookupTopicResponse.error());
+                lookupDataPromise->setFailed(
+                    getResult(lookupTopicResponse.error(), 
lookupTopicResponse.message()));
+            } else {
+                LOG_ERROR(cnxString_ << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
+                                     << " with empty response: ");
+                lookupDataPromise->setFailed(ResultConnectError);
+            }
+        } else {
+            LOG_DEBUG(cnxString_ << "Received lookup response from server. 
req_id: "
+                                 << lookupTopicResponse.request_id()  //
+                                 << " -- broker-url: " << 
lookupTopicResponse.brokerserviceurl()
+                                 << " -- broker-tls-url: "  //
+                                 << lookupTopicResponse.brokerserviceurltls()
+                                 << " authoritative: " << 
lookupTopicResponse.authoritative()  //
+                                 << " redirect: " << 
lookupTopicResponse.response());
+            LookupDataResultPtr lookupResultPtr = 
std::make_shared<LookupDataResult>();
+
+            if (tlsSocket_) {
+                
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
+            } else {
+                
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
+            }
+
+            
lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
+            
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
+            lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
+                                         
proto::CommandLookupTopicResponse::Redirect);
+            
lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url());
+            lookupDataPromise->setValue(lookupResultPtr);
+        }
+
+    } else {
+        LOG_WARN("Received unknown request id from server: " << 
lookupTopicResponse.request_id());
+    }
+}
+
+void ClientConnection::handleProducerSuccess(const 
proto::CommandProducerSuccess& producerSuccess) {
+    LOG_DEBUG(cnxString_ << "Received success producer response from server. 
req_id: "
+                         << producerSuccess.request_id()  //
+                         << " -- producer name: " << 
producerSuccess.producer_name());
+
+    Lock lock(mutex_);
+    auto it = pendingRequests_.find(producerSuccess.request_id());
+    if (it != pendingRequests_.end()) {
+        PendingRequestData requestData = it->second;
+        if (!producerSuccess.producer_ready()) {
+            LOG_INFO(cnxString_ << " Producer " << 
producerSuccess.producer_name()
+                                << " has been queued up at broker. req_id: " 
<< producerSuccess.request_id());
+            requestData.hasGotResponse->store(true);
+            lock.unlock();
+        } else {
+            pendingRequests_.erase(it);
+            lock.unlock();
+            ResponseData data;
+            data.producerName = producerSuccess.producer_name();
+            data.lastSequenceId = producerSuccess.last_sequence_id();
+            if (producerSuccess.has_schema_version()) {
+                data.schemaVersion = producerSuccess.schema_version();
+            }
+            if (producerSuccess.has_topic_epoch()) {
+                data.topicEpoch = 
boost::make_optional(producerSuccess.topic_epoch());
+            } else {
+                data.topicEpoch = boost::none;
+            }
+            requestData.promise.setValue(data);
+            requestData.timer->cancel();
+        }
+    }
+}
+
+void ClientConnection::handleError(const proto::CommandError& error) {
+    Result result = getResult(error.error(), error.message());
+    LOG_WARN(cnxString_ << "Received error response from server: " << result
+                        << (error.has_message() ? (" (" + error.message() + 
")") : "")
+                        << " -- req_id: " << error.request_id());
+
+    Lock lock(mutex_);
+
+    auto it = pendingRequests_.find(error.request_id());
+    if (it != pendingRequests_.end()) {
+        PendingRequestData requestData = it->second;
+        pendingRequests_.erase(it);
+        lock.unlock();
+
+        requestData.promise.setFailed(result);
+        requestData.timer->cancel();
+    } else {
+        PendingGetLastMessageIdRequestsMap::iterator it =
+            pendingGetLastMessageIdRequests_.find(error.request_id());
+        if (it != pendingGetLastMessageIdRequests_.end()) {
+            auto getLastMessageIdPromise = it->second;
+            pendingGetLastMessageIdRequests_.erase(it);
+            lock.unlock();
+
+            getLastMessageIdPromise.setFailed(result);
+        } else {
+            PendingGetNamespaceTopicsMap::iterator it =
+                pendingGetNamespaceTopicsRequests_.find(error.request_id());
+            if (it != pendingGetNamespaceTopicsRequests_.end()) {
+                Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise 
= it->second;
+                pendingGetNamespaceTopicsRequests_.erase(it);
+                lock.unlock();
+
+                getNamespaceTopicsPromise.setFailed(result);
+            } else {
+                lock.unlock();
+            }
+        }
+    }
+}
+
+void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& 
closeProducer) {
+    int producerId = closeProducer.producer_id();
+
+    LOG_DEBUG("Broker notification of Closed producer: " << producerId);
+
+    Lock lock(mutex_);
+    auto it = producers_.find(producerId);
+    if (it != producers_.end()) {
+        ProducerImplPtr producer = it->second.lock();
+        producers_.erase(it);
+        lock.unlock();
+
+        if (producer) {
+            producer->disconnectProducer();
+        }
+    } else {
+        LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer 
command: " << producerId);
+    }
+}
+
+void ClientConnection::handleCloseConsumer(const proto::CommandCloseConsumer& 
closeconsumer) {
+    int consumerId = closeconsumer.consumer_id();
+
+    LOG_DEBUG("Broker notification of Closed consumer: " << consumerId);
+
+    Lock lock(mutex_);
+    auto it = consumers_.find(consumerId);
+    if (it != consumers_.end()) {
+        ConsumerImplPtr consumer = it->second.lock();
+        consumers_.erase(it);
+        lock.unlock();
+
+        if (consumer) {
+            consumer->disconnectConsumer();
+        }
+    } else {
+        LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer 
command: " << consumerId);
+    }
+}
+
+void ClientConnection::handleAuthChallenge() {
+    LOG_DEBUG(cnxString_ << "Received auth challenge from broker");
+
+    Result result;
+    SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
+    if (result != ResultOk) {
+        LOG_ERROR(cnxString_ << "Failed to send auth response: " << result);
+        close(result);
+        return;
+    }
+    asyncWrite(buffer.const_asio_buffer(), 
std::bind(&ClientConnection::handleSentAuthResponse,
+                                                     shared_from_this(), 
std::placeholders::_1, buffer));
+}
+
+void ClientConnection::handleGetLastMessageIdResponse(
+    const proto::CommandGetLastMessageIdResponse& getLastMessageIdResponse) {
+    LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse from server. 
req_id: "
+                         << getLastMessageIdResponse.request_id());
+
+    Lock lock(mutex_);
+    auto it = 
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
+
+    if (it != pendingGetLastMessageIdRequests_.end()) {
+        auto getLastMessageIdPromise = it->second;
+        pendingGetLastMessageIdRequests_.erase(it);
+        lock.unlock();
+
+        if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
+            getLastMessageIdPromise.setValue(
+                {toMessageId(getLastMessageIdResponse.last_message_id()),
+                 
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
+        } else {
+            
getLastMessageIdPromise.setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
+        }
+    } else {
+        lock.unlock();
+        LOG_WARN("getLastMessageIdResponse command - Received unknown request 
id from server: "
+                 << getLastMessageIdResponse.request_id());
+    }
+}
+
+void ClientConnection::handleGetTopicOfNamespaceResponse(
+    const proto::CommandGetTopicsOfNamespaceResponse& response) {
+    LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from 
server. req_id: "
+                         << response.request_id() << " topicsSize" << 
response.topics_size());
+
+    Lock lock(mutex_);
+    auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id());
+
+    if (it != pendingGetNamespaceTopicsRequests_.end()) {
+        Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second;
+        pendingGetNamespaceTopicsRequests_.erase(it);
+        lock.unlock();
+
+        int numTopics = response.topics_size();
+        std::set<std::string> topicSet;
+        // get all topics
+        for (int i = 0; i < numTopics; i++) {
+            // remove partition part
+            const std::string& topicName = response.topics(i);
+            int pos = topicName.find("-partition-");
+            std::string filteredName = topicName.substr(0, pos);
+
+            // filter duped topic name
+            if (topicSet.find(filteredName) == topicSet.end()) {
+                topicSet.insert(filteredName);
+            }
+        }
+
+        NamespaceTopicsPtr topicsPtr =
+            std::make_shared<std::vector<std::string>>(topicSet.begin(), 
topicSet.end());
+
+        getTopicsPromise.setValue(topicsPtr);
+    } else {
+        lock.unlock();
+        LOG_WARN(
+            "GetTopicsOfNamespaceResponse command - Received unknown request 
id from "
+            "server: "
+            << response.request_id());
+    }
+}
+
+void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResponse& response) {
+    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: " 
<< response.request_id());
+    Lock lock(mutex_);
+    auto it = pendingGetSchemaRequests_.find(response.request_id());
+    if (it != pendingGetSchemaRequests_.end()) {
+        Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = 
it->second;
+        pendingGetSchemaRequests_.erase(it);
+        lock.unlock();
+
+        if (response.has_error_code()) {
+            if (response.error_code() == proto::TopicNotFound) {
+                getSchemaPromise.setValue(boost::none);
+            } else {
+                Result result = getResult(response.error_code(), 
response.error_message());
+                LOG_WARN(cnxString_ << "Received error GetSchemaResponse from 
server " << result
+                                    << (response.has_error_message() ? (" (" + 
response.error_message() + ")")
+                                                                     : "")
+                                    << " -- req_id: " << 
response.request_id());
+                getSchemaPromise.setFailed(result);
+            }
+            return;
+        }
+
+        const auto& schema = response.schema();
+        const auto& properMap = schema.properties();
+        StringMap properties;
+        for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) {
+            properties[kv->key()] = kv->value();
+        }
+        SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "", 
schema.schema_data(), properties);
+        getSchemaPromise.setValue(schemaInfo);
+    } else {
+        lock.unlock();
+        LOG_WARN(
+            "GetSchemaResponse command - Received unknown request id from "
+            "server: "
+            << response.request_id());
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index c77ca78..eae18f9 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -76,7 +76,20 @@ namespace proto {
 class BaseCommand;
 class CommandActiveConsumerChange;
 class CommandMessage;
+class CommandCloseConsumer;
+class CommandCloseProducer;
 class CommandConnected;
+class CommandConsumerStatsResponse;
+class CommandGetSchemaResponse;
+class CommandGetTopicsOfNamespaceResponse;
+class CommandError;
+class CommandGetLastMessageIdResponse;
+class CommandLookupTopicResponse;
+class CommandPartitionedTopicMetadataResponse;
+class CommandProducerSuccess;
+class CommandSendReceipt;
+class CommandSendError;
+class CommandSuccess;
 }  // namespace proto
 
 // Data returned on the request operation. Mostly used on create-producer 
command
@@ -362,6 +375,21 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     void closeSocket();
     void checkServerError(ServerError error);
+
+    void handleSendReceipt(const proto::CommandSendReceipt&);
+    void handleSendError(const proto::CommandSendError&);
+    void handleSuccess(const proto::CommandSuccess&);
+    void handlePartitionedMetadataResponse(const 
proto::CommandPartitionedTopicMetadataResponse&);
+    void handleConsumerStatsResponse(const 
proto::CommandConsumerStatsResponse&);
+    void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
+    void handleProducerSuccess(const proto::CommandProducerSuccess&);
+    void handleError(const proto::CommandError&);
+    void handleCloseProducer(const proto::CommandCloseProducer&);
+    void handleCloseConsumer(const proto::CommandCloseConsumer&);
+    void handleAuthChallenge();
+    void handleGetLastMessageIdResponse(const 
proto::CommandGetLastMessageIdResponse&);
+    void handleGetTopicOfNamespaceResponse(const 
proto::CommandGetTopicsOfNamespaceResponse&);
+    void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
 };
 }  // namespace pulsar
 

Reply via email to