This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 804f87b  [fix] producer do not create when topic update partition 
(#295)
804f87b is described below

commit 804f87bc78893e9079f7eddcd27612ef593c12be
Author: ken <[email protected]>
AuthorDate: Sat Jul 1 22:16:37 2023 +0800

    [fix] producer do not create when topic update partition (#295)
    
    Co-authored-by: fanjianye <[email protected]>
---
 lib/PartitionedProducerImpl.cc | 2 +-
 tests/PartitionsUpdateTest.cc  | 9 +++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 45c604b..f7957d6 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -471,8 +471,8 @@ void PartitionedProducerImpl::handleGetPartitions(Result 
result,
         }
     } else {
         LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
-        runPartitionUpdateTask();
     }
+    runPartitionUpdateTask();
 }
 
 bool PartitionedProducerImpl::isConnected() const {
diff --git a/tests/PartitionsUpdateTest.cc b/tests/PartitionsUpdateTest.cc
index 010e5cb..4d2fa27 100644
--- a/tests/PartitionsUpdateTest.cc
+++ b/tests/PartitionsUpdateTest.cc
@@ -101,6 +101,11 @@ static void waitForPartitionsUpdated() {
     std::this_thread::sleep_for(std::chrono::seconds(3));
 }
 
+static void waitForPartitionUpdateTaskRunMultipleTimes() {
+    // Assume runPartitionUpdateTask run more than one time in 2 seconds if 
enabled
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+}
+
 TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
     ClientConfiguration clientConfig;
     ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval());
@@ -131,6 +136,7 @@ void testPartitionsUpdate(bool 
lazyStartPartitionedProducers, std::string topicN
     ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, 
lazyStartPartitionedProducers));
     ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
 
+    waitForPartitionUpdateTaskRunMultipleTimes();
     res = makePostRequest(topicOperateUrl, "3");  // update partitions to 3
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
     waitForPartitionsUpdated();
@@ -143,6 +149,7 @@ void testPartitionsUpdate(bool 
lazyStartPartitionedProducers, std::string topicN
     ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false));
     ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
 
+    waitForPartitionUpdateTaskRunMultipleTimes();
     res = makePostRequest(topicOperateUrl, "5");  // update partitions to 5
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
     waitForPartitionsUpdated();
@@ -155,6 +162,7 @@ void testPartitionsUpdate(bool 
lazyStartPartitionedProducers, std::string topicN
     ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
     ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
 
+    waitForPartitionUpdateTaskRunMultipleTimes();
     res = makePostRequest(topicOperateUrl, "7");  // update partitions to 7
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
     waitForPartitionsUpdated();
@@ -167,6 +175,7 @@ void testPartitionsUpdate(bool 
lazyStartPartitionedProducers, std::string topicN
     ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
     ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
 
+    waitForPartitionUpdateTaskRunMultipleTimes();
     res = makePostRequest(topicOperateUrl, "10");  // update partitions to 10
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
     waitForPartitionsUpdated();

Reply via email to