merlimat closed pull request #2448: Add multi-topic and regex consumer in Go client URL: https://github.com/apache/incubator-pulsar/pull/2448
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h index 2da7c6de92..4b603bbff3 100644 --- a/pulsar-client-cpp/include/pulsar/c/client.h +++ b/pulsar-client-cpp/include/pulsar/c/client.h @@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c const pulsar_consumer_configuration_t *conf, pulsar_subscribe_callback callback, void *ctx); +void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx); + +void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx); + /** * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified * topic. diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc index 905e41067f..ecefe20c03 100644 --- a/pulsar-client-cpp/lib/c/c_Client.cc +++ b/pulsar-client-cpp/lib/c/c_Client.cc @@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); } +void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx) { + std::vector<std::string> topicsList; + for (int i = 0; i < topicsCount; i++) { + topicsList.push_back(topics[i]); + } + + client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration, + boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); +} + +void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx) { + client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration, + boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); +} + pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic, const pulsar_message_id_t *startMessageId, pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) { diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go index aebabc877d..093dd9dc6e 100644 --- a/pulsar-client-go/pulsar/c_consumer.go +++ b/pulsar-client-go/pulsar/c_consumer.go @@ -25,10 +25,10 @@ package pulsar import "C" import ( + "context" "runtime" "time" "unsafe" - "context" ) type consumer struct { @@ -64,7 +64,7 @@ type subscribeContext struct { } func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) { - if options.Topic == "" { + if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" { go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required")) return } @@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu C.pulsar_consumer_set_consumer_name(conf, name) } - topic := C.CString(options.Topic) subName := C.CString(options.SubscriptionName) - defer C.free(unsafe.Pointer(topic)) defer C.free(unsafe.Pointer(subName)) - C._pulsar_client_subscribe_async(client.ptr, topic, subName, - conf, savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})) + + callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback}) + + if options.Topic != "" { + topic := C.CString(options.Topic) + defer C.free(unsafe.Pointer(topic)) + C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr) + } else if options.Topics != nil { + cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0)))) + + // convert the C array to a Go Array so we can index it + a := (*[1<<30 - 1]*C.char)(cArray) + + for idx, topic := range options.Topics { + a[idx] = C.CString(topic) + } + + C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)), + subName, conf, callbackPtr) + + for idx, _ := range options.Topics { + C.free(unsafe.Pointer(a[idx])) + } + + C.free(cArray) + } else if options.TopicsPattern != "" { + topicsPattern := C.CString(options.TopicsPattern) + defer C.free(unsafe.Pointer(topicsPattern)) + C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr) + } } type consumerCallback struct { diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h b/pulsar-client-go/pulsar/c_go_pulsar.h index 045a4a2a84..881427610e 100644 --- a/pulsar-client-go/pulsar/c_go_pulsar.h +++ b/pulsar-client-go/pulsar/c_go_pulsar.h @@ -73,6 +73,19 @@ static inline void _pulsar_client_subscribe_async(pulsar_client_t *client, const pulsar_client_subscribe_async(client, topic, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx); } +static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char ** topics, + int topicsCount, const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, void *ctx) { + pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, subscriptionName, conf, + pulsarSubscribeCallbackProxy, ctx); +} + +static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, void *ctx) { + pulsar_client_subscribe_pattern_async(client, topicPattern, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx); +} + void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t *message, void *ctx); static inline void _pulsar_consumer_configuration_set_message_listener( diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go index 4ce0857652..ed56d9e817 100644 --- a/pulsar-client-go/pulsar/consumer.go +++ b/pulsar-client-go/pulsar/consumer.go @@ -49,9 +49,17 @@ const ( // ConsumerBuilder is used to configure and create instances of Consumer type ConsumerOptions struct { // Specify the topic this consumer will subscribe on. - // This argument is required when subscribing + // Either a topic, a list of topics or a topics pattern are required when subscribing Topic string + // Specify a list of topics this consumer will subscribe on. + // Either a topic, a list of topics or a topics pattern are required when subscribing + Topics []string + + // Specify a regular expression to subscribe to multiple topics under the same namespace. + // Either a topic, a list of topics or a topics pattern are required when subscribing + TopicsPattern string + // Specify the subscription name for this consumer // This argument is required when subscribing SubscriptionName string diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go index 2930f19d9c..75a454be88 100644 --- a/pulsar-client-go/pulsar/consumer_test.go +++ b/pulsar-client-go/pulsar/consumer_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" "time" ) @@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) { assertEqual(t, err.(*Error).Result(), InvalidConfiguration) } + + +func TestConsumerMultiTopics(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + assertNil(t, err) + defer client.Close() + + producer1, err := client.CreateProducer(ProducerOptions{ + Topic: "multi-topic-1", + }) + + assertNil(t, err) + + producer2, err := client.CreateProducer(ProducerOptions{ + Topic: "multi-topic-2", + }) + + assertNil(t, err) + defer producer1.Close() + defer producer2.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topics: []string{"multi-topic-1", "multi-topic-2"}, + SubscriptionName: "my-sub", + }) + + assertNil(t, err) + defer consumer.Close() + + assertEqual(t, consumer.Subscription(), "my-sub") + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer1.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + + if err := producer2.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 20; i++ { + msg, err := consumer.Receive(ctx) + assertNil(t, err) + assertNotNil(t, msg) + + consumer.Ack(msg) + } + + consumer.Unsubscribe() +} + + +func TestConsumerRegex(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + assertNil(t, err) + defer client.Close() + + producer1, err := client.CreateProducer(ProducerOptions{ + Topic: "topic-1", + }) + + assertNil(t, err) + + producer2, err := client.CreateProducer(ProducerOptions{ + Topic: "topic-2", + }) + + assertNil(t, err) + defer producer1.Close() + defer producer2.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + TopicsPattern: "topic-\\d+", + SubscriptionName: "my-sub", + }) + + assertNil(t, err) + defer consumer.Close() + + assertEqual(t, consumer.Subscription(), "my-sub") + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer1.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + + if err := producer2.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 20; i++ { + msg, err := consumer.Receive(ctx) + assertNil(t, err) + assertNotNil(t, msg) + + consumer.Ack(msg) + } + + consumer.Unsubscribe() +} \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services