merlimat closed pull request #2420: [client] add properties to producer for cpp 
& python client
URL: https://github.com/apache/incubator-pulsar/pull/2420
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 9f7bf1fe17..45154c5c6a 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -127,6 +127,40 @@ class ProducerConfiguration {
     bool isEncryptionEnabled() const;
     ProducerConfiguration& addEncryptionKey(std::string key);
 
+    /**
+     * Check whether the message has a specific property attached.
+     *
+     * @param name the name of the property to check
+     * @return true if the message has the specified property
+     * @return false if the property is not defined
+     */
+    bool hasProperty(const std::string& name) const;
+
+    /**
+     * Get the value of a specific property
+     *
+     * @param name the name of the property
+     * @return the value of the property or null if the property was not 
defined
+     */
+    const std::string& getProperty(const std::string& name) const;
+
+    /**
+     * Get all the properties attached to this producer.
+     */
+    std::map<std::string, std::string>& getProperties() const;
+
+    /**
+     * Sets a new property on a message.
+     * @param name   the name of the property
+     * @param value  the associated value
+     */
+    ProducerConfiguration& setProperty(const std::string& name, const 
std::string& value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    ProducerConfiguration& setProperties(const std::map<std::string, 
std::string>& properties);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 8a1933bde7..6461d014f0 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -21,12 +21,14 @@
 #include "Version.h"
 #include "pulsar/MessageBuilder.h"
 #include "LogUtils.h"
+#include "PulsarApi.pb.h"
 #include "Utils.h"
 #include "Url.h"
 #include "checksum/ChecksumProvider.h"
 #include <algorithm>
 #include <boost/thread/mutex.hpp>
 
+using namespace pulsar;
 namespace pulsar {
 
 using namespace pulsar::proto;
@@ -221,13 +223,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t 
consumerId, uint64_t requestId) {
 }
 
 SharedBuffer Commands::newProducer(const std::string& topic, uint64_t 
producerId,
-                                   const std::string& producerName, uint64_t 
requestId) {
+                                   const std::string& producerName, uint64_t 
requestId,
+                                   const std::map<std::string, std::string>& 
metadata) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
     producer->set_topic(topic);
     producer->set_producer_id(producerId);
     producer->set_request_id(requestId);
+    for (std::map<std::string, std::string>::const_iterator it = 
metadata.begin(); it != metadata.end();
+         it++) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(it->first);
+        keyValue->set_value(it->second);
+        producer->mutable_metadata()->AddAllocated(keyValue);
+    }
 
     if (!producerName.empty()) {
         producer->set_producer_name(producerName);
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index d9b8589fe2..fbae9fa2f3 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -83,7 +83,8 @@ class Commands {
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t 
requestId);
 
     static SharedBuffer newProducer(const std::string& topic, uint64_t 
producerId,
-                                    const std::string& producerName, uint64_t 
requestId);
+                                    const std::string& producerName, uint64_t 
requestId,
+                                    const std::map<std::string, std::string>& 
metadata);
 
     static SharedBuffer newAck(uint64_t consumerId, const 
proto::MessageIdData& messageId,
                                proto::CommandAck_AckType ackType, int 
validationError);
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index ab70db225c..9ad2cf90a3 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -19,6 +19,9 @@
 #include <lib/ProducerConfigurationImpl.h>
 
 namespace pulsar {
+
+const static std::string emptyString;
+
 ProducerConfiguration::ProducerConfiguration() : 
impl_(boost::make_shared<ProducerConfigurationImpl>()) {}
 
 ProducerConfiguration::~ProducerConfiguration() {}
@@ -36,7 +39,6 @@ ProducerConfiguration& 
ProducerConfiguration::setProducerName(const std::string&
 }
 
 const std::string& ProducerConfiguration::getProducerName() const {
-    static const std::string emptyString;
     return impl_->producerName.is_present() ? impl_->producerName.value() : 
emptyString;
 }
 
@@ -185,4 +187,34 @@ ProducerConfiguration& 
ProducerConfiguration::addEncryptionKey(std::string key)
     return *this;
 }
 
+bool ProducerConfiguration::hasProperty(const std::string& name) const {
+    const std::map<std::string, std::string>& m = impl_->properties;
+    return m.find(name) != m.end();
+}
+
+const std::string& ProducerConfiguration::getProperty(const std::string& name) 
const {
+    if (hasProperty(name)) {
+        const std::map<std::string, std::string>& m = impl_->properties;
+        return m.at(name);
+    } else {
+        return emptyString;
+    }
+}
+
+std::map<std::string, std::string>& ProducerConfiguration::getProperties() 
const { return impl_->properties; }
+
+ProducerConfiguration& ProducerConfiguration::setProperty(const std::string& 
name, const std::string& value) {
+    impl_->properties.insert(std::make_pair(name, value));
+    return *this;
+}
+
+ProducerConfiguration& ProducerConfiguration::setProperties(
+    const std::map<std::string, std::string>& properties) {
+    for (std::map<std::string, std::string>::const_iterator it = 
properties.begin(); it != properties.end();
+         it++) {
+        setProperty(it->first, it->second);
+    }
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 3f788a95c4..6dfaeedf3a 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -44,6 +44,7 @@ struct ProducerConfigurationImpl {
     CryptoKeyReaderPtr cryptoKeyReader;
     std::set<std::string> encryptionKeys;
     ProducerCryptoFailureAction cryptoFailureAction;
+    std::map<std::string, std::string> properties;
     ProducerConfigurationImpl()
         : sendTimeoutMs(30000),
           compressionType(CompressionNone),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 7f0987351a..881b1899c8 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -127,7 +127,8 @@ void ProducerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, 
producerName_, requestId);
+    SharedBuffer cmd =
+        Commands::newProducer(topic_, producerId_, producerName_, requestId, 
conf_.getProperties());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ProducerImpl::handleCreateProducer, 
shared_from_this(), cnx, _1, _2));
 }
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index f3b560b747..3f57016dfd 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -321,7 +321,8 @@ def create_producer(self, topic,
                         batching_max_messages=1000,
                         batching_max_allowed_size_in_bytes=128*1024,
                         batching_max_publish_delay_ms=10,
-                        
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+                        
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+                        properties=None,
                         ):
         """
         Create a new producer on a given topic.
@@ -361,6 +362,9 @@ def create_producer(self, topic,
         * `message_routing_mode`:
           Set the message routing mode for the partitioned producer. Default 
is `PartitionsRoutingMode.RoundRobinDistribution`,
           other option is `PartitionsRoutingMode.UseSinglePartition`
+        * `properties`:
+          Sets the properties for the producer. The properties associated with 
a producer
+          can be used for identify a producer at broker side.
         """
         _check_type(str, topic, 'topic')
         _check_type_or_none(str, producer_name, 'producer_name')
