This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e834bf  [feat] Support consumer seek by timestamp and reader seek (#247)
1e834bf is described below

commit 1e834bf20c0d58a87a3c9af8e775f5c703bf0970
Author: Zike Yang <[email protected]>
AuthorDate: Thu Nov 24 17:23:20 2022 +0800

    [feat] Support consumer seek by timestamp and reader seek (#247)
    
    ### Motivation
    
    Complete the seek support for nodejs client.
    
    ### Modifications
    
    * Support seek by timestamp for the consumer
    * Support seek by messageid and timestamp for the reader
    * Change C++ client version to 3.1.0-candidate-1
---
 .github/workflows/ci-build-release-napi.yml |   2 +-
 .github/workflows/ci-pr-validation.yml      |   2 +-
 build-support/cpp-base-url.txt              |   1 +
 build-support/install-cpp-client.sh         |   2 +-
 index.d.ts                                  |   3 +
 pkg/mac/build-cpp-lib.sh                    |   4 +-
 pkg/mac/common.sh                           |   2 +
 pkg/windows/download-cpp-client.bat         |   8 +-
 pulsar-client-cpp-version.txt               |   2 +-
 src/Consumer.cc                             |  24 +++++
 src/Consumer.h                              |   1 +
 src/Reader.cc                               |  52 ++++++++++-
 src/Reader.h                                |   2 +
 tests/end_to_end.test.js                    | 139 ++++++++++++++++++++++++++++
 tmp-pulsar-client-cpp-version.txt           |   1 -
 15 files changed, 233 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/ci-build-release-napi.yml b/.github/workflows/ci-build-release-napi.yml
index 9e702dd..9d6a2f6 100644
--- a/.github/workflows/ci-build-release-napi.yml
+++ b/.github/workflows/ci-build-release-napi.yml
@@ -149,7 +149,7 @@ jobs:
         uses: actions/cache@v3
         with:
           path: pkg/windows/
-          key: ${{ runner.os }}-${{ matrix.arch }}-${{ hashFiles('tmp-pulsar-client-cpp-version.txt') }}
+          key: ${{ runner.os }}-${{ matrix.arch }}-${{ hashFiles('build-support/cpp-base-url.txt') }}-${{ hashFiles('pulsar-client-cpp-version.txt') }}
 
       - name: Add env vars
         shell: bash
diff --git a/.github/workflows/ci-pr-validation.yml b/.github/workflows/ci-pr-validation.yml
index 2b9c920..bd22c20 100644
--- a/.github/workflows/ci-pr-validation.yml
+++ b/.github/workflows/ci-pr-validation.yml
@@ -174,7 +174,7 @@ jobs:
         uses: actions/cache@v3
         with:
           path: pkg/windows/
-          key: ${{ runner.os }}-${{ matrix.arch }}-${{ hashFiles('tmp-pulsar-client-cpp-version.txt') }}
+          key: ${{ runner.os }}-${{ matrix.arch }}-${{ hashFiles('build-support/cpp-base-url.txt') }}-${{ hashFiles('pulsar-client-cpp-version.txt') }}
 
       - name: Add env vars
         shell: bash
diff --git a/build-support/cpp-base-url.txt b/build-support/cpp-base-url.txt
new file mode 100755
index 0000000..6040757
--- /dev/null
+++ b/build-support/cpp-base-url.txt
@@ -0,0 +1 @@
+https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.1.0-candidate-1
diff --git a/build-support/install-cpp-client.sh b/build-support/install-cpp-client.sh
index 51134a5..4581fbf 100755
--- a/build-support/install-cpp-client.sh
+++ b/build-support/install-cpp-client.sh
@@ -33,7 +33,7 @@ export $(cat /etc/*-release | grep "^ID=")
 cd /tmp
 
 # Fetch the client binaries
-BASE_URL=https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-cpp-${CPP_CLIENT_VERSION}
+BASE_URL=$(< "$ROOT_DIR"/build-support/cpp-base-url.txt xargs)
 
 UNAME_ARCH=$(uname -m)
 if [ $UNAME_ARCH == 'aarch64' ]; then
diff --git a/index.d.ts b/index.d.ts
index ff2573d..312f903 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -99,6 +99,7 @@ export class Consumer {
   acknowledgeCumulative(message: Message): Promise<null>;
   acknowledgeCumulativeId(messageId: MessageId): Promise<null>;
   seek(messageId: MessageId): Promise<null>;
+  seekTimestamp(timestamp: number): Promise<null>;
   isConnected(): boolean;
   close(): Promise<null>;
   unsubscribe(): Promise<null>;
@@ -118,6 +119,8 @@ export class Reader {
   readNext(timeout?: number): Promise<Message>;
   hasNext(): boolean;
   isConnected(): boolean;
+  seek(messageId: MessageId): Promise<null>;
+  seekTimestamp(timestamp: number): Promise<null>;
   close(): Promise<null>;
 }
 
diff --git a/pkg/mac/build-cpp-lib.sh b/pkg/mac/build-cpp-lib.sh
index 67402d0..ab82927 100755
--- a/pkg/mac/build-cpp-lib.sh
+++ b/pkg/mac/build-cpp-lib.sh
@@ -26,7 +26,7 @@ mkdir -p $PULSAR_PREFIX
 cd $PULSAR_DIR
 
 ## Fetch from official release
-curl -O -L https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-cpp-${PULSAR_CPP_VERSION}/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz
+curl -O -L "$BASE_URL"/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz
 tar xfz apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz
 pushd apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}
   chmod +x ./build-support/merge_archives.sh
@@ -43,7 +43,7 @@ pushd apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}
       -DBUILD_DYNAMIC_LIB=OFF \
       -DPROTOC_PATH=$PREFIX/bin/protoc
   make -j16 install
-  mkdir $ROOT_DIR/pkg/lib/
+  mkdir -p $ROOT_DIR/pkg/lib/
   cp -r lib/libpulsarwithdeps.a $ROOT_DIR/pkg/lib/
 popd
 
diff --git a/pkg/mac/common.sh b/pkg/mac/common.sh
index 94319c8..9772018 100755
--- a/pkg/mac/common.sh
+++ b/pkg/mac/common.sh
@@ -25,6 +25,8 @@ export MACOSX_DEPLOYMENT_TARGET=11.0
 MAC_BUILD_DIR=`cd $(dirname $0); pwd`
 ROOT_DIR=$(git rev-parse --show-toplevel)
 export PULSAR_CPP_VERSION=`cat $ROOT_DIR/pulsar-client-cpp-version.txt`
+BASE_URL=$(cat "$ROOT_DIR"/build-support/cpp-base-url.txt)
+export BASE_URL
 
 cd $MAC_BUILD_DIR
 mkdir -p build
diff --git a/pkg/windows/download-cpp-client.bat b/pkg/windows/download-cpp-client.bat
index 825511b..d329706 100644
--- a/pkg/windows/download-cpp-client.bat
+++ b/pkg/windows/download-cpp-client.bat
@@ -1,8 +1,8 @@
 cd %~dp0
 set arch=%1
-:: TODO: Fetch from official release, change to apache release version and path.
-set /P CPP_VERSION=<..\..\tmp-pulsar-client-cpp-version.txt
-curl -O -L https://github.com/BewareMyPower/pulsar-client-cpp/releases/download/%CPP_VERSION%/pulsar-client-cpp-%arch%-windows-static.zip
-7z x pulsar-client-cpp-%arch%-windows-static.zip -opulsar-cpp
+set /P BASE_URL=<..\..\build-support\cpp-base-url.txt
+curl -O -L %BASE_URL%/%arch%-windows-static.tar.gz
+tar -xvzf %arch%-windows-static.tar.gz
+mv %arch%-windows-static pulsar-cpp
 dir
 
diff --git a/pulsar-client-cpp-version.txt b/pulsar-client-cpp-version.txt
index 4a36342..fd2a018 100644
--- a/pulsar-client-cpp-version.txt
+++ b/pulsar-client-cpp-version.txt
@@ -1 +1 @@
-3.0.0
+3.1.0
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 53847ad..e4e7898 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -42,6 +42,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
                       InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
                       InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
                       InstanceMethod("seek", &Consumer::Seek),
+                      InstanceMethod("seekTimestamp", &Consumer::SeekTimestamp),
                       InstanceMethod("isConnected", &Consumer::IsConnected),
                       InstanceMethod("close", &Consumer::Close),
                       InstanceMethod("unsubscribe", &Consumer::Unsubscribe),
@@ -393,6 +394,29 @@ Napi::Value Consumer::Seek(const Napi::CallbackInfo &info) {
   return deferred->Promise();
 }
 
+Napi::Value Consumer::SeekTimestamp(const Napi::CallbackInfo &info) {
+  Napi::Number timestamp = info[0].As<Napi::Object>().ToNumber();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_consumer_seek_by_timestamp_async(
+      this->cConsumer.get(), timestamp.Int64Value(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to seek message by timestamp: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+      },
+      ctx);
+
+  return deferred->Promise();
+}
+
 Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
   return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->cConsumer.get()));
diff --git a/src/Consumer.h b/src/Consumer.h
index cdde96a..731ec97 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -49,6 +49,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
   Napi::Value AcknowledgeCumulative(const Napi::CallbackInfo &info);
   Napi::Value AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
   Napi::Value Seek(const Napi::CallbackInfo &info);
+  Napi::Value SeekTimestamp(const Napi::CallbackInfo &info);
   Napi::Value IsConnected(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
   Napi::Value Unsubscribe(const Napi::CallbackInfo &info);
diff --git a/src/Reader.cc b/src/Reader.cc
index 319ccc3..74bd4b2 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -20,6 +20,7 @@
 #include "Message.h"
 #include "Reader.h"
 #include "ReaderConfig.h"
+#include "MessageId.h"
 #include "ThreadSafeDeferred.h"
 #include <pulsar/c/result.h>
 #include <pulsar/c/reader.h>
@@ -36,6 +37,8 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
                                         InstanceMethod("readNext", &Reader::ReadNext),
                                         InstanceMethod("hasNext", &Reader::HasNext),
                                         InstanceMethod("isConnected", &Reader::IsConnected),
+                                        InstanceMethod("seek", &Reader::Seek),
+                                        InstanceMethod("seekTimestamp", &Reader::SeekTimestamp),
                                         InstanceMethod("close", &Reader::Close),
                                     });
 
@@ -165,7 +168,7 @@ class ReaderReadNextWorker : public Napi::AsyncWorker {
       result = pulsar_reader_read_next(this->cReader.get(), &rawMessage);
     }
     if (result != pulsar_result_Ok) {
-      SetError(std::string("Failed to received message ") + pulsar_result_str(result));
+      SetError(std::string("Failed to receive message: ") + pulsar_result_str(result));
     }
     this->cMessage = std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free);
   }
@@ -213,6 +216,53 @@ Napi::Value Reader::IsConnected(const Napi::CallbackInfo &info) {
   return Napi::Boolean::New(env, pulsar_reader_is_connected(this->cReader.get()));
 }
 
+Napi::Value Reader::Seek(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_reader_seek_async(
+      this->cReader.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to seek message by id: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+      },
+      ctx);
+
+  return deferred->Promise();
+}
+
+Napi::Value Reader::SeekTimestamp(const Napi::CallbackInfo &info) {
+  Napi::Number timestamp = info[0].As<Napi::Object>().ToNumber();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_reader_seek_by_timestamp_async(
+      this->cReader.get(), timestamp.Int64Value(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to seek message by timestamp: ") + pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+      },
+      ctx);
+
+  return deferred->Promise();
+}
+
 Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
   auto deferred = ThreadSafeDeferred::New(Env());
   auto ctx = new ExtDeferredContext(deferred);
diff --git a/src/Reader.h b/src/Reader.h
index 311be8d..7c37f7c 100644
--- a/src/Reader.h
+++ b/src/Reader.h
@@ -42,6 +42,8 @@ class Reader : public Napi::ObjectWrap<Reader> {
   Napi::Value ReadNext(const Napi::CallbackInfo &info);
   Napi::Value HasNext(const Napi::CallbackInfo &info);
   Napi::Value IsConnected(const Napi::CallbackInfo &info);
+  Napi::Value Seek(const Napi::CallbackInfo &info);
+  Napi::Value SeekTimestamp(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
   void CleanupListener();
 };
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index c42200a..54bba38 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -887,5 +887,144 @@ const Pulsar = require('../index.js');
       await consumer.close();
       await client.close();
     });
+
+    test('Consumer seek by timestamp', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/seek-by-timestamp';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: false,
+      });
+      expect(producer).not.toBeNull();
+
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        console.log(msg);
+        await producer.send({
+          data: Buffer.from(msg),
+        });
+      }
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub',
+      });
+      expect(consumer).not.toBeNull();
+
+      const currentTime = Date.now();
+      console.log(currentTime);
+
+      await consumer.seekTimestamp(currentTime);
+
+      console.log('End seek');
+
+      await expect(consumer.receive(1000)).rejects.toThrow('Failed to receive message: TimeOut');
+
+      await consumer.seekTimestamp(currentTime - 100000);
+
+      const msg = consumer.receive(1000);
+      console.log((await msg).getMessageId().toString());
+      expect((await msg).getData().toString()).toBe('my-message-0');
+
+      await producer.close();
+      await consumer.close();
+      await client.close();
+    });
+
+    test('Reader seek by message Id', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/reader-seek-by-msgid';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: false,
+      });
+      expect(producer).not.toBeNull();
+
+      const msgIds = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        console.log(msg);
+        const msgId = await producer.send({
+          data: Buffer.from(msg),
+        });
+        msgIds.push(msgId);
+      }
+
+      const reader = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.latest(),
+      });
+      expect(reader).not.toBeNull();
+
+      await reader.seek(msgIds[5]);
+      expect(reader.hasNext()).toBe(true);
+      const msg = reader.readNext(1000);
+      console.log((await msg).getMessageId().toString());
+      expect((await msg).getData().toString()).toBe('my-message-6');
+
+      await producer.close();
+      await reader.close();
+      await client.close();
+    });
+
+    test('Reader seek by timestamp', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/reader-seek-timestamp';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: false,
+      });
+      expect(producer).not.toBeNull();
+
+      for (let i = 0; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        console.log(msg);
+        await producer.send({
+          data: Buffer.from(msg),
+        });
+      }
+
+      const reader = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.latest(),
+      });
+      expect(reader).not.toBeNull();
+
+      const currentTime = Date.now();
+      console.log(currentTime);
+
+      await reader.seekTimestamp(currentTime);
+
+      console.log('End seek');
+
+      expect(reader.hasNext()).toBe(false);
+
+      await reader.seekTimestamp(currentTime - 100000);
+      console.log('Seek to previous time');
+
+      expect(reader.hasNext()).toBe(true);
+      const msg = reader.readNext(1000);
+      console.log((await msg).getMessageId().toString());
+      expect((await msg).getData().toString()).toBe('my-message-0');
+
+      await producer.close();
+      await reader.close();
+      await client.close();
+    });
   });
 })();
diff --git a/tmp-pulsar-client-cpp-version.txt b/tmp-pulsar-client-cpp-version.txt
deleted file mode 100644
index fb8ae7b..0000000
--- a/tmp-pulsar-client-cpp-version.txt
+++ /dev/null
@@ -1 +0,0 @@
-v3.1.0-rc-20221026-windows

Reply via email to