This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new e80e2e7  [feat] Add support for seek by message id (#241)
e80e2e7 is described below

commit e80e2e7dff8b72e56586a0faf35fb99b6803171f
Author: Zike Yang <[email protected]>
AuthorDate: Thu Nov 10 21:09:23 2022 +0800

    [feat] Add support for seek by message id (#241)
---
 index.d.ts               |  1 +
 src/Consumer.cc          | 25 +++++++++++++++++++++++++
 src/Consumer.h           |  1 +
 tests/end_to_end.test.js | 40 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 67 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index 3cafb4a..ff2573d 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -98,6 +98,7 @@ export class Consumer {
   negativeAcknowledgeId(messageId: MessageId): void;
   acknowledgeCumulative(message: Message): Promise<null>;
   acknowledgeCumulativeId(messageId: MessageId): Promise<null>;
+  seek(messageId: MessageId): Promise<null>;
   isConnected(): boolean;
   close(): Promise<null>;
   unsubscribe(): Promise<null>;
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 24ca11e..53847ad 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -41,6 +41,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
                       InstanceMethod("negativeAcknowledgeId", &Consumer::NegativeAcknowledgeId),
                       InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
                       InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
+                      InstanceMethod("seek", &Consumer::Seek),
                       InstanceMethod("isConnected", &Consumer::IsConnected),
                       InstanceMethod("close", &Consumer::Close),
                       InstanceMethod("unsubscribe", &Consumer::Unsubscribe),
@@ -368,6 +369,30 @@ Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
   return deferred->Promise();
 }
 
+Napi::Value Consumer::Seek(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_consumer_seek_async(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to seek message by id: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+      },
+      ctx);
+
+  return deferred->Promise();
+}
+
 Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
   return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->cConsumer.get()));
diff --git a/src/Consumer.h b/src/Consumer.h
index dcae8e5..cdde96a 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -48,6 +48,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
   void NegativeAcknowledgeId(const Napi::CallbackInfo &info);
   Napi::Value AcknowledgeCumulative(const Napi::CallbackInfo &info);
   Napi::Value AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
+  Napi::Value Seek(const Napi::CallbackInfo &info);
   Napi::Value IsConnected(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
   Napi::Value Unsubscribe(const Napi::CallbackInfo &info);
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 39b0272..c42200a 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -847,5 +847,45 @@ const Pulsar = require('../index.js');
 
       await client.close();
     });
+
+    test('Consumer seek by message Id', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/seek-by-msgid';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: false,
+      });
+      expect(producer).not.toBeNull();
+
+      const msgIds = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        console.log(msg);
+        const msgId = await producer.send({
+          data: Buffer.from(msg),
+        });
+        msgIds.push(msgId);
+      }
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub',
+      });
+      expect(consumer).not.toBeNull();
+
+      await consumer.seek(msgIds[5]);
+      const msg = consumer.receive(1000);
+      console.log((await msg).getMessageId().toString());
+      expect((await msg).getData().toString()).toBe('my-message-6');
+
+      await producer.close();
+      await consumer.close();
+      await client.close();
+    });
   });
 })();

Reply via email to