This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 834104b [client] add properties to producer for cpp & python client
(#2420)
834104b is described below
commit 834104bba3eab1977114b58f455d735bda2b9fe3
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Aug 22 09:32:46 2018 -0700
[client] add properties to producer for cpp & python client (#2420)
* [client] add properties to producer for cpp & python client
### Motivation
This is a catch-up change to enable properties for producers in the cpp and
python clients, as already supported by the java client.
### Changes
Enable properties on producer for both cpp & python clients
### Results
Properties are added as metadata for CommandProducer. However, there is no
way
to verify the producer properties, so I didn't add any specific tests; I am just
adding properties for both cpp and python clients in the tests, which should
exercise the corresponding code path.
* Add `properties` to pydoc
---
.../include/pulsar/ProducerConfiguration.h | 34 ++++++++++++++++++++++
pulsar-client-cpp/lib/Commands.cc | 11 ++++++-
pulsar-client-cpp/lib/Commands.h | 3 +-
pulsar-client-cpp/lib/ProducerConfiguration.cc | 34 +++++++++++++++++++++-
pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ProducerImpl.cc | 3 +-
pulsar-client-cpp/python/pulsar/__init__.py | 11 ++++++-
pulsar-client-cpp/python/src/config.cc | 1 +
pulsar-client-cpp/python/test_producer.py | 6 +++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 ++
10 files changed, 100 insertions(+), 6 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 9f7bf1f..45154c5 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -127,6 +127,40 @@ class ProducerConfiguration {
bool isEncryptionEnabled() const;
ProducerConfiguration& addEncryptionKey(std::string key);
+ /**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+ bool hasProperty(const std::string& name) const;
+
+ /**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not
defined
+ */
+ const std::string& getProperty(const std::string& name) const;
+
+ /**
+ * Get all the properties attached to this producer.
+ */
+ std::map<std::string, std::string>& getProperties() const;
+
+ /**
+ * Sets a new property on a message.
+ * @param name the name of the property
+ * @param value the associated value
+ */
+ ProducerConfiguration& setProperty(const std::string& name, const
std::string& value);
+
+ /**
+ * Add all the properties in the provided map
+ */
+ ProducerConfiguration& setProperties(const std::map<std::string,
std::string>& properties);
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/Commands.cc
b/pulsar-client-cpp/lib/Commands.cc
index 8bd0128..4d9f1be 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -22,6 +22,7 @@
#include "pulsar/MessageBuilder.h"
#include "PulsarApi.pb.h"
#include "LogUtils.h"
+#include "PulsarApi.pb.h"
#include "Utils.h"
#include "Url.h"
#include "checksum/ChecksumProvider.h"
@@ -231,13 +232,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t
consumerId, uint64_t requestId) {
}
SharedBuffer Commands::newProducer(const std::string& topic, uint64_t
producerId,
- const std::string& producerName, uint64_t
requestId) {
+ const std::string& producerName, uint64_t
requestId,
+ const std::map<std::string, std::string>&
metadata) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
producer->set_topic(topic);
producer->set_producer_id(producerId);
producer->set_request_id(requestId);
+ for (std::map<std::string, std::string>::const_iterator it =
metadata.begin(); it != metadata.end();
+ it++) {
+ proto::KeyValue* keyValue = proto::KeyValue().New();
+ keyValue->set_key(it->first);
+ keyValue->set_value(it->second);
+ producer->mutable_metadata()->AddAllocated(keyValue);
+ }
if (!producerName.empty()) {
producer->set_producer_name(producerName);
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index ef1e280..e669953 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -83,7 +83,8 @@ class Commands {
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t
requestId);
static SharedBuffer newProducer(const std::string& topic, uint64_t
producerId,
- const std::string& producerName, uint64_t
requestId);
+ const std::string& producerName, uint64_t
requestId,
+ const std::map<std::string, std::string>&
metadata);
static SharedBuffer newAck(uint64_t consumerId, const
proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int
validationError);
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc
b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index ab70db2..9ad2cf9 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -19,6 +19,9 @@
#include <lib/ProducerConfigurationImpl.h>
namespace pulsar {
+
+const static std::string emptyString;
+
ProducerConfiguration::ProducerConfiguration() :
impl_(boost::make_shared<ProducerConfigurationImpl>()) {}
ProducerConfiguration::~ProducerConfiguration() {}
@@ -36,7 +39,6 @@ ProducerConfiguration&
ProducerConfiguration::setProducerName(const std::string&
}
const std::string& ProducerConfiguration::getProducerName() const {
- static const std::string emptyString;
return impl_->producerName.is_present() ? impl_->producerName.value() :
emptyString;
}
@@ -185,4 +187,34 @@ ProducerConfiguration&
ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}
+bool ProducerConfiguration::hasProperty(const std::string& name) const {
+ const std::map<std::string, std::string>& m = impl_->properties;
+ return m.find(name) != m.end();
+}
+
+const std::string& ProducerConfiguration::getProperty(const std::string& name)
const {
+ if (hasProperty(name)) {
+ const std::map<std::string, std::string>& m = impl_->properties;
+ return m.at(name);
+ } else {
+ return emptyString;
+ }
+}
+
+std::map<std::string, std::string>& ProducerConfiguration::getProperties()
const { return impl_->properties; }
+
+ProducerConfiguration& ProducerConfiguration::setProperty(const std::string&
name, const std::string& value) {
+ impl_->properties.insert(std::make_pair(name, value));
+ return *this;
+}
+
+ProducerConfiguration& ProducerConfiguration::setProperties(
+ const std::map<std::string, std::string>& properties) {
+ for (std::map<std::string, std::string>::const_iterator it =
properties.begin(); it != properties.end();
+ it++) {
+ setProperty(it->first, it->second);
+ }
+ return *this;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 3f788a9..6dfaeed 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -44,6 +44,7 @@ struct ProducerConfigurationImpl {
CryptoKeyReaderPtr cryptoKeyReader;
std::set<std::string> encryptionKeys;
ProducerCryptoFailureAction cryptoFailureAction;
+ std::map<std::string, std::string> properties;
ProducerConfigurationImpl()
: sendTimeoutMs(30000),
compressionType(CompressionNone),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 7f09873..881b189 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -127,7 +127,8 @@ void ProducerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd = Commands::newProducer(topic_, producerId_,
producerName_, requestId);
+ SharedBuffer cmd =
+ Commands::newProducer(topic_, producerId_, producerName_, requestId,
conf_.getProperties());
cnx->sendRequestWithId(cmd, requestId)
.addListener(boost::bind(&ProducerImpl::handleCreateProducer,
shared_from_this(), cnx, _1, _2));
}
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index 222c29f..1185812 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -321,7 +321,8 @@ class Client:
batching_max_messages=1000,
batching_max_allowed_size_in_bytes=128*1024,
batching_max_publish_delay_ms=10,
-
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+ properties=None,
):
"""
Create a new producer on a given topic.
@@ -361,6 +362,9 @@ class Client:
* `message_routing_mode`:
Set the message routing mode for the partitioned producer. Default
is `PartitionsRoutingMode.RoundRobinDistribution`,
other option is `PartitionsRoutingMode.UseSinglePartition`
+ * `properties`:
+ Sets the properties for the producer. The properties associated with
a producer
+ can be used for identify a producer at broker side.
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
@@ -374,6 +378,7 @@ class Client:
_check_type(int, batching_max_messages, 'batching_max_messages')
_check_type(int, batching_max_allowed_size_in_bytes,
'batching_max_allowed_size_in_bytes')
_check_type(int, batching_max_publish_delay_ms,
'batching_max_publish_delay_ms')
+ _check_type_or_none(dict, properties, 'properties')
conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
@@ -390,6 +395,10 @@ class Client:
conf.producer_name(producer_name)
if initial_sequence_id:
conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
p = Producer()
p._producer = self._client.create_producer(topic, conf)
return p
diff --git a/pulsar-client-cpp/python/src/config.cc
b/pulsar-client-cpp/python/src/config.cc
index 7b4459a..9626049 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -122,6 +122,7 @@ void export_config() {
.def("batching_max_allowed_size_in_bytes",
&ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, return_self<>())
.def("batching_max_publish_delay_ms",
&ProducerConfiguration::getBatchingMaxPublishDelayMs,
return_value_policy<copy_const_reference>())
.def("batching_max_publish_delay_ms",
&ProducerConfiguration::setBatchingMaxPublishDelayMs, return_self<>())
+ .def("property", &ProducerConfiguration::setProperty,
return_self<>())
;
class_<ConsumerConfiguration>("ConsumerConfiguration")
diff --git a/pulsar-client-cpp/python/test_producer.py
b/pulsar-client-cpp/python/test_producer.py
index a3dd1f8..3bd1537 100755
--- a/pulsar-client-cpp/python/test_producer.py
+++ b/pulsar-client-cpp/python/test_producer.py
@@ -27,7 +27,11 @@ producer = client.create_producer(
'my-topic',
block_if_queue_full=True,
batching_enabled=True,
- batching_max_publish_delay_ms=10
+ batching_max_publish_delay_ms=10,
+ properties={
+ "producer-name": "test-producer-name",
+ "producer-id": "test-producer-id"
+ }
)
for i in range(10):
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index b1b05ef..e87850c 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -126,6 +126,8 @@ TEST(BasicEndToEndTest, testBatchMessages) {
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBlockIfQueueFull(true);
+ conf.setProperty("producer-name", "test-producer-name");
+ conf.setProperty("producer-id", "test-producer-id");
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf,
WaitForCallbackValue<Producer>(producerPromise));