merlimat closed pull request #2448: Add multi-topic and regex consumer in Go 
client
URL: https://github.com/apache/incubator-pulsar/pull/2448
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-client-cpp/include/pulsar/c/client.h 
b/pulsar-client-cpp/include/pulsar/c/client.h
index 2da7c6de92..4b603bbff3 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, 
const char *topic, c
                                    const pulsar_consumer_configuration_t *conf,
                                    pulsar_subscribe_callback callback, void 
*ctx);
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const 
char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const 
pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback 
callback, void *ctx);
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char 
*topicPattern,
+                                           const char *subscriptionName,
+                                           const 
pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, 
void *ctx);
+
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading 
messages from the specified
  * topic.
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc 
b/pulsar-client-cpp/lib/c/c_Client.cc
index 905e41067f..ecefe20c03 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, 
const char *topic, c
                                    boost::bind(&handle_subscribe_callback, _1, 
_2, callback, ctx));
 }
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const 
char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const 
pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback 
callback, void *ctx) {
+    std::vector<std::string> topicsList;
+    for (int i = 0; i < topicsCount; i++) {
+        topicsList.push_back(topics[i]);
+    }
+
+    client->client->subscribeAsync(topicsList, subscriptionName, 
conf->consumerConfiguration,
+                                   boost::bind(&handle_subscribe_callback, _1, 
_2, callback, ctx));
+}
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char 
*topicPattern,
+                                           const char *subscriptionName,
+                                           const 
pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, 
void *ctx) {
+    client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, 
conf->consumerConfiguration,
+                                            
boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char 
*topic,
                                           const pulsar_message_id_t 
*startMessageId,
                                           pulsar_reader_configuration_t *conf, 
pulsar_reader_t **c_reader) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index aebabc877d..093dd9dc6e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -25,10 +25,10 @@ package pulsar
 import "C"
 
 import (
+       "context"
        "runtime"
        "time"
        "unsafe"
-       "context"
 )
 
 type consumer struct {
@@ -64,7 +64,7 @@ type subscribeContext struct {
 }
 
 func subscribeAsync(client *client, options ConsumerOptions, callback 
func(Consumer, error)) {
-       if options.Topic == "" {
+       if options.Topic == "" && options.Topics == nil && 
options.TopicsPattern == "" {
                go callback(nil, newError(C.pulsar_result_InvalidConfiguration, 
"topic is required"))
                return
        }
@@ -120,12 +120,38 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_set_consumer_name(conf, name)
        }
 
-       topic := C.CString(options.Topic)
        subName := C.CString(options.SubscriptionName)
-       defer C.free(unsafe.Pointer(topic))
        defer C.free(unsafe.Pointer(subName))
-       C._pulsar_client_subscribe_async(client.ptr, topic, subName,
-               conf, savePointer(&subscribeContext{conf: conf, consumer: 
consumer, callback: callback}))
+
+       callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: 
consumer, callback: callback})
+
+       if options.Topic != "" {
+               topic := C.CString(options.Topic)
+               defer C.free(unsafe.Pointer(topic))
+               C._pulsar_client_subscribe_async(client.ptr, topic, subName, 
conf, callbackPtr)
+       } else if options.Topics != nil {
+               cArray := C.malloc(C.size_t(len(options.Topics)) * 
C.size_t(unsafe.Sizeof(uintptr(0))))
+
+               // convert the C array to a Go Array so we can index it
+               a := (*[1<<30 - 1]*C.char)(cArray)
+
+               for idx, topic := range options.Topics {
+                       a[idx] = C.CString(topic)
+               }
+
+               C._pulsar_client_subscribe_multi_topics_async(client.ptr, 
(**C.char)(cArray), C.int(len(options.Topics)),
+                       subName, conf, callbackPtr)
+
+               for idx, _ := range options.Topics {
+                       C.free(unsafe.Pointer(a[idx]))
+               }
+
+               C.free(cArray)
+       } else if options.TopicsPattern != "" {
+               topicsPattern := C.CString(options.TopicsPattern)
+               defer C.free(unsafe.Pointer(topicsPattern))
+               C._pulsar_client_subscribe_pattern_async(client.ptr, 
topicsPattern, subName, conf, callbackPtr)
+       }
 }
 
 type consumerCallback struct {
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h 
b/pulsar-client-go/pulsar/c_go_pulsar.h
index 045a4a2a84..881427610e 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -73,6 +73,19 @@ static inline void 
_pulsar_client_subscribe_async(pulsar_client_t *client, const
     pulsar_client_subscribe_async(client, topic, subscriptionName, conf, 
pulsarSubscribeCallbackProxy, ctx);
 }
 
+static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t 
*client, const char ** topics,
+                                                  int topicsCount,  const char 
*subscriptionName,
+                                                  const 
pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, 
subscriptionName, conf,
+                                               pulsarSubscribeCallbackProxy, 
ctx);
+}
+
+static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t 
*client, const char *topicPattern,
+                                                  const char *subscriptionName,
+                                                  const 
pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_pattern_async(client, topicPattern, 
subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
+}
+
 void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t 
