This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new d7ec1c5  [client] add properties to producer for cpp & python client 
(#2420)
d7ec1c5 is described below

commit d7ec1c5051ee3f7066cd1ba1572606760a3c371e
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Wed Aug 22 09:32:46 2018 -0700

    [client] add properties to producer for cpp & python client (#2420)
    
    * [client] add properties to producer for cpp & python client
    
     ### Motivation
    
    This is a catch-up change to enable properties on the producer, as the
    Java client already does.
    
     ### Changes
    
    Enable properties on producer for both cpp & python client
    
     ### Results
    
    Properties are added as metadata on CommandProducer. However, there is no
    way to verify the producer properties, so I didn't add any specific tests;
    I just set properties in the existing cpp and python client tests, which
    should exercise the corresponding code path.
    
    * Add `properties` to pydoc
---
 .../include/pulsar/ProducerConfiguration.h         | 34 ++++++++++++++++++++++
 pulsar-client-cpp/lib/Commands.cc                  | 11 ++++++-
 pulsar-client-cpp/lib/Commands.h                   |  3 +-
 pulsar-client-cpp/lib/ProducerConfiguration.cc     | 34 +++++++++++++++++++++-
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |  1 +
 pulsar-client-cpp/lib/ProducerImpl.cc              |  3 +-
 pulsar-client-cpp/python/pulsar/__init__.py        | 11 ++++++-
 pulsar-client-cpp/python/src/config.cc             |  1 +
 pulsar-client-cpp/python/test_producer.py          |  6 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  2 ++
 10 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 9f7bf1f..45154c5 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -127,6 +127,40 @@ class ProducerConfiguration {
     bool isEncryptionEnabled() const;
     ProducerConfiguration& addEncryptionKey(std::string key);
 
+    /**
+     * Check whether the producer has a specific property attached.
+     *
+     * @param name the name of the property to check
+     * @return true if the producer has the specified property
+     * @return false if the property is not defined
+     */
+    bool hasProperty(const std::string& name) const;
+
+    /**
+     * Get the value of a specific property
+     *
+     * @param name the name of the property
+     * @return the value of the property, or an empty string if the property was not defined
+     */
+    const std::string& getProperty(const std::string& name) const;
+
+    /**
+     * Get all the properties attached to this producer.
+     */
+    std::map<std::string, std::string>& getProperties() const;
+
+    /**
+     * Sets a new property on the producer.
+     * @param name   the name of the property
+     * @param value  the associated value
+     */
+    ProducerConfiguration& setProperty(const std::string& name, const 
std::string& value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    ProducerConfiguration& setProperties(const std::map<std::string, 
std::string>& properties);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 8bd0128..4d9f1be 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -22,6 +22,7 @@
 #include "pulsar/MessageBuilder.h"
 #include "PulsarApi.pb.h"
 #include "LogUtils.h"
+#include "PulsarApi.pb.h"
 #include "Utils.h"
 #include "Url.h"
 #include "checksum/ChecksumProvider.h"
@@ -231,13 +232,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t 
consumerId, uint64_t requestId) {
 }
 
 SharedBuffer Commands::newProducer(const std::string& topic, uint64_t 
producerId,
-                                   const std::string& producerName, uint64_t 
requestId) {
+                                   const std::string& producerName, uint64_t 
requestId,
+                                   const std::map<std::string, std::string>& 
metadata) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
     producer->set_topic(topic);
     producer->set_producer_id(producerId);
     producer->set_request_id(requestId);
+    for (std::map<std::string, std::string>::const_iterator it = 
metadata.begin(); it != metadata.end();
+         it++) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(it->first);
+        keyValue->set_value(it->second);
+        producer->mutable_metadata()->AddAllocated(keyValue);
+    }
 
     if (!producerName.empty()) {
         producer->set_producer_name(producerName);
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index ef1e280..e669953 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -83,7 +83,8 @@ class Commands {
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t 
requestId);
 
     static SharedBuffer newProducer(const std::string& topic, uint64_t 
producerId,
-                                    const std::string& producerName, uint64_t 
requestId);
+                                    const std::string& producerName, uint64_t 
requestId,
+                                    const std::map<std::string, std::string>& 
metadata);
 
     static SharedBuffer newAck(uint64_t consumerId, const 
proto::MessageIdData& messageId,
                                proto::CommandAck_AckType ackType, int 
validationError);
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index ab70db2..9ad2cf9 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -19,6 +19,9 @@
 #include <lib/ProducerConfigurationImpl.h>
 
 namespace pulsar {
+
+const static std::string emptyString;
+
 ProducerConfiguration::ProducerConfiguration() : 
impl_(boost::make_shared<ProducerConfigurationImpl>()) {}
 
 ProducerConfiguration::~ProducerConfiguration() {}
@@ -36,7 +39,6 @@ ProducerConfiguration& 
ProducerConfiguration::setProducerName(const std::string&
 }
 
 const std::string& ProducerConfiguration::getProducerName() const {
-    static const std::string emptyString;
     return impl_->producerName.is_present() ? impl_->producerName.value() : 
emptyString;
 }
 
@@ -185,4 +187,34 @@ ProducerConfiguration& 
ProducerConfiguration::addEncryptionKey(std::string key)
     return *this;
 }
 
+bool ProducerConfiguration::hasProperty(const std::string& name) const {
+    const std::map<std::string, std::string>& m = impl_->properties;
+    return m.find(name) != m.end();
+}
+
+const std::string& ProducerConfiguration::getProperty(const std::string& name) 
const {
+    if (hasProperty(name)) {
+        const std::map<std::string, std::string>& m = impl_->properties;
+        return m.at(name);
+    } else {
+        return emptyString;
+    }
+}
+
+std::map<std::string, std::string>& ProducerConfiguration::getProperties() 
const { return impl_->properties; }
+
+ProducerConfiguration& ProducerConfiguration::setProperty(const std::string& 
name, const std::string& value) {
+    impl_->properties.insert(std::make_pair(name, value));
+    return *this;
+}
+
+ProducerConfiguration& ProducerConfiguration::setProperties(
+    const std::map<std::string, std::string>& properties) {
+    for (std::map<std::string, std::string>::const_iterator it = 
properties.begin(); it != properties.end();
+         it++) {
+        setProperty(it->first, it->second);
+    }
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 3f788a9..6dfaeed 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -44,6 +44,7 @@ struct ProducerConfigurationImpl {
     CryptoKeyReaderPtr cryptoKeyReader;
     std::set<std::string> encryptionKeys;
     ProducerCryptoFailureAction cryptoFailureAction;
+    std::map<std::string, std::string> properties;
     ProducerConfigurationImpl()
         : sendTimeoutMs(30000),
           compressionType(CompressionNone),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 7f09873..881b189 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -127,7 +127,8 @@ void ProducerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, 
producerName_, requestId);
+    SharedBuffer cmd =
+        Commands::newProducer(topic_, producerId_, producerName_, requestId, 
conf_.getProperties());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ProducerImpl::handleCreateProducer, 
shared_from_this(), cnx, _1, _2));
 }
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 222c29f..1185812 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -321,7 +321,8 @@ class Client:
                         batching_max_messages=1000,
                         batching_max_allowed_size_in_bytes=128*1024,
                         batching_max_publish_delay_ms=10,
