This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a0ce69  [feat] Support consumer seek by timestamp and reader seek for 
C Api (#118)
1a0ce69 is described below

commit 1a0ce69d2d2a5521f484b207ad575363aac6a10b
Author: Zike Yang <[email protected]>
AuthorDate: Wed Nov 16 19:59:25 2022 +0800

    [feat] Support consumer seek by timestamp and reader seek for C Api (#118)
    
    Fixes #82
    
    
    ### Motivation
    
    #82
    
    ### Modifications
    
    * Support seek by timestamp for the consumer
    * Support seek by messageid and timestamp for the reader
---
 include/pulsar/c/consumer.h |  40 +++++++
 include/pulsar/c/reader.h   |  44 ++++++++
 lib/c/c_Consumer.cc         |  10 ++
 lib/c/c_Reader.cc           |  20 ++++
 tests/c/c_SeekTest.cc       | 265 ++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 379 insertions(+)

diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h
index 99ff8c8..bb8ae37 100644
--- a/include/pulsar/c/consumer.h
+++ b/include/pulsar/c/consumer.h
@@ -241,11 +241,51 @@ PULSAR_PUBLIC pulsar_result 
resume_message_listener(pulsar_consumer_t *consumer)
  */
 PULSAR_PUBLIC void 
pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);
 
+/**
+ * Asynchronously reset the subscription associated with this consumer to a 
specific message id.
+ *
+ * @param consumer The consumer
+ * @param messageId The message id can either be a specific message or 
represent the first or last messages in
+ * the topic.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
 PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId,
                                               pulsar_result_callback callback, 
void *ctx);
 
+/**
+ * Synchronously reset the subscription associated with this consumer to a 
specific message id.
+ *
+ * @param consumer The consumer
+ * @param messageId The message id can either be a specific message or 
represent the first or last messages in
+ * the topic.
+ * @return Operation result
+ */
 PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId);
 
+/**
+ * Asynchronously reset the subscription associated with this consumer to a 
specific message publish time.
+ *
+ * @param consumer The consumer
+ * @param timestamp The message publish time where to reposition the 
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
+PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t 
*consumer, uint64_t timestamp,
+                                                           
pulsar_result_callback callback, void *ctx);
+
+/**
+ * Synchronously reset the subscription associated with this consumer to a 
specific message publish time.
+ *
+ * @param consumer The consumer
+ * @param timestamp The message publish time where to reposition the 
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @return Operation result
+ */
+PULSAR_PUBLIC pulsar_result 
pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer,
+                                                              uint64_t 
timestamp);
+
 PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer);
 
 PULSAR_PUBLIC pulsar_result 
pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
diff --git a/include/pulsar/c/reader.h b/include/pulsar/c/reader.h
index 4c546f8..12321fd 100644
--- a/include/pulsar/c/reader.h
+++ b/include/pulsar/c/reader.h
@@ -59,6 +59,50 @@ PULSAR_PUBLIC pulsar_result 
pulsar_reader_read_next(pulsar_reader_t *reader, pul
 PULSAR_PUBLIC pulsar_result 
pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader,
                                                                  
pulsar_message_t **msg, int timeoutMs);
 
+/**
+ * Asynchronously reset the subscription associated with this reader to a specific message id.
+ *
+ * @param reader The reader
+ * @param messageId The message id can either be a specific message or 
represent the first or last messages in
+ * the topic.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
+PULSAR_PUBLIC void pulsar_reader_seek_async(pulsar_reader_t *reader, 
pulsar_message_id_t *messageId,
+                                            pulsar_result_callback callback, 
void *ctx);
+
+/**
+ * Synchronously reset the subscription associated with this reader to a 
specific message id.
+ *
+ * @param reader The reader
+ * @param messageId The message id can either be a specific message or 
represent the first or last messages in
+ * the topic.
+ * @return Operation result
+ */
+PULSAR_PUBLIC pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, 
pulsar_message_id_t *messageId);
+
+/**
+ * Asynchronously reset the subscription associated with this reader to a 
specific message publish time.
+ *
+ * @param reader The reader
+ * @param timestamp The message publish time where to reposition the 
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @param callback The callback for this async operation
+ * @param ctx The context for the callback
+ */
+PULSAR_PUBLIC void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t 
*reader, uint64_t timestamp,
+                                                         
pulsar_result_callback callback, void *ctx);
+
+/**
+ * Synchronously reset the subscription associated with this reader to a 
specific message publish time.
+ *
+ * @param reader The reader
+ * @param timestamp The message publish time where to reposition the 
subscription. The timestamp format should
+ * be Unix time in milliseconds.
+ * @return Operation result
+ */
+PULSAR_PUBLIC pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t 
*reader, uint64_t timestamp);
+
 PULSAR_PUBLIC pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
 
 PULSAR_PUBLIC void pulsar_reader_close_async(pulsar_reader_t *reader, 
pulsar_result_callback callback,
diff --git a/lib/c/c_Consumer.cc b/lib/c/c_Consumer.cc
index 062c801..df4c9f3 100644
--- a/lib/c/c_Consumer.cc
+++ b/lib/c/c_Consumer.cc
@@ -156,6 +156,16 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t 
*consumer, pulsar_message_i
     return (pulsar_result)consumer->consumer.seek(messageId->messageId);
 }
 