*message, void *ctx);
 
 static inline void _pulsar_consumer_configuration_set_message_listener(
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index 4ce0857652..ed56d9e817 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -49,9 +49,17 @@ const (
 // ConsumerBuilder is used to configure and create instances of Consumer
 type ConsumerOptions struct {
        // Specify the topic this consumer will subscribe on.
-       // This argument is required when subscribing
+       // Either a topic, a list of topics or a topics pattern is required 
when subscribing
        Topic string
 
+       // Specify a list of topics this consumer will subscribe on.
+       // Either a topic, a list of topics or a topics pattern is required 
when subscribing
+       Topics []string
+
+       // Specify a regular expression to subscribe to multiple topics under 
the same namespace.
+       // Either a topic, a list of topics or a topics pattern is required 
when subscribing
+       TopicsPattern string
+
        // Specify the subscription name for this consumer
        // This argument is required when subscribing
        SubscriptionName string
diff --git a/pulsar-client-go/pulsar/consumer_test.go 
b/pulsar-client-go/pulsar/consumer_test.go
index 2930f19d9c..75a454be88 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-       "testing"
-       "fmt"
        "context"
+       "fmt"
+       "testing"
        "time"
 )
 
@@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 
        assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestConsumerMultiTopics(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assertNil(t, err)
+       defer client.Close()
+
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic: "multi-topic-1",
+       })
+
+       assertNil(t, err)
+
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic: "multi-topic-2",
+       })
+
+       assertNil(t, err)
+       defer producer1.Close()
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topics:           []string{"multi-topic-1", "multi-topic-2"},
+               SubscriptionName: "my-sub",
+       })
+
+       assertNil(t, err)
+       defer consumer.Close()
+
+       assertEqual(t, consumer.Subscription(), "my-sub")
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer1.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+
+               if err := producer2.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < 20; i++ {
+               msg, err := consumer.Receive(ctx)
+               assertNil(t, err)
+               assertNotNil(t, msg)
+
+               consumer.Ack(msg)
+       }
+
+       consumer.Unsubscribe()
+}
+
+
+func TestConsumerRegex(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assertNil(t, err)
+       defer client.Close()
+
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic: "topic-1",
+       })
+
+       assertNil(t, err)
+
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic: "topic-2",
+       })
+
+       assertNil(t, err)
+       defer producer1.Close()
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               TopicsPattern: "topic-\\d+",
+               SubscriptionName: "my-sub",
+       })
+
+       assertNil(t, err)
+       defer consumer.Close()
+
+       assertEqual(t, consumer.Subscription(), "my-sub")
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer1.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+
+               if err := producer2.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < 20; i++ {
+               msg, err := consumer.Receive(ctx)
+               assertNil(t, err)
+               assertNotNil(t, msg)
+
+               consumer.Ack(msg)
+       }
+
+       consumer.Unsubscribe()
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to