-                        
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+                        
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+                        properties=None,
                         ):
         """
         Create a new producer on a given topic.
@@ -361,6 +362,9 @@ class Client:
         * `message_routing_mode`:
           Set the message routing mode for the partitioned producer. Default 
is `PartitionsRoutingMode.RoundRobinDistribution`,
           other option is `PartitionsRoutingMode.UseSinglePartition`
+        * `properties`:
+          Sets the properties for the producer. The properties associated with 
a producer
+          can be used to identify a producer on the broker side.
         """
         _check_type(str, topic, 'topic')
         _check_type_or_none(str, producer_name, 'producer_name')
@@ -374,6 +378,7 @@ class Client:
         _check_type(int, batching_max_messages, 'batching_max_messages')
         _check_type(int, batching_max_allowed_size_in_bytes, 
'batching_max_allowed_size_in_bytes')
         _check_type(int, batching_max_publish_delay_ms, 
'batching_max_publish_delay_ms')
+        _check_type_or_none(dict, properties, 'properties')
 
         conf = _pulsar.ProducerConfiguration()
         conf.send_timeout_millis(send_timeout_millis)
@@ -390,6 +395,10 @@ class Client:
             conf.producer_name(producer_name)
         if initial_sequence_id:
             conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
         p = Producer()
         p._producer = self._client.create_producer(topic, conf)
         return p
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 7b4459a..9626049 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -122,6 +122,7 @@ void export_config() {
             .def("batching_max_allowed_size_in_bytes", 
&ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>())
             .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::getBatchingMaxPublishDelayMs, 
return_value_policy<copy_const_reference>())
             .def("batching_max_publish_delay_ms", 
&ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>())
+            .def("property", &ProducerConfiguration::setProperty, 
return_self<>())
             ;
 
     class_<ConsumerConfiguration>("ConsumerConfiguration")
diff --git a/pulsar-client-cpp/python/test_producer.py 
b/pulsar-client-cpp/python/test_producer.py
index a3dd1f8..3bd1537 100755
--- a/pulsar-client-cpp/python/test_producer.py
+++ b/pulsar-client-cpp/python/test_producer.py
@@ -27,7 +27,11 @@ producer = client.create_producer(
                     'my-topic',
                     block_if_queue_full=True,
                     batching_enabled=True,
-                    batching_max_publish_delay_ms=10
+                    batching_max_publish_delay_ms=10,
+                    properties={
+                        "producer-name": "test-producer-name",
+                        "producer-id": "test-producer-id"
+                    }
                 )
 
 for i in range(10):
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index b1b05ef..e87850c 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -126,6 +126,8 @@ TEST(BasicEndToEndTest, testBatchMessages) {
     conf.setBatchingMaxMessages(batchSize);
     conf.setBatchingEnabled(true);
     conf.setBlockIfQueueFull(true);
+    conf.setProperty("producer-name", "test-producer-name");
+    conf.setProperty("producer-id", "test-producer-id");
 
     Promise<Result, Producer> producerPromise;
     client.createProducerAsync(topicName, conf, 
WaitForCallbackValue<Producer>(producerPromise));

Reply via email to