This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new e80e2e7 [feat] Add support for seek by message id (#241)
e80e2e7 is described below
commit e80e2e7dff8b72e56586a0faf35fb99b6803171f
Author: Zike Yang <[email protected]>
AuthorDate: Thu Nov 10 21:09:23 2022 +0800
[feat] Add support for seek by message id (#241)
---
index.d.ts | 1 +
src/Consumer.cc | 25 +++++++++++++++++++++++++
src/Consumer.h | 1 +
tests/end_to_end.test.js | 40 ++++++++++++++++++++++++++++++++++++++++
4 files changed, 67 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index 3cafb4a..ff2573d 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -98,6 +98,7 @@ export class Consumer {
negativeAcknowledgeId(messageId: MessageId): void;
acknowledgeCumulative(message: Message): Promise<null>;
acknowledgeCumulativeId(messageId: MessageId): Promise<null>;
+ seek(messageId: MessageId): Promise<null>;
isConnected(): boolean;
close(): Promise<null>;
unsubscribe(): Promise<null>;
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 24ca11e..53847ad 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -41,6 +41,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("negativeAcknowledgeId",
&Consumer::NegativeAcknowledgeId),
InstanceMethod("acknowledgeCumulative",
&Consumer::AcknowledgeCumulative),
InstanceMethod("acknowledgeCumulativeId",
&Consumer::AcknowledgeCumulativeId),
+ InstanceMethod("seek", &Consumer::Seek),
InstanceMethod("isConnected", &Consumer::IsConnected),
InstanceMethod("close", &Consumer::Close),
InstanceMethod("unsubscribe", &Consumer::Unsubscribe),
@@ -368,6 +369,30 @@ Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
return deferred->Promise();
}
+/**
+ * Asynchronously seeks this consumer to the position identified by a message id.
+ *
+ * Unwraps info[0] as a native MessageId, passes it to pulsar_consumer_seek_async and
+ * returns a promise that is settled from that call's completion callback.
+ *
+ * @param info N-API call info; info[0] is expected to be a wrapped MessageId object.
+ * @return A promise resolved with null when the result is pulsar_result_Ok, otherwise
+ *         rejected with "Failed to seek message by id: " followed by the result string.
+ */
+Napi::Value Consumer::Seek(const Napi::CallbackInfo &info) {
+  // NOTE(review): info[0] is used without checking that it exists or wraps a MessageId --
+  // confirm whether argument validation is expected to happen on the JS side.
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  // Heap-allocated so it can be passed through the C API's void* argument; the callback
+  // below deletes it.
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_consumer_seek_async(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        // Copy the deferred out before the context that holds it is deleted.
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to seek message by id: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+      },
+      ctx);
+
+  return deferred->Promise();
+}
+
Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
return Napi::Boolean::New(env,
pulsar_consumer_is_connected(this->cConsumer.get()));
diff --git a/src/Consumer.h b/src/Consumer.h
index dcae8e5..cdde96a 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -48,6 +48,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
void NegativeAcknowledgeId(const Napi::CallbackInfo &info);
Napi::Value AcknowledgeCumulative(const Napi::CallbackInfo &info);
Napi::Value AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
+ Napi::Value Seek(const Napi::CallbackInfo &info);
Napi::Value IsConnected(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Napi::Value Unsubscribe(const Napi::CallbackInfo &info);
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 39b0272..c42200a 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -847,5 +847,45 @@ const Pulsar = require('../index.js');
await client.close();
});
+
+ test('Consumer seek by message Id', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/seek-by-msgid';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: false,
+ });
+ expect(producer).not.toBeNull();
+
+ const msgIds = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ console.log(msg);
+ const msgId = await producer.send({
+ data: Buffer.from(msg),
+ });
+ msgIds.push(msgId);
+ }
+
+ const consumer = await client.subscribe({
+ topic,
+ subscription: 'sub',
+ });
+ expect(consumer).not.toBeNull();
+
+ await consumer.seek(msgIds[5]);
+ const msg = consumer.receive(1000);
+ console.log((await msg).getMessageId().toString());
+ expect((await msg).getData().toString()).toBe('my-message-6');
+
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
});
})();