This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79b88cf  Added producer/consumer properties in Go client (#2447)
79b88cf is described below

commit 79b88cf66eb2a4d337e21ade7f699c7d641b5c9a
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Aug 31 15:39:11 2018 -0700

    Added producer/consumer properties in Go client (#2447)
---
 pulsar-client-cpp/include/pulsar/ProducerConfiguration.h    |  5 +++--
 pulsar-client-cpp/include/pulsar/c/consumer_configuration.h |  3 +++
 pulsar-client-cpp/include/pulsar/c/producer_configuration.h |  3 +++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc          |  5 +++++
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc          |  5 +++++
 pulsar-client-go/pulsar/c_consumer.go                       | 12 ++++++++++++
 pulsar-client-go/pulsar/c_producer.go                       | 12 ++++++++++++
 pulsar-client-go/pulsar/consumer.go                         |  4 ++++
 pulsar-client-go/pulsar/producer.go                         |  4 ++++
 pulsar-client-go/pulsar/producer_test.go                    |  8 ++++++--
 10 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 45154c5..565a6ab 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -128,7 +128,7 @@ class ProducerConfiguration {
     ProducerConfiguration& addEncryptionKey(std::string key);
 
     /**
-     * Check whether the message has a specific property attached.
+     * Check whether the producer has a specific property attached.
      *
      * @param name the name of the property to check
      * @return true if the message has the specified property
@@ -150,7 +150,8 @@ class ProducerConfiguration {
     std::map<std::string, std::string>& getProperties() const;
 
     /**
-     * Sets a new property on a message.
+     * Sets a new property on the producer
+     * .
      * @param name   the name of the property
      * @param value  the associated value
      */
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 7299867..fca47eb 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -152,6 +152,9 @@ int 
pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_
 void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t 
*consumer_configuration,
                                         int compacted);
 
+void 
pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t 
*conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader()
 //
 // const;
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index ae88198..670bf50 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -134,6 +134,9 @@ void 
pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_prod
 unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf);
 
+void 
pulsar_producer_configuration_set_property(pulsar_producer_configuration_t 
*conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader() const;
 // ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr 
cryptoKeyReader);
 //
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index c8d5453..75cdc47 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -113,3 +113,8 @@ void 
pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consume
                                         int compacted) {
     consumer_configuration->consumerConfiguration.setReadCompacted(compacted);
 }
+
+void 
pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t 
*conf, const char *name,
+                                                const char *value) {
+    conf->consumerConfiguration.setProperty(name, value);
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 914fc0a..a8eb5be 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -174,3 +174,8 @@ unsigned long 
pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf) {
     return conf->conf.getBatchingMaxPublishDelayMs();
 }
+
+void 
pulsar_producer_configuration_set_property(pulsar_producer_configuration_t 
*conf, const char *name,
+                                                const char *value) {
+    conf->conf.setProperty(name, value);
+}
diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index 7f613b2..1b41a71 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,18 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_set_consumer_name(conf, name)
        }
 
+       if options.Properties != nil {
+               for key, value := range options.Properties {
+                       cKey := C.CString(key)
+                       cValue := C.CString(value)
+
+                       C.pulsar_consumer_configuration_set_property(conf, 
cKey, cValue)
+
+                       C.free(unsafe.Pointer(cKey))
+                       C.free(unsafe.Pointer(cValue))
+               }
+       }
+
        C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
 
        subName := C.CString(options.SubscriptionName)
diff --git a/pulsar-client-go/pulsar/c_producer.go 
b/pulsar-client-go/pulsar/c_producer.go
index b4cd2c5..284315d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -124,6 +124,18 @@ func createProducerAsync(client *client, options 
ProducerOptions, callback func(
                C.pulsar_producer_configuration_set_batching_max_messages(conf, 
C.uint(options.BatchingMaxMessages))
        }
 
+       if options.Properties != nil {
+               for key, value := range options.Properties {
+                       cKey := C.CString(key)
+                       cValue := C.CString(value)
+
+                       C.pulsar_producer_configuration_set_property(conf, 
cKey, cValue)
+
+                       C.free(unsafe.Pointer(cKey))
+                       C.free(unsafe.Pointer(cValue))
+               }
+       }
+
        topicName := C.CString(options.Topic)
        defer C.free(unsafe.Pointer(topicName))
 
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index b9f2616..030ba1b 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -64,6 +64,10 @@ type ConsumerOptions struct {
        // This argument is required when subscribing
        SubscriptionName string
 
+       // Attach a set of application defined properties to the consumer
+       // This properties will be visible in the topic stats
+       Properties map[string]string
+
        // Set the timeout for unacked messages
        // Message not acknowledged within the give time, will be replayed by 
the broker to the same or a different consumer
        // Default is 0, which means message are not being replayed based on 
ack time
diff --git a/pulsar-client-go/pulsar/producer.go 
b/pulsar-client-go/pulsar/producer.go
index 2cfd141..46d6dd6 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -71,6 +71,10 @@ type ProducerOptions struct {
        // a topic.
        Name string
 
+       // Attach a set of application defined properties to the producer
+       // This properties will be visible in the topic stats
+       Properties map[string]string
+
        // Set the send timeout (default: 30 seconds)
        // If a message is not acknowledged by the server before the 
sendTimeout expires, an error will be reported.
        // Setting the timeout to -1, will set the timeout to infinity, which 
can be useful when using Pulsar's message
diff --git a/pulsar-client-go/pulsar/producer_test.go 
b/pulsar-client-go/pulsar/producer_test.go
index 940be85..cfa0bcb 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-       "testing"
-       "fmt"
        "context"
+       "fmt"
+       "testing"
        "time"
 )
 
@@ -77,6 +77,10 @@ func TestProducer(t *testing.T) {
                MaxPendingMessages:      100,
                BlockIfQueueFull:        true,
                CompressionType:         LZ4,
+               Properties: map[string]string{
+                       "my-name": "test",
+                       "key":     "value",
+               },
        })
 
        assertNil(t, err)

Reply via email to