This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fe32736 [C++] Fix the consumer configuration inconsistency with Java
client (#14070)
fe32736 is described below
commit fe327367f3665a442d5ff17630ae3fc9c9d0d0f8
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Feb 2 00:23:30 2022 +0800
[C++] Fix the consumer configuration inconsistency with Java client (#14070)
* [C++] Change AutoOldest to AutoAckOldest in consumer configuration
* Change the default value of MaxPendingChunkedMessages to 10
---
pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h | 10 +++++-----
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 4 ++--
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 2 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 2 +-
pulsar-client-cpp/tests/ConsumerConfigurationTest.cc | 8 ++++----
5 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 2cdbf47..70c90cf 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -450,11 +450,11 @@ class PULSAR_PUBLIC ConsumerConfiguration {
* Buffering large number of outstanding uncompleted chunked messages can
create memory pressure and it
* can be guarded by providing this maxPendingChunkedMessage threshold.
Once, consumer reaches this
* threshold, it drops the outstanding unchunked-messages by silently
acking or asking broker to redeliver
- * later by marking it unacked. See setAutoOldestChunkedMessageOnQueueFull.
+ * later by marking it unacked. See
setAutoAckOldestChunkedMessageOnQueueFull.
*
* If it's zero, the pending chunked messages will not be limited.
*
- * Default: 100
+ * Default: 10
*
* @param maxPendingChunkedMessage the number of max pending chunked
messages
*/
@@ -475,13 +475,13 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*
* @param autoAckOldestChunkedMessageOnQueueFull whether to ack the
discarded chunked message
*/
- ConsumerConfiguration& setAutoOldestChunkedMessageOnQueueFull(
+ ConsumerConfiguration& setAutoAckOldestChunkedMessageOnQueueFull(
bool autoAckOldestChunkedMessageOnQueueFull);
/**
- * The associated getter of setAutoOldestChunkedMessageOnQueueFull
+ * The associated getter of setAutoAckOldestChunkedMessageOnQueueFull
*/
- bool isAutoOldestChunkedMessageOnQueueFull() const;
+ bool isAutoAckOldestChunkedMessageOnQueueFull() const;
friend class PulsarWrapper;
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index d13cb0e..b755063 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -238,13 +238,13 @@ ConsumerConfiguration&
ConsumerConfiguration::setMaxPendingChunkedMessage(size_t
size_t ConsumerConfiguration::getMaxPendingChunkedMessage() const { return
impl_->maxPendingChunkedMessage; }
-ConsumerConfiguration&
ConsumerConfiguration::setAutoOldestChunkedMessageOnQueueFull(
+ConsumerConfiguration&
ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull(
bool autoAckOldestChunkedMessageOnQueueFull) {
impl_->autoAckOldestChunkedMessageOnQueueFull =
autoAckOldestChunkedMessageOnQueueFull;
return *this;
}
-bool ConsumerConfiguration::isAutoOldestChunkedMessageOnQueueFull() const {
+bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
return impl_->autoAckOldestChunkedMessageOnQueueFull;
}
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 9c2a461..1848f2d 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -50,7 +50,7 @@ struct ConsumerConfigurationImpl {
std::map<std::string, std::string> properties;
int priorityLevel{0};
KeySharedPolicy keySharedPolicy;
- size_t maxPendingChunkedMessage{100};
+ size_t maxPendingChunkedMessage{10};
bool autoAckOldestChunkedMessageOnQueueFull{false};
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index fa817a0..b5b5ceb 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -65,7 +65,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
readCompacted_(conf.isReadCompacted()),
startMessageId_(startMessageId),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
-
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoOldestChunkedMessageOnQueueFull())
{
+
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull())
{
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " <<
consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 57ed0ec..fc67e86 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -59,8 +59,8 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
ASSERT_EQ(conf.getProperties().empty(), true);
ASSERT_EQ(conf.getPriorityLevel(), 0);
- ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 100);
- ASSERT_EQ(conf.isAutoOldestChunkedMessageOnQueueFull(), false);
+ ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 10);
+ ASSERT_EQ(conf.isAutoAckOldestChunkedMessageOnQueueFull(), false);
}
TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -145,8 +145,8 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setMaxPendingChunkedMessage(500);
ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 500);
- conf.setAutoOldestChunkedMessageOnQueueFull(true);
- ASSERT_TRUE(conf.isAutoOldestChunkedMessageOnQueueFull());
+ conf.setAutoAckOldestChunkedMessageOnQueueFull(true);
+ ASSERT_TRUE(conf.isAutoAckOldestChunkedMessageOnQueueFull());
}
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {