This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f4f4cdd [C++] Support setting priority for the consumer (#12076)
f4f4cdd is described below
commit f4f4cdd1d5a535f1b93fc4af2d73ce2d58727cb6
Author: metahys <[email protected]>
AuthorDate: Tue Sep 21 01:05:59 2021 +0800
[C++] Support setting priority for the consumer (#12076)
* [C++] Support setting priority for the consumer
* Fixed formatting
* Fixed code style
Co-authored-by: sunhongyi <[email protected]>
---
pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h | 13 +++++++++++++
pulsar-client-cpp/lib/Commands.cc | 4 +++-
pulsar-client-cpp/lib/Commands.h | 3 ++-
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 10 ++++++++++
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ConsumerImpl.cc | 3 ++-
pulsar-client-cpp/tests/ConsumerConfigurationTest.cc | 4 ++++
7 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index bf7fdcd..85b1f0a 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -405,6 +405,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
ConsumerConfiguration& setProperties(const std::map<std::string,
std::string>& properties);
+ /**
+ * Set the Priority Level for consumer (0 is the default value and means
the highest priority).
+ *
+ * @param priorityLevel the priority of this consumer
+ * @return the ConsumerConfiguration instance
+ */
+ ConsumerConfiguration& setPriorityLevel(int priorityLevel);
+
+ /**
+ * @return the configured priority for the consumer
+ */
+ int getPriorityLevel() const;
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/Commands.cc
b/pulsar-client-cpp/lib/Commands.cc
index 87c149c..52821ede 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -258,7 +258,8 @@ SharedBuffer Commands::newSubscribe(const std::string&
topic, const std::string&
const std::map<std::string, std::string>&
metadata,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition
subscriptionInitialPosition,
- bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy) {
+ bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy,
+ int priorityLevel) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -272,6 +273,7 @@ SharedBuffer Commands::newSubscribe(const std::string&
topic, const std::string&
subscribe->set_read_compacted(readCompacted);
subscribe->set_initialposition(subscriptionInitialPosition);
subscribe->set_replicate_subscription_state(replicateSubscriptionState);
+ subscribe->set_priority_level(priorityLevel);
if (isBuiltInSchema(schemaInfo.getSchemaType())) {
subscribe->set_allocated_schema(getSchema(schemaInfo));
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 02ebaad..91c218d 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -89,7 +89,8 @@ class Commands {
bool readCompacted, const
std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo,
proto::CommandSubscribe_InitialPosition
subscriptionInitialPosition,
- bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy);
+ bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy,
+ int priorityLevel = 0);
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t
requestId);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 9d9f2b6..748af71 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -201,6 +201,16 @@ ConsumerConfiguration&
ConsumerConfiguration::setProperties(
return *this;
}
+ConsumerConfiguration& ConsumerConfiguration::setPriorityLevel(int
priorityLevel) {
+ if (priorityLevel < 0) {
+ throw std::invalid_argument("Consumer Config Exception: PriorityLevel
should be nonnegative number.");
+ }
+ impl_->priorityLevel = priorityLevel;
+ return *this;
+}
+
+int ConsumerConfiguration::getPriorityLevel() const { return
impl_->priorityLevel; }
+
ConsumerConfiguration&
ConsumerConfiguration::setKeySharedPolicy(KeySharedPolicy keySharedPolicy) {
impl_->keySharedPolicy = keySharedPolicy.clone();
return *this;
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 502c201..14ef613 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -46,6 +46,7 @@ struct ConsumerConfigurationImpl {
int patternAutoDiscoveryPeriod{60};
bool replicateSubscriptionStateEnabled{false};
std::map<std::string, std::string> properties;
+ int priorityLevel{0};
KeySharedPolicy keySharedPolicy;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 4b6479d..560ca6d 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -181,7 +181,8 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_, subscriptionMode_,
startMessageId_, readCompacted_, config_.getProperties(),
config_.getSchema(), getInitialPosition(),
- config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy());
+ config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(),
+ config_.getPriorityLevel());
cnx->sendRequestWithId(cmd, requestId)
.addListener(
std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(),
cnx, std::placeholders::_1));
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 99c7cfa..88746dd 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -50,6 +50,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.isEncryptionEnabled(), false);
ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
ASSERT_EQ(conf.getProperties().empty(), true);
+ ASSERT_EQ(conf.getPriorityLevel(), 0);
}
TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -124,6 +125,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setProperty("k1", "v1");
ASSERT_EQ(conf.getProperties()["k1"], "v1");
ASSERT_EQ(conf.hasProperty("k1"), true);
+
+ conf.setPriorityLevel(1);
+ ASSERT_EQ(conf.getPriorityLevel(), 1);
}
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {