This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1a0ce69 [feat] Support consumer seek by timestamp and reader seek for
C Api (#118)
1a0ce69 is described below
commit 1a0ce69d2d2a5521f484b207ad575363aac6a10b
Author: Zike Yang <[email protected]>
AuthorDate: Wed Nov 16 19:59:25 2022 +0800
[feat] Support consumer seek by timestamp and reader seek for C Api (#118)
Fixes #82
### Motivation
#82
### Modifications
* Support seek by timestamp for the consumer
* Support seek by messageid and timestamp for the reader
---
include/pulsar/c/consumer.h | 40 +++++++
include/pulsar/c/reader.h | 44 ++++++++
lib/c/c_Consumer.cc | 10 ++
lib/c/c_Reader.cc | 20 ++++
tests/c/c_SeekTest.cc | 265 ++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 379 insertions(+)
diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h
index 99ff8c8..bb8ae37 100644
--- a/include/pulsar/c/consumer.h
+++ b/include/pulsar/c/consumer.h
@@ -241,11 +241,51 @@ PULSAR_PUBLIC pulsar_result
resume_message_listener(pulsar_consumer_t *consumer)
*/
PULSAR_PUBLIC void
pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);
+/**
+ * Asynchronously reset the subscription associated with this consumer to a specific message id.
+ *
+ * @param consumer The consumer
+ * @param messageId The message id can either be a specific message or
represent the first or last messages in
+ * the topic.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId,
pulsar_result_callback callback,
void *ctx);
+/**
+ * Reset the subscription associated with this consumer to a specific message id (synchronous variant).
+ *
+ * @param consumer The consumer
+ * @param messageId The message id can either be a specific message or
represent the first or last messages in
+ * the topic.
+ * @return Operation result
+ */
PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId);
+/**
+ * Asynchronously reset the subscription associated with this consumer to a specific message publish time.
+ *
+ * @param consumer The consumer
+ * @param timestamp The message publish time where to reposition the
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
+PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t
*consumer, uint64_t timestamp,
+
pulsar_result_callback callback, void *ctx);
+
+/**
+ * Reset the subscription associated with this consumer to a specific message publish time (synchronous variant).
+ *
+ * @param consumer The consumer
+ * @param timestamp The message publish time where to reposition the
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @return Operation result
+ */
+PULSAR_PUBLIC pulsar_result
pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer,
+ uint64_t
timestamp);
+
PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer);
PULSAR_PUBLIC pulsar_result
pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
diff --git a/include/pulsar/c/reader.h b/include/pulsar/c/reader.h
index 4c546f8..12321fd 100644
--- a/include/pulsar/c/reader.h
+++ b/include/pulsar/c/reader.h
@@ -59,6 +59,50 @@ PULSAR_PUBLIC pulsar_result
pulsar_reader_read_next(pulsar_reader_t *reader, pul
PULSAR_PUBLIC pulsar_result
pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader,
pulsar_message_t **msg, int timeoutMs);
+/**
+ * Asynchronously reset the subscription associated with this reader to a specific message id.
+ *
+ * @param reader The reader
+ * @param messageId The message id can either be a specific message or
represent the first or last messages in
+ * the topic.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
+PULSAR_PUBLIC void pulsar_reader_seek_async(pulsar_reader_t *reader,
pulsar_message_id_t *messageId,
+ pulsar_result_callback callback,
void *ctx);
+
+/**
+ * Reset the subscription associated with this reader to a specific message id (synchronous variant).
+ *
+ * @param reader The reader
+ * @param messageId The message id can either be a specific message or
represent the first or last messages in
+ * the topic.
+ * @return Operation result
+ */
+PULSAR_PUBLIC pulsar_result pulsar_reader_seek(pulsar_reader_t *reader,
pulsar_message_id_t *messageId);
+
+/**
+ * Asynchronously reset the subscription associated with this reader to a specific message publish time.
+ *
+ * @param reader The reader
+ * @param timestamp The message publish time where to reposition the
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
+PULSAR_PUBLIC void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t
*reader, uint64_t timestamp,
+
pulsar_result_callback callback, void *ctx);
+
+/**
+ * Reset the subscription associated with this reader to a specific message publish time (synchronous variant).
+ *
+ * @param reader The reader
+ * @param timestamp The message publish time where to reposition the
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @return Operation result
+ */
+PULSAR_PUBLIC pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t
*reader, uint64_t timestamp);
+
PULSAR_PUBLIC pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
PULSAR_PUBLIC void pulsar_reader_close_async(pulsar_reader_t *reader,
pulsar_result_callback callback,
diff --git a/lib/c/c_Consumer.cc b/lib/c/c_Consumer.cc
index 062c801..df4c9f3 100644
--- a/lib/c/c_Consumer.cc
+++ b/lib/c/c_Consumer.cc
@@ -156,6 +156,16 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t
*consumer, pulsar_message_i
return (pulsar_result)consumer->consumer.seek(messageId->messageId);
}
+/**
+ * C binding that forwards a timestamp seek to the wrapped consumer's seekAsync.
+ *
+ * @param consumer wrapper whose underlying consumer is repositioned
+ * @param timestamp value passed unchanged to seekAsync
+ * @param callback C callback handed to handle_result_callback once the seek completes
+ * @param ctx opaque pointer passed along to handle_result_callback
+ */
+void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp,
+                                             pulsar_result_callback callback, void *ctx) {
+    auto on_seek_done = [callback, ctx](pulsar::Result seek_result) {
+        handle_result_callback(seek_result, callback, ctx);
+    };
+    consumer->consumer.seekAsync(timestamp, on_seek_done);
+}
+
+/**
+ * C binding that forwards a timestamp seek to the wrapped consumer's seek.
+ *
+ * @param consumer wrapper whose underlying consumer is repositioned
+ * @param timestamp value passed unchanged to seek
+ * @return the result of seek, converted to pulsar_result
+ */
+pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp) {
+    const auto seek_result = consumer->consumer.seek(timestamp);
+    return static_cast<pulsar_result>(seek_result);
+}
+
int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return
consumer->consumer.isConnected(); }
pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
diff --git a/lib/c/c_Reader.cc b/lib/c/c_Reader.cc
index 3490b54..c4bdc49 100644
--- a/lib/c/c_Reader.cc
+++ b/lib/c/c_Reader.cc
@@ -45,6 +45,26 @@ pulsar_result
pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
return (pulsar_result)res;
}
+/**
+ * C binding that forwards a message-id seek to the wrapped reader's seekAsync.
+ *
+ * @param reader wrapper whose underlying reader is repositioned
+ * @param messageId wrapper whose messageId member is passed to seekAsync
+ * @param callback C callback handed to handle_result_callback once the seek completes
+ * @param ctx opaque pointer passed along to handle_result_callback
+ */
+void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId,
+                              pulsar_result_callback callback, void *ctx) {
+    auto on_seek_done = [callback, ctx](pulsar::Result seek_result) {
+        handle_result_callback(seek_result, callback, ctx);
+    };
+    reader->reader.seekAsync(messageId->messageId, on_seek_done);
+}
+
+/**
+ * C binding that forwards a message-id seek to the wrapped reader's seek.
+ *
+ * @param reader wrapper whose underlying reader is repositioned
+ * @param messageId wrapper whose messageId member is passed to seek
+ * @return the result of seek, converted to pulsar_result
+ */
+pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId) {
+    const auto seek_result = reader->reader.seek(messageId->messageId);
+    return static_cast<pulsar_result>(seek_result);
+}
+
+/**
+ * C binding that forwards a timestamp seek to the wrapped reader's seekAsync.
+ *
+ * @param reader wrapper whose underlying reader is repositioned
+ * @param timestamp value passed unchanged to seekAsync
+ * @param callback C callback handed to handle_result_callback once the seek completes
+ * @param ctx opaque pointer passed along to handle_result_callback
+ */
+void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp,
+                                           pulsar_result_callback callback, void *ctx) {
+    auto on_seek_done = [callback, ctx](pulsar::Result seek_result) {
+        handle_result_callback(seek_result, callback, ctx);
+    };
+    reader->reader.seekAsync(timestamp, on_seek_done);
+}
+
+/**
+ * C binding that forwards a timestamp seek to the wrapped reader's seek.
+ *
+ * @param reader wrapper whose underlying reader is repositioned
+ * @param timestamp value passed unchanged to seek
+ * @return the result of seek, converted to pulsar_result
+ */
+pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp) {
+    const auto seek_result = reader->reader.seek(timestamp);
+    return static_cast<pulsar_result>(seek_result);
+}
+
pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return
(pulsar_result)reader->reader.close(); }
void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback
callback, void *ctx) {
diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc
new file mode 100644
index 0000000..cfa8a18
--- /dev/null
+++ b/tests/c/c_SeekTest.cc
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <TimeUtils.h>
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
+
+#include <future>
+
+struct seek_ctx {  // Context handed to seek_callback through the C API's void *ctx argument.
+ std::promise<pulsar_result> *promise;  // Non-owning; seek_callback fulfils it with the async seek result.
+};
+
+// Completion callback for the async seek calls: publishes the result on the promise carried in ctx.
+static void seek_callback(pulsar_result async_result, void *ctx) {
+    auto *callback_state = static_cast<struct seek_ctx *>(ctx);
+    callback_state->promise->set_value(async_result);
+}
+
+// Creates a client for pulsar://localhost:6650 with a default configuration and stores it in *client.
+// The temporary configuration object is released before returning.
+void prepare_client(pulsar_client_t **client) {
+    pulsar_client_configuration_t *client_conf = pulsar_client_configuration_create();
+    *client = pulsar_client_create("pulsar://localhost:6650", client_conf);
+    pulsar_client_configuration_free(client_conf);
+}
+
+// Publishes msg-0..msg-9, seeks the consumer to the id of msg-5 (synchronously, then asynchronously)
+// and checks that the next message received after each seek is msg-6.
+TEST(c_SeekTest, testConsumerSeekMessageId) {
+    auto topic_name_str = "test-c-seek-msgid-" + std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    // The C API exposes the payload as pointer + length, so compare using the explicit length
+    // instead of relying on a terminating NUL being present after the data.
+    auto payload = [](pulsar_message_t *m) {
+        return std::string(static_cast<const char *>(pulsar_message_get_data(m)),
+                           pulsar_message_get_length(m));
+    };
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_t *consumer;
+    result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_message_t *seek_message = nullptr;
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        snprintf(content, sizeof(content), "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_result send_result = pulsar_producer_send(producer, msg);
+        if (i == 5) {
+            // Keep the sixth message alive: its id is the seek target below.
+            seek_message = msg;
+        } else {
+            pulsar_message_free(msg);
+        }
+        // A failed publish would otherwise only surface as a confusing receive failure later on.
+        ASSERT_EQ(pulsar_result_Ok, send_result);
+    }
+    ASSERT_NE(nullptr, seek_message);
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_seek(consumer, pulsar_message_get_message_id(seek_message)));
+
+    pulsar_message_t *message = nullptr;
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+    ASSERT_EQ("msg-6", payload(message));
+    pulsar_message_free(message);
+    message = nullptr;
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_consumer_seek_async(consumer, pulsar_message_get_message_id(seek_message), seek_callback,
+                               &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+    ASSERT_EQ("msg-6", payload(message));
+    // The message received after the async seek was previously leaked.
+    pulsar_message_free(message);
+
+    pulsar_message_free(seek_message);
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}
+
+// Publishes msg-0..msg-9, then checks that seeking the consumer to "now" leaves nothing to receive
+// within the timeout, while seeking to 100 seconds earlier makes msg-0 the next message. The "now"
+// case is exercised both synchronously and asynchronously.
+TEST(c_SeekTest, testConsumerSeekTime) {
+    auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    // The C API exposes the payload as pointer + length, so compare using the explicit length
+    // instead of relying on a terminating NUL being present after the data.
+    auto payload = [](pulsar_message_t *m) {
+        return std::string(static_cast<const char *>(pulsar_message_get_data(m)),
+                           pulsar_message_get_length(m));
+    };
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_t *consumer;
+    result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        snprintf(content, sizeof(content), "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_result send_result = pulsar_producer_send(producer, msg);
+        // Free before asserting so a failed send does not leak the message.
+        pulsar_message_free(msg);
+        ASSERT_EQ(pulsar_result_Ok, send_result);
+    }
+
+    uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis();
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_seek_by_timestamp(consumer, currentTime));
+
+    pulsar_message_t *message = nullptr;
+    ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+
+    // Seek to 100 seconds ago
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_seek_by_timestamp(consumer, currentTime - 100000));
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+    ASSERT_EQ("msg-0", payload(message));
+    // The received message was previously leaked.
+    pulsar_message_free(message);
+    message = nullptr;
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_consumer_seek_by_timestamp_async(consumer, currentTime, seek_callback, &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}
+
+// Publishes msg-0..msg-9, seeks the reader to the id of msg-5 (synchronously, then asynchronously)
+// and checks that the next message read after each seek is msg-6.
+TEST(c_SeekTest, testReaderSeekMessageId) {
+    auto topic_name_str = "test-c-reader-seek-msgid-" + std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    // The C API exposes the payload as pointer + length, so compare using the explicit length
+    // instead of relying on a terminating NUL being present after the data.
+    auto payload = [](pulsar_message_t *m) {
+        return std::string(static_cast<const char *>(pulsar_message_get_data(m)),
+                           pulsar_message_get_length(m));
+    };
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create();
+    pulsar_reader_t *reader;
+    result =
+        pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_message_t *seek_message = nullptr;
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        snprintf(content, sizeof(content), "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_result send_result = pulsar_producer_send(producer, msg);
+        if (i == 5) {
+            // Keep the sixth message alive: its id is the seek target below.
+            seek_message = msg;
+        } else {
+            pulsar_message_free(msg);
+        }
+        // A failed publish would otherwise only surface as a confusing read failure later on.
+        ASSERT_EQ(pulsar_result_Ok, send_result);
+    }
+    ASSERT_NE(nullptr, seek_message);
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_seek(reader, pulsar_message_get_message_id(seek_message)));
+
+    pulsar_message_t *message = nullptr;
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+    ASSERT_EQ("msg-6", payload(message));
+    pulsar_message_free(message);
+    message = nullptr;
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_reader_seek_async(reader, pulsar_message_get_message_id(seek_message), seek_callback, &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+    ASSERT_EQ("msg-6", payload(message));
+    // The message read after the async seek was previously leaked.
+    pulsar_message_free(message);
+
+    pulsar_message_free(seek_message);
+    pulsar_reader_free(reader);
+    pulsar_reader_configuration_free(reader_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}
+
+// Publishes msg-0..msg-9, then checks that seeking the reader to "now" leaves nothing to read within
+// the timeout, while seeking to 100 seconds earlier makes msg-0 the next message. The "now" case is
+// exercised both synchronously and asynchronously.
+TEST(c_SeekTest, testReaderSeekTime) {
+    auto topic_name_str = "test-c-reader-seek-time-" + std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    // The C API exposes the payload as pointer + length, so compare using the explicit length
+    // instead of relying on a terminating NUL being present after the data.
+    auto payload = [](pulsar_message_t *m) {
+        return std::string(static_cast<const char *>(pulsar_message_get_data(m)),
+                           pulsar_message_get_length(m));
+    };
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create();
+    pulsar_reader_t *reader;
+    result =
+        pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        snprintf(content, sizeof(content), "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_result send_result = pulsar_producer_send(producer, msg);
+        // Free before asserting so a failed send does not leak the message.
+        pulsar_message_free(msg);
+        ASSERT_EQ(pulsar_result_Ok, send_result);
+    }
+
+    uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis();
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_seek_by_timestamp(reader, currentTime));
+
+    pulsar_message_t *message = nullptr;
+    ASSERT_EQ(pulsar_result_Timeout, pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+
+    // Seek to 100 seconds ago
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_seek_by_timestamp(reader, currentTime - 100000));
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+    ASSERT_EQ("msg-0", payload(message));
+    // The message read here was previously leaked.
+    pulsar_message_free(message);
+    message = nullptr;
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_reader_seek_by_timestamp_async(reader, currentTime, seek_callback, &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Timeout, pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+
+    pulsar_reader_free(reader);
+    pulsar_reader_configuration_free(reader_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}