+void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, 
uint64_t timestamp,
+                                             pulsar_result_callback callback, 
void *ctx) {
+    consumer->consumer.seekAsync(timestamp,
+                                 std::bind(handle_result_callback, 
std::placeholders::_1, callback, ctx));
+}
+
+pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, 
uint64_t timestamp) {
+    return (pulsar_result)consumer->consumer.seek(timestamp);
+}
+
 int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return 
consumer->consumer.isConnected(); }
 
 pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
diff --git a/lib/c/c_Reader.cc b/lib/c/c_Reader.cc
index 3490b54..c4bdc49 100644
--- a/lib/c/c_Reader.cc
+++ b/lib/c/c_Reader.cc
@@ -45,6 +45,26 @@ pulsar_result 
pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
     return (pulsar_result)res;
 }
 
+void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t 
*messageId,
+                              pulsar_result_callback callback, void *ctx) {
+    reader->reader.seekAsync(messageId->messageId,
+                             std::bind(handle_result_callback, 
std::placeholders::_1, callback, ctx));
+}
+
+pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t 
*messageId) {
+    return (pulsar_result)reader->reader.seek(messageId->messageId);
+}
+
+void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t 
timestamp,
+                                           pulsar_result_callback callback, 
void *ctx) {
+    reader->reader.seekAsync(timestamp,
+                             std::bind(handle_result_callback, 
std::placeholders::_1, callback, ctx));
+}
+
+pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, 
uint64_t timestamp) {
+    return (pulsar_result)reader->reader.seek(timestamp);
+}
+
 pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return 
(pulsar_result)reader->reader.close(); }
 
 void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback 
