BewareMyPower commented on code in PR #237: URL: https://github.com/apache/pulsar-client-cpp/pull/237#discussion_r1172800329
########## lib/c/c_ConsumerConfiguration.cc: ########## @@ -17,10 +17,12 @@ * under the License. */ +#include <pulsar/DeadLetterPolicyBuilder.h> #include <pulsar/c/consumer.h> #include <pulsar/c/consumer_configuration.h> #include "c_structs.h" +#include "climits" Review Comment: ```suggestion #include <climits> ``` It's better to use `<...>` for system headers. https://stackoverflow.com/questions/21593/what-is-the-difference-between-include-filename-and-include-filename ########## tests/c/c_ConsumerTest.cc: ########## @@ -121,3 +121,81 @@ TEST(c_ConsumerTest, testBatchReceive) { pulsar_client_free(client); pulsar_client_configuration_free(conf); } + +TEST(C_ConsumerConfigurationTest, testCDeadLetterTopic) { Review Comment: ```suggestion TEST(C_ConsumerTest, testCDeadLetterTopic) { ``` ########## tests/c/c_ConsumerTest.cc: ########## @@ -121,3 +121,81 @@ TEST(c_ConsumerTest, testBatchReceive) { pulsar_client_free(client); pulsar_client_configuration_free(conf); } + +TEST(C_ConsumerConfigurationTest, testCDeadLetterTopic) { + const char *topic_name = "persistent://public/default/test-c-dlq-topic"; + const char *dlq_topic_name = "persistent://public/default/c-dlq-topic"; + const char *sub_name = "my-sub-name"; + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared); + const int max_redeliver_count = 3; + pulsar_consumer_config_dead_letter_policy_t dlq_policy{dlq_topic_name, max_redeliver_count, + "init_sub-name"}; + 
pulsar_consumer_configuration_set_dlq_policy(consumer_conf, &dlq_policy); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, sub_name, consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + // Send messages + const int num = 10; + const char *data = "my-content"; + for (int i = 0; i < num; i++) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_set_content(message, data, strlen(data)); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + } + + // Redelivery all messages + for (int i = 1; i <= max_redeliver_count * num + num; ++i) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_result res = pulsar_consumer_receive(consumer, &message); + ASSERT_EQ(pulsar_result_Ok, res); + if (i % num == 0) { + pulsar_consumer_redeliver_unacknowledged_messages(consumer); + } + pulsar_message_free(message); + } + + // Consumer dlq topic + pulsar_consumer_t *dlq_consumer; + pulsar_consumer_configuration_t *dlq_consumer_conf = pulsar_consumer_configuration_create(); + result = pulsar_client_subscribe(client, dlq_topic_name, sub_name, dlq_consumer_conf, &dlq_consumer); + ASSERT_EQ(pulsar_result_Ok, result); + for (int i = 0; i < num; ++i) { + pulsar_message_t *message = pulsar_message_create(); Review Comment: ```suggestion pulsar_message_t *message = NULL; ``` Fix the memory leak. `message` will point to another instance after `pulsar_consumer_receive`. 
########## tests/c/c_ConsumerConfigurationTest.cc: ########## @@ -19,6 +19,8 @@ #include <gtest/gtest.h> #include <pulsar/c/consumer_configuration.h> +#include "climits" Review Comment: ```suggestion #include <climits> ``` ########## tests/c/c_ConsumerTest.cc: ########## @@ -121,3 +121,81 @@ TEST(c_ConsumerTest, testBatchReceive) { pulsar_client_free(client); pulsar_client_configuration_free(conf); } + +TEST(C_ConsumerConfigurationTest, testCDeadLetterTopic) { + const char *topic_name = "persistent://public/default/test-c-dlq-topic"; + const char *dlq_topic_name = "persistent://public/default/c-dlq-topic"; + const char *sub_name = "my-sub-name"; + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared); + const int max_redeliver_count = 3; + pulsar_consumer_config_dead_letter_policy_t dlq_policy{dlq_topic_name, max_redeliver_count, + "init_sub-name"}; + pulsar_consumer_configuration_set_dlq_policy(consumer_conf, &dlq_policy); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, sub_name, consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + // Send messages + const int num = 10; + const char *data = "my-content"; + for (int i = 0; i < num; i++) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_set_content(message, data, strlen(data)); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + } 
+ + // Redelivery all messages + for (int i = 1; i <= max_redeliver_count * num + num; ++i) { + pulsar_message_t *message = pulsar_message_create(); Review Comment: ```suggestion pulsar_message_t *message = NULL; ``` ########## tests/c/c_ConsumerTest.cc: ########## @@ -121,3 +121,81 @@ TEST(c_ConsumerTest, testBatchReceive) { pulsar_client_free(client); pulsar_client_configuration_free(conf); } + +TEST(C_ConsumerConfigurationTest, testCDeadLetterTopic) { + const char *topic_name = "persistent://public/default/test-c-dlq-topic"; + const char *dlq_topic_name = "persistent://public/default/c-dlq-topic"; + const char *sub_name = "my-sub-name"; + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared); + const int max_redeliver_count = 3; + pulsar_consumer_config_dead_letter_policy_t dlq_policy{dlq_topic_name, max_redeliver_count, + "init_sub-name"}; + pulsar_consumer_configuration_set_dlq_policy(consumer_conf, &dlq_policy); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, sub_name, consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + // Send messages + const int num = 10; + const char *data = "my-content"; + for (int i = 0; i < num; i++) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_set_content(message, data, strlen(data)); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + } + + // 
Redelivery all messages + for (int i = 1; i <= max_redeliver_count * num + num; ++i) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_result res = pulsar_consumer_receive(consumer, &message); + ASSERT_EQ(pulsar_result_Ok, res); + if (i % num == 0) { + pulsar_consumer_redeliver_unacknowledged_messages(consumer); + } + pulsar_message_free(message); + } + + // Consumer dlq topic + pulsar_consumer_t *dlq_consumer; + pulsar_consumer_configuration_t *dlq_consumer_conf = pulsar_consumer_configuration_create(); + result = pulsar_client_subscribe(client, dlq_topic_name, sub_name, dlq_consumer_conf, &dlq_consumer); + ASSERT_EQ(pulsar_result_Ok, result); + for (int i = 0; i < num; ++i) { + pulsar_message_t *message = pulsar_message_create(); + pulsar_result res = pulsar_consumer_receive(dlq_consumer, &message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + } + pulsar_message_t *message = pulsar_message_create(); + pulsar_result res = pulsar_consumer_receive_with_timeout(dlq_consumer, &message, 200); + ASSERT_EQ(pulsar_result_Timeout, res); + pulsar_message_free(message); + + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer)); + ASSERT_EQ(pulsar_result_AlreadyClosed, pulsar_consumer_close(consumer)); + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(dlq_consumer)); + ASSERT_EQ(pulsar_result_AlreadyClosed, pulsar_consumer_close(dlq_consumer)); + ASSERT_EQ(pulsar_result_Ok, pulsar_producer_close(producer)); Review Comment: ```suggestion ``` They can be removed. We only need to close the client. BTW, `close` returning `AlreadyClosed` after `unsubscribe` is a bug. We don't need to preserve this wrong behavior in the test for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
