This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a4b7f12  Retry on new partition producer creation failure (#378)
a4b7f12 is described below

commit a4b7f120aa0a7f74d8e1d837629a48caab1403bd
Author: erobot <[email protected]>
AuthorDate: Thu Jan 4 10:28:41 2024 +0800

    Retry on new partition producer creation failure (#378)
    
    Fixes #319
    
    ### Motivation
    
    Already created producer should not fail after new partition producers 
creation failure.
    
    ### Modifications
    
    `ProducerImpl`: Add an option retryOnCreationError to control whether to 
retry on creation error
    `PartitionedProducerImpl`: Use retryOnCreationError=true to create new 
partition producers
---
 lib/PartitionedProducerImpl.cc | 12 +++++++-----
 lib/PartitionedProducerImpl.h  |  2 +-
 lib/ProducerImpl.cc            |  7 ++++---
 lib/ProducerImpl.h             |  5 ++++-
 tests/ProducerTest.cc          | 32 ++++++++++++++++++++++++++++++++
 5 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index a799d5c..54e96c8 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -92,10 +92,12 @@ unsigned int 
PartitionedProducerImpl::getNumPartitionsWithLock() const {
     return getNumPartitions();
 }
 
-ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int 
partition, bool lazy) {
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int 
partition, bool lazy,
+                                                             bool 
retryOnCreationError) {
     using namespace std::placeholders;
     auto client = client_.lock();
-    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, 
interceptors_, partition);
+    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, 
interceptors_, partition,
+                                                   retryOnCreationError);
     if (!client) {
         return producer;
     }
@@ -127,13 +129,13 @@ void PartitionedProducerImpl::start() {
 
         for (unsigned int i = 0; i < getNumPartitions(); i++) {
             bool lazy = (short)i != partition;
-            producers_.push_back(newInternalProducer(i, lazy));
+            producers_.push_back(newInternalProducer(i, lazy, false));
         }
 
         producers_[partition]->start();
     } else {
         for (unsigned int i = 0; i < getNumPartitions(); i++) {
-            producers_.push_back(newInternalProducer(i, false));
+            producers_.push_back(newInternalProducer(i, false, false));
         }
 
         for (ProducerList::const_iterator prod = producers_.begin(); prod != 
producers_.end(); prod++) {
@@ -461,7 +463,7 @@ void PartitionedProducerImpl::handleGetPartitions(Result 
result,
             for (unsigned int i = currentNumPartitions; i < newNumPartitions; 
i++) {
                 ProducerImplPtr producer;
                 try {
-                    producer = newInternalProducer(i, lazy);
+                    producer = newInternalProducer(i, lazy, true);
                 } catch (const std::runtime_error& e) {
                     LOG_ERROR("Failed to create producer for partition " << i 
<< ": " << e.what());
                     producers.clear();
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 25ba9c3..2d07a81 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -135,7 +135,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     unsigned int getNumPartitions() const;
     unsigned int getNumPartitionsWithLock() const;
-    ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
+    ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy, 
bool retryOnCreationError);
     MessageRoutingPolicyPtr getMessageRouter();
     void runPartitionUpdateTask();
     void getPartitionMetadata();
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 61b95bf..0a12925 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -48,7 +48,7 @@ DECLARE_LOG_OBJECT()
 
 ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
                            const ProducerConfiguration& conf, const 
ProducerInterceptorsPtr& interceptors,
-                           int32_t partition)
+                           int32_t partition, bool retryOnCreationError)
     : HandlerBase(client, (partition < 0) ? topicName.toString() : 
topicName.getTopicPartitionName(partition),
                   
Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
                           
milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()),
@@ -67,7 +67,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const 
TopicName& topicName,
       dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000),
       memoryLimitController_(client->getMemoryLimitController()),
       chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() 
&& !conf_.getBatchingEnabled()),
-      interceptors_(interceptors) {
+      interceptors_(interceptors),
+      retryOnCreationError_(retryOnCreationError) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on 
topic " << topic()
                                 << " id: " << producerId_);
     if (!producerName_.empty()) {
@@ -273,7 +274,7 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
             lock.unlock();
             producerCreatedPromise_.setFailed(result);
             handleResult = result;
-        } else if (producerCreatedPromise_.isComplete()) {
+        } else if (producerCreatedPromise_.isComplete() || 
retryOnCreationError_) {
             if (result == ResultProducerBlockedQuotaExceededException) {
                 LOG_WARN(getName() << "Backlog is exceeded on topic. Sending 
exception to producer");
                 
failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 2fb0b88..b467458 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -66,7 +66,8 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
    public:
     ProducerImpl(ClientImplPtr client, const TopicName& topic,
                  const ProducerConfiguration& producerConfiguration,
-                 const ProducerInterceptorsPtr& interceptors, int32_t 
partition = -1);
+                 const ProducerInterceptorsPtr& interceptors, int32_t 
partition = -1,
+                 bool retryOnCreationError = false);
     ~ProducerImpl();
 
     // overrided methods from ProducerImplBase
@@ -202,6 +203,8 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
     boost::optional<uint64_t> topicEpoch;
 
     ProducerInterceptorsPtr interceptors_;
+
+    bool retryOnCreationError_;
 };
 
 struct ProducerImplCmp {
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 45bb3aa..bb58a4e 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -649,4 +649,36 @@ TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) 
{
     client.close();
 }
 
+TEST(ProducerTest, testFailedToCreateNewPartitionProducer) {
+    const std::string topic =
+        "public/default/testFailedToCreateNewPartitionProducer" + 
std::to_string(time(nullptr));
+    std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + 
"/partitions";
+
+    int res = makePutRequest(topicOperateUrl, "2");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ClientConfiguration clientConf;
+    clientConf.setPartititionsUpdateInterval(1);
+    Client client(serviceUrl, clientConf);
+    ProducerConfiguration conf;
+    Producer producer;
+    client.createProducer(topic, conf, producer);
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(1), [&producer]() -> bool { 
return producer.isConnected(); }));
+
+    PartitionedProducerImpl& partitionedProducer = 
PulsarFriend::getPartitionedProducerImpl(producer);
+    PulsarFriend::updatePartitions(partitionedProducer, 3);
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    auto& newProducer = PulsarFriend::getInternalProducerImpl(producer, 2);
+    ASSERT_FALSE(newProducer.isConnected());  // should fail with topic not 
found
+
+    res = makePostRequest(topicOperateUrl, "3");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ASSERT_TRUE(
+        waitUntil(std::chrono::seconds(5), [&newProducer]() -> bool { return 
newProducer.isConnected(); }));
+
+    producer.close();
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

Reply via email to