@@ -374,6 +378,7 @@ def create_producer(self, topic,
         _check_type(int, batching_max_messages, 'batching_max_messages')
         _check_type(int, batching_max_allowed_size_in_bytes, 
'batching_max_allowed_size_in_bytes')
         _check_type(int, batching_max_publish_delay_ms, 
'batching_max_publish_delay_ms')
+        _check_type_or_none(dict, properties, 'properties')
 
         conf = _pulsar.ProducerConfiguration()
         conf.send_timeout_millis(send_timeout_millis)
@@ -390,6 +395,10 @@ def create_producer(self, topic,
             conf.producer_name(producer_name)
         if initial_sequence_id:
             conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
         p = Producer()
         p._producer = self._client.create_producer(topic, conf)
         return p
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9deee9af78..19a887ebba 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -122,6 +122,7 @@ void export_config() {
             .def("batching_max_allowed_size_in_bytes", 
&ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>())
             .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::getBatchingMaxPublishDelayMs, 
return_value_policy<copy_const_reference>())
             .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>())
+            .def("property", &ProducerConfiguration::setProperty, 
return_self<>())
             ;
 
     class_<ConsumerConfiguration>("ConsumerConfiguration")
diff --git a/pulsar-client-cpp/python/test_producer.py 
b/pulsar-client-cpp/python/test_producer.py
index a3dd1f8285..3bd1537ed4 100755
--- a/pulsar-client-cpp/python/test_producer.py
+++ b/pulsar-client-cpp/python/test_producer.py
@@ -27,7 +27,11 @@
                     'my-topic',
                     block_if_queue_full=True,
                     batching_enabled=True,
-                    batching_max_publish_delay_ms=10
+                    batching_max_publish_delay_ms=10,
+                    properties={
+                        "producer-name": "test-producer-name",
+                        "producer-id": "test-producer-id"
+                    }
                 )
 
 for i in range(10):
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d4c1df80c0..06197f7739 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -126,6 +126,8 @@ TEST(BasicEndToEndTest, testBatchMessages) {
     conf.setBatchingMaxMessages(batchSize);
     conf.setBatchingEnabled(true);
     conf.setBlockIfQueueFull(true);
+    conf.setProperty("producer-name", "test-producer-name");
+    conf.setProperty("producer-id", "test-producer-id");
 
     Promise<Result, Producer> producerPromise;
     client.createProducerAsync(topicName, conf, 
WaitForCallbackValue<Producer>(producerPromise));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to