This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a4b7f12 Retry on new partition producer creation failure (#378)
a4b7f12 is described below
commit a4b7f120aa0a7f74d8e1d837629a48caab1403bd
Author: erobot <[email protected]>
AuthorDate: Thu Jan 4 10:28:41 2024 +0800
Retry on new partition producer creation failure (#378)
Fixes #319
### Motivation
A partitioned producer that was already created successfully should keep
working even when producers for newly added partitions fail to be created.
### Modifications
`ProducerImpl`: add a `retryOnCreationError` option that controls whether
producer creation is retried after an error.
`PartitionedProducerImpl`: create producers for newly added partitions with
`retryOnCreationError=true`.
---
lib/PartitionedProducerImpl.cc | 12 +++++++-----
lib/PartitionedProducerImpl.h | 2 +-
lib/ProducerImpl.cc | 7 ++++---
lib/ProducerImpl.h | 5 ++++-
tests/ProducerTest.cc | 32 ++++++++++++++++++++++++++++++++
5 files changed, 48 insertions(+), 10 deletions(-)
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index a799d5c..54e96c8 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -92,10 +92,12 @@ unsigned int
PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}
-ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int
partition, bool lazy) {
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int
partition, bool lazy,
+ bool
retryOnCreationError) {
using namespace std::placeholders;
auto client = client_.lock();
- auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_,
interceptors_, partition);
+ auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_,
interceptors_, partition,
+ retryOnCreationError);
if (!client) {
return producer;
}
@@ -127,13 +129,13 @@ void PartitionedProducerImpl::start() {
for (unsigned int i = 0; i < getNumPartitions(); i++) {
bool lazy = (short)i != partition;
- producers_.push_back(newInternalProducer(i, lazy));
+ producers_.push_back(newInternalProducer(i, lazy, false));
}
producers_[partition]->start();
} else {
for (unsigned int i = 0; i < getNumPartitions(); i++) {
- producers_.push_back(newInternalProducer(i, false));
+ producers_.push_back(newInternalProducer(i, false, false));
}
for (ProducerList::const_iterator prod = producers_.begin(); prod !=
producers_.end(); prod++) {
@@ -461,7 +463,7 @@ void PartitionedProducerImpl::handleGetPartitions(Result
result,
for (unsigned int i = currentNumPartitions; i < newNumPartitions;
i++) {
ProducerImplPtr producer;
try {
- producer = newInternalProducer(i, lazy);
+ producer = newInternalProducer(i, lazy, true);
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create producer for partition " << i
<< ": " << e.what());
producers.clear();
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 25ba9c3..2d07a81 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -135,7 +135,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;
- ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
+ ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy,
bool retryOnCreationError);
MessageRoutingPolicyPtr getMessageRouter();
void runPartitionUpdateTask();
void getPartitionMetadata();
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 61b95bf..0a12925 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -48,7 +48,7 @@ DECLARE_LOG_OBJECT()
ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
const ProducerConfiguration& conf, const
ProducerInterceptorsPtr& interceptors,
- int32_t partition)
+ int32_t partition, bool retryOnCreationError)
: HandlerBase(client, (partition < 0) ? topicName.toString() :
topicName.getTopicPartitionName(partition),
Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()),
@@ -67,7 +67,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const
TopicName& topicName,
dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000),
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent()
&& !conf_.getBatchingEnabled()),
- interceptors_(interceptors) {
+ interceptors_(interceptors),
+ retryOnCreationError_(retryOnCreationError) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on
topic " << topic()
<< " id: " << producerId_);
if (!producerName_.empty()) {
@@ -273,7 +274,7 @@ Result ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result
lock.unlock();
producerCreatedPromise_.setFailed(result);
handleResult = result;
- } else if (producerCreatedPromise_.isComplete()) {
+ } else if (producerCreatedPromise_.isComplete() ||
retryOnCreationError_) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending
exception to producer");
failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 2fb0b88..b467458 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -66,7 +66,8 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const TopicName& topic,
const ProducerConfiguration& producerConfiguration,
- const ProducerInterceptorsPtr& interceptors, int32_t
partition = -1);
+ const ProducerInterceptorsPtr& interceptors, int32_t
partition = -1,
+ bool retryOnCreationError = false);
~ProducerImpl();
// overrided methods from ProducerImplBase
@@ -202,6 +203,8 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
boost::optional<uint64_t> topicEpoch;
ProducerInterceptorsPtr interceptors_;
+
+ bool retryOnCreationError_;
};
struct ProducerImplCmp {
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 45bb3aa..bb58a4e 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -649,4 +649,36 @@ TEST(ProducerTest, testReconnectMultiConnectionsPerBroker)
{
client.close();
}
+TEST(ProducerTest, testFailedToCreateNewPartitionProducer) { // Regression test for #319: a producer for a newly added partition must keep retrying after its first creation attempt fails
+ const std::string topic =
+ "public/default/testFailedToCreateNewPartitionProducer" +
std::to_string(time(nullptr)); // time suffix keeps the topic name unique per run
+ std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic +
"/partitions"; // admin endpoint for the topic's partition count
+
+ int res = makePutRequest(topicOperateUrl, "2"); // create the topic on the broker with 2 partitions
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; // 409 presumably means the topic already exists; tolerated
+
+ ClientConfiguration clientConf;
+ clientConf.setPartititionsUpdateInterval(1); // presumably makes the partition-update task run often; confirm units in ClientConfiguration
+ Client client(serviceUrl, clientConf);
+ ProducerConfiguration conf;
+ Producer producer;
+ client.createProducer(topic, conf, producer);
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(1), [&producer]() -> bool {
return producer.isConnected(); })); // initial partitioned producer must be up first
+
+ PartitionedProducerImpl& partitionedProducer =
PulsarFriend::getPartitionedProducerImpl(producer);
+ PulsarFriend::updatePartitions(partitionedProducer, 3); // client side now sees 3 partitions while the broker still has 2
+ std::this_thread::sleep_for(std::chrono::milliseconds(500)); // give partition 2's producer time to attempt (and fail) creation
+ auto& newProducer = PulsarFriend::getInternalProducerImpl(producer, 2); // producer for the newly added partition 2
+ ASSERT_FALSE(newProducer.isConnected()); // should fail with topic not
found
+
+ res = makePostRequest(topicOperateUrl, "3"); // now actually grow the topic to 3 partitions on the broker
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ ASSERT_TRUE(
+ waitUntil(std::chrono::seconds(5), [&newProducer]() -> bool { return
newProducer.isConnected(); })); // succeeds only if creation was retried (retryOnCreationError)
+
+ producer.close();
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));