This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f4f4cdd  [C++] Support setting priority for the consumer (#12076)
f4f4cdd is described below

commit f4f4cdd1d5a535f1b93fc4af2d73ce2d58727cb6
Author: metahys <[email protected]>
AuthorDate: Tue Sep 21 01:05:59 2021 +0800

    [C++] Support setting priority for the consumer (#12076)
    
    * [C++] Support setting priority for the consumer
    
    * Fixed formatting
    
    * Fixed code style
    
    Co-authored-by: sunhongyi <[email protected]>
---
 pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h | 13 +++++++++++++
 pulsar-client-cpp/lib/Commands.cc                        |  4 +++-
 pulsar-client-cpp/lib/Commands.h                         |  3 ++-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc           | 10 ++++++++++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h        |  1 +
 pulsar-client-cpp/lib/ConsumerImpl.cc                    |  3 ++-
 pulsar-client-cpp/tests/ConsumerConfigurationTest.cc     |  4 ++++
 7 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index bf7fdcd..85b1f0a 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -405,6 +405,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     ConsumerConfiguration& setProperties(const std::map<std::string, 
std::string>& properties);
 
+    /**
+     * Set the priority level for the consumer (0 is the default value and means 
the highest priority).
+     *
+     * @param priorityLevel the priority of this consumer
+     * @return the ConsumerConfiguration instance
+     */
+    ConsumerConfiguration& setPriorityLevel(int priorityLevel);
+
+    /**
+     * @return the configured priority for the consumer
+     */
+    int getPriorityLevel() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 87c149c..52821ede 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -258,7 +258,8 @@ SharedBuffer Commands::newSubscribe(const std::string& 
topic, const std::string&
                                     const std::map<std::string, std::string>& 
metadata,
                                     const SchemaInfo& schemaInfo,
                                     CommandSubscribe_InitialPosition 
subscriptionInitialPosition,
-                                    bool replicateSubscriptionState, 
KeySharedPolicy keySharedPolicy) {
+                                    bool replicateSubscriptionState, 
KeySharedPolicy keySharedPolicy,
+                                    int priorityLevel) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -272,6 +273,7 @@ SharedBuffer Commands::newSubscribe(const std::string& 
topic, const std::string&
     subscribe->set_read_compacted(readCompacted);
     subscribe->set_initialposition(subscriptionInitialPosition);
     subscribe->set_replicate_subscription_state(replicateSubscriptionState);
+    subscribe->set_priority_level(priorityLevel);
 
     if (isBuiltInSchema(schemaInfo.getSchemaType())) {
         subscribe->set_allocated_schema(getSchema(schemaInfo));
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 02ebaad..91c218d 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -89,7 +89,8 @@ class Commands {
                                      bool readCompacted, const 
std::map<std::string, std::string>& metadata,
                                      const SchemaInfo& schemaInfo,
                                      proto::CommandSubscribe_InitialPosition 
subscriptionInitialPosition,
-                                     bool replicateSubscriptionState, 
KeySharedPolicy keySharedPolicy);
+                                     bool replicateSubscriptionState, 
KeySharedPolicy keySharedPolicy,
+                                     int priorityLevel = 0);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t 
requestId);
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 9d9f2b6..748af71 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -201,6 +201,16 @@ ConsumerConfiguration& 
ConsumerConfiguration::setProperties(
     return *this;
 }
 
+ConsumerConfiguration& ConsumerConfiguration::setPriorityLevel(int 
priorityLevel) {
+    if (priorityLevel < 0) {
+        throw std::invalid_argument("Consumer Config Exception: PriorityLevel 
should be nonnegative number.");
+    }
+    impl_->priorityLevel = priorityLevel;
+    return *this;
+}
+
+int ConsumerConfiguration::getPriorityLevel() const { return 
impl_->priorityLevel; }
+
 ConsumerConfiguration& 
ConsumerConfiguration::setKeySharedPolicy(KeySharedPolicy keySharedPolicy) {
     impl_->keySharedPolicy = keySharedPolicy.clone();
     return *this;
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 502c201..14ef613 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -46,6 +46,7 @@ struct ConsumerConfigurationImpl {
     int patternAutoDiscoveryPeriod{60};
     bool replicateSubscriptionStateEnabled{false};
     std::map<std::string, std::string> properties;
+    int priorityLevel{0};
     KeySharedPolicy keySharedPolicy;
 };
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 4b6479d..560ca6d 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -181,7 +181,8 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     SharedBuffer cmd = Commands::newSubscribe(
         topic_, subscription_, consumerId_, requestId, getSubType(), 
consumerName_, subscriptionMode_,
         startMessageId_, readCompacted_, config_.getProperties(), 
config_.getSchema(), getInitialPosition(),
-        config_.isReplicateSubscriptionStateEnabled(), 
config_.getKeySharedPolicy());
+        config_.isReplicateSubscriptionStateEnabled(), 
config_.getKeySharedPolicy(),
+        config_.getPriorityLevel());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(
             std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), 
cnx, std::placeholders::_1));
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc 
b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 99c7cfa..88746dd 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -50,6 +50,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
     ASSERT_EQ(conf.isEncryptionEnabled(), false);
     ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
     ASSERT_EQ(conf.getProperties().empty(), true);
+    ASSERT_EQ(conf.getPriorityLevel(), 0);
 }
 
 TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -124,6 +125,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
     conf.setProperty("k1", "v1");
     ASSERT_EQ(conf.getProperties()["k1"], "v1");
     ASSERT_EQ(conf.hasProperty("k1"), true);
+
+    conf.setPriorityLevel(1);
+    ASSERT_EQ(conf.getPriorityLevel(), 1);
 }
 
 TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {

Reply via email to