callback, void *ctx) {
diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc
new file mode 100644
index 0000000..cfa8a18
--- /dev/null
+++ b/tests/c/c_SeekTest.cc
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <TimeUtils.h>
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
+
+#include <future>
+
+struct seek_ctx {
+    std::promise<pulsar_result> *promise;
+};
+
+static void seek_callback(pulsar_result async_result, void *ctx) {
+    auto *seek_ctx = (struct seek_ctx *)ctx;
+    seek_ctx->promise->set_value(async_result);
+}
+
+void prepare_client(pulsar_client_t **client) {
+    const char *lookup_url = "pulsar://localhost:6650";
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    *client = pulsar_client_create(lookup_url, conf);
+    pulsar_client_configuration_free(conf);
+}
+
+TEST(c_SeekTest, testConsumerSeekMessageId) {
+    auto topic_name_str = "test-c-seek-msgid-" + std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = 
pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, 
producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_consumer_configuration_t *consumer_conf = 
pulsar_consumer_configuration_create();
+    pulsar_consumer_t *consumer;
+    result = pulsar_client_subscribe(client, topic_name, "seek-time", 
consumer_conf, &consumer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_message_t *seek_message = nullptr;
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        sprintf(content, "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_producer_send(producer, msg);
+        if (i == 5) {
+            seek_message = msg;
+        } else {
+            pulsar_message_free(msg);
+        }
+    }
+
+    pulsar_consumer_seek(consumer, 
pulsar_message_get_message_id(seek_message));
+
+    pulsar_message_t *message;
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, 
&message, 1000));
+    ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6");
+    pulsar_message_free(message);
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_consumer_seek_async(consumer, 
pulsar_message_get_message_id(seek_message), seek_callback,
+                               &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, 
&message, 1000));
+    ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6");
+
+    if (seek_message != NULL) {
+        pulsar_message_free(seek_message);
+    }
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}
+
+TEST(c_SeekTest, testConsumerSeekTime) {
+    auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = 
pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, 
producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_consumer_configuration_t *consumer_conf = 
pulsar_consumer_configuration_create();
+    pulsar_consumer_t *consumer;
+    result = pulsar_client_subscribe(client, topic_name, "seek-time", 
consumer_conf, &consumer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        sprintf(content, "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_producer_send(producer, msg);
+        pulsar_message_free(msg);
+    }
+
+    uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis();
+
+    pulsar_consumer_seek_by_timestamp(consumer, currentTime);
+
+    pulsar_message_t *message;
+    ASSERT_EQ(pulsar_result_Timeout, 
pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+
+    pulsar_consumer_seek_by_timestamp(consumer, currentTime - 100000);  // 
Seek to 100 seconds ago
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, 
&message, 1000));
+    ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0");
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_consumer_seek_by_timestamp_async(consumer, currentTime, 
seek_callback, &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Timeout, 
pulsar_consumer_receive_with_timeout(consumer, &message, 1000));
+
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}
+
+TEST(c_SeekTest, testReaderSeekMessageId) {
+    auto topic_name_str = "test-c-reader-seek-msgid-" + 
std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = 
pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, 
producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_reader_configuration_t *reader_conf = 
pulsar_reader_configuration_create();
+    pulsar_reader_t *reader;
+    result =
+        pulsar_client_create_reader(client, topic_name, 
pulsar_message_id_earliest(), reader_conf, &reader);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_message_t *seek_message = nullptr;
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        sprintf(content, "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_producer_send(producer, msg);
+        if (i == 5) {
+            seek_message = msg;
+        } else {
+            pulsar_message_free(msg);
+        }
+    }
+
+    pulsar_reader_seek(reader, pulsar_message_get_message_id(seek_message));
+
+    pulsar_message_t *message;
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, 
&message, 1000));
+    ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6");
+    pulsar_message_free(message);
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_reader_seek_async(reader, 
pulsar_message_get_message_id(seek_message), seek_callback, &seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, 
&message, 1000));
+    ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6");
+
+    if (seek_message != NULL) {
+        pulsar_message_free(seek_message);
+    }
+    pulsar_reader_free(reader);
+    pulsar_reader_configuration_free(reader_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}
+
+TEST(c_SeekTest, testReaderSeekTime) {
+    auto topic_name_str = "test-c-reader-seek-time-" + 
std::to_string(time(nullptr));
+    const char *topic_name = topic_name_str.c_str();
+
+    pulsar_client_t *client;
+    prepare_client(&client);
+
+    pulsar_producer_configuration_t *producer_conf = 
pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, 
producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_reader_configuration_t *reader_conf = 
pulsar_reader_configuration_create();
+    pulsar_reader_t *reader;
+    result =
+        pulsar_client_create_reader(client, topic_name, 
pulsar_message_id_earliest(), reader_conf, &reader);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    for (int i = 0; i < 10; i++) {
+        char content[10];
+        sprintf(content, "msg-%d", i);
+        pulsar_message_t *msg = pulsar_message_create();
+        pulsar_message_set_content(msg, content, strlen(content));
+        pulsar_producer_send(producer, msg);
+        pulsar_message_free(msg);
+    }
+
+    uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis();
+
+    pulsar_reader_seek_by_timestamp(reader, currentTime);
+
+    pulsar_message_t *message;
+    ASSERT_EQ(pulsar_result_Timeout, 
pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+
+    pulsar_reader_seek_by_timestamp(reader, currentTime - 100000);  // Seek to 
100 seconds ago
+
+    ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, 
&message, 1000));
+    ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0");
+
+    // Test seek asynchronously
+    std::promise<pulsar_result> seek_promise;
+    std::future<pulsar_result> seek_future = seek_promise.get_future();
+    struct seek_ctx seek_ctx = {&seek_promise};
+    pulsar_reader_seek_by_timestamp_async(reader, currentTime, seek_callback, 
&seek_ctx);
+    ASSERT_EQ(pulsar_result_Ok, seek_future.get());
+    ASSERT_EQ(pulsar_result_Timeout, 
pulsar_reader_read_next_with_timeout(reader, &message, 1000));
+
+    pulsar_reader_free(reader);
+    pulsar_reader_configuration_free(reader_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+}

Reply via email to