This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 804f87b [fix] producer do not create when topic update partition (#295)
804f87b is described below
commit 804f87bc78893e9079f7eddcd27612ef593c12be
Author: ken <[email protected]>
AuthorDate: Sat Jul 1 22:16:37 2023 +0800
[fix] producer do not create when topic update partition (#295)
Co-authored-by: fanjianye <[email protected]>
---
lib/PartitionedProducerImpl.cc | 2 +-
tests/PartitionsUpdateTest.cc | 9 +++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 45c604b..f7957d6 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -471,8 +471,8 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
}
} else {
LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
- runPartitionUpdateTask();
}
+ runPartitionUpdateTask();
}
bool PartitionedProducerImpl::isConnected() const {
diff --git a/tests/PartitionsUpdateTest.cc b/tests/PartitionsUpdateTest.cc
index 010e5cb..4d2fa27 100644
--- a/tests/PartitionsUpdateTest.cc
+++ b/tests/PartitionsUpdateTest.cc
@@ -101,6 +101,11 @@ static void waitForPartitionsUpdated() {
std::this_thread::sleep_for(std::chrono::seconds(3));
}
+static void waitForPartitionUpdateTaskRunMultipleTimes() {
+ // Assume runPartitionUpdateTask run more than one time in 2 seconds if enabled
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+}
+
TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
ClientConfiguration clientConfig;
ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval());
@@ -131,6 +136,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
+ waitForPartitionUpdateTaskRunMultipleTimes();
res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
@@ -143,6 +149,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
+ waitForPartitionUpdateTaskRunMultipleTimes();
res = makePostRequest(topicOperateUrl, "5"); // update partitions to 5
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
@@ -155,6 +162,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
+ waitForPartitionUpdateTaskRunMultipleTimes();
res = makePostRequest(topicOperateUrl, "7"); // update partitions to 7
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
@@ -167,6 +175,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
+ waitForPartitionUpdateTaskRunMultipleTimes();
res = makePostRequest(topicOperateUrl, "10"); // update partitions to 10
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();