This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 1e834bf [feat] Support consumer seek by timestamp and reader seek (#247)
1e834bf is described below
commit 1e834bf20c0d58a87a3c9af8e775f5c703bf0970
Author: Zike Yang <[email protected]>
AuthorDate: Thu Nov 24 17:23:20 2022 +0800
[feat] Support consumer seek by timestamp and reader seek (#247)
### Motivation
Complete the seek support for the Node.js client.
### Modifications
* Support seek by timestamp for the consumer
* Support seek by message ID and timestamp for the reader
* Change C++ client version to 3.1.0-candidate-1
---
.github/workflows/ci-build-release-napi.yml | 2 +-
.github/workflows/ci-pr-validation.yml | 2 +-
build-support/cpp-base-url.txt | 1 +
build-support/install-cpp-client.sh | 2 +-
index.d.ts | 3 +
pkg/mac/build-cpp-lib.sh | 4 +-
pkg/mac/common.sh | 2 +
pkg/windows/download-cpp-client.bat | 8 +-
pulsar-client-cpp-version.txt | 2 +-
src/Consumer.cc | 24 +++++
src/Consumer.h | 1 +
src/Reader.cc | 52 ++++++++++-
src/Reader.h | 2 +
tests/end_to_end.test.js | 139 ++++++++++++++++++++++++++++
tmp-pulsar-client-cpp-version.txt | 1 -
15 files changed, 233 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/ci-build-release-napi.yml
b/.github/workflows/ci-build-release-napi.yml
index 9e702dd..9d6a2f6 100644
--- a/.github/workflows/ci-build-release-napi.yml
+++ b/.github/workflows/ci-build-release-napi.yml
@@ -149,7 +149,7 @@ jobs:
uses: actions/cache@v3
with:
path: pkg/windows/
- key: ${{ runner.os }}-${{ matrix.arch }}-${{
hashFiles('tmp-pulsar-client-cpp-version.txt') }}
+ key: ${{ runner.os }}-${{ matrix.arch }}-${{
hashFiles('build-support/cpp-base-url.txt') }}-${{
hashFiles('pulsar-client-cpp-version.txt') }}
- name: Add env vars
shell: bash
diff --git a/.github/workflows/ci-pr-validation.yml
b/.github/workflows/ci-pr-validation.yml
index 2b9c920..bd22c20 100644
--- a/.github/workflows/ci-pr-validation.yml
+++ b/.github/workflows/ci-pr-validation.yml
@@ -174,7 +174,7 @@ jobs:
uses: actions/cache@v3
with:
path: pkg/windows/
- key: ${{ runner.os }}-${{ matrix.arch }}-${{
hashFiles('tmp-pulsar-client-cpp-version.txt') }}
+ key: ${{ runner.os }}-${{ matrix.arch }}-${{
hashFiles('build-support/cpp-base-url.txt') }}-${{
hashFiles('pulsar-client-cpp-version.txt') }}
- name: Add env vars
shell: bash
diff --git a/build-support/cpp-base-url.txt b/build-support/cpp-base-url.txt
new file mode 100755
index 0000000..6040757
--- /dev/null
+++ b/build-support/cpp-base-url.txt
@@ -0,0 +1 @@
+https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.1.0-candidate-1
diff --git a/build-support/install-cpp-client.sh
b/build-support/install-cpp-client.sh
index 51134a5..4581fbf 100755
--- a/build-support/install-cpp-client.sh
+++ b/build-support/install-cpp-client.sh
@@ -33,7 +33,7 @@ export $(cat /etc/*-release | grep "^ID=")
cd /tmp
# Fetch the client binaries
-BASE_URL=https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-cpp-${CPP_CLIENT_VERSION}
+BASE_URL=$(< "$ROOT_DIR"/build-support/cpp-base-url.txt xargs)
UNAME_ARCH=$(uname -m)
if [ $UNAME_ARCH == 'aarch64' ]; then
diff --git a/index.d.ts b/index.d.ts
index ff2573d..312f903 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -99,6 +99,7 @@ export class Consumer {
acknowledgeCumulative(message: Message): Promise<null>;
acknowledgeCumulativeId(messageId: MessageId): Promise<null>;
seek(messageId: MessageId): Promise<null>;
+ seekTimestamp(timestamp: number): Promise<null>;
isConnected(): boolean;
close(): Promise<null>;
unsubscribe(): Promise<null>;
@@ -118,6 +119,8 @@ export class Reader {
readNext(timeout?: number): Promise<Message>;
hasNext(): boolean;
isConnected(): boolean;
+ seek(messageId: MessageId): Promise<null>;
+ seekTimestamp(timestamp: number): Promise<null>;
close(): Promise<null>;
}
diff --git a/pkg/mac/build-cpp-lib.sh b/pkg/mac/build-cpp-lib.sh
index 67402d0..ab82927 100755
--- a/pkg/mac/build-cpp-lib.sh
+++ b/pkg/mac/build-cpp-lib.sh
@@ -26,7 +26,7 @@ mkdir -p $PULSAR_PREFIX
cd $PULSAR_DIR
## Fetch from official release
-curl -O -L
https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-cpp-${PULSAR_CPP_VERSION}/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz
+curl -O -L "$BASE_URL"/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz
tar xfz apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz
pushd apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}
chmod +x ./build-support/merge_archives.sh
@@ -43,7 +43,7 @@ pushd apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}
-DBUILD_DYNAMIC_LIB=OFF \
-DPROTOC_PATH=$PREFIX/bin/protoc
make -j16 install
- mkdir $ROOT_DIR/pkg/lib/
+ mkdir -p $ROOT_DIR/pkg/lib/
cp -r lib/libpulsarwithdeps.a $ROOT_DIR/pkg/lib/
popd
diff --git a/pkg/mac/common.sh b/pkg/mac/common.sh
index 94319c8..9772018 100755
--- a/pkg/mac/common.sh
+++ b/pkg/mac/common.sh
@@ -25,6 +25,8 @@ export MACOSX_DEPLOYMENT_TARGET=11.0
MAC_BUILD_DIR=`cd $(dirname $0); pwd`
ROOT_DIR=$(git rev-parse --show-toplevel)
export PULSAR_CPP_VERSION=`cat $ROOT_DIR/pulsar-client-cpp-version.txt`
+BASE_URL=$(cat "$ROOT_DIR"/build-support/cpp-base-url.txt)
+export BASE_URL
cd $MAC_BUILD_DIR
mkdir -p build
diff --git a/pkg/windows/download-cpp-client.bat
b/pkg/windows/download-cpp-client.bat
index 825511b..d329706 100644
--- a/pkg/windows/download-cpp-client.bat
+++ b/pkg/windows/download-cpp-client.bat
@@ -1,8 +1,8 @@
cd %~dp0
set arch=%1
-:: TODO: Fetch from official release, change to apache release version and
path.
-set /P CPP_VERSION=<..\..\tmp-pulsar-client-cpp-version.txt
-curl -O -L
https://github.com/BewareMyPower/pulsar-client-cpp/releases/download/%CPP_VERSION%/pulsar-client-cpp-%arch%-windows-static.zip
-7z x pulsar-client-cpp-%arch%-windows-static.zip -opulsar-cpp
+set /P BASE_URL=<..\..\build-support\cpp-base-url.txt
+curl -O -L %BASE_URL%/%arch%-windows-static.tar.gz
+tar -xvzf %arch%-windows-static.tar.gz
+mv %arch%-windows-static pulsar-cpp
dir
diff --git a/pulsar-client-cpp-version.txt b/pulsar-client-cpp-version.txt
index 4a36342..fd2a018 100644
--- a/pulsar-client-cpp-version.txt
+++ b/pulsar-client-cpp-version.txt
@@ -1 +1 @@
-3.0.0
+3.1.0
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 53847ad..e4e7898 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -42,6 +42,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("acknowledgeCumulative",
&Consumer::AcknowledgeCumulative),
InstanceMethod("acknowledgeCumulativeId",
&Consumer::AcknowledgeCumulativeId),
InstanceMethod("seek", &Consumer::Seek),
+ InstanceMethod("seekTimestamp",
&Consumer::SeekTimestamp),
InstanceMethod("isConnected", &Consumer::IsConnected),
InstanceMethod("close", &Consumer::Close),
InstanceMethod("unsubscribe", &Consumer::Unsubscribe),
@@ -393,6 +394,29 @@ Napi::Value Consumer::Seek(const Napi::CallbackInfo &info)
{
return deferred->Promise();
}
+Napi::Value Consumer::SeekTimestamp(const Napi::CallbackInfo &info) {
+ Napi::Number timestamp = info[0].As<Napi::Object>().ToNumber();
+ auto deferred = ThreadSafeDeferred::New(Env());
+ auto ctx = new ExtDeferredContext(deferred);
+
+ pulsar_consumer_seek_by_timestamp_async(
+ this->cConsumer.get(), timestamp.Int64Value(),
+ [](pulsar_result result, void *ctx) {
+ auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+ auto deferred = deferredContext->deferred;
+ delete deferredContext;
+
+ if (result != pulsar_result_Ok) {
+ deferred->Reject(std::string("Failed to seek message by timestamp:
") + pulsar_result_str(result));
+ } else {
+ deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+ }
+ },
+ ctx);
+
+ return deferred->Promise();
+}
+
Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
return Napi::Boolean::New(env,
pulsar_consumer_is_connected(this->cConsumer.get()));
diff --git a/src/Consumer.h b/src/Consumer.h
index cdde96a..731ec97 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -49,6 +49,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
Napi::Value AcknowledgeCumulative(const Napi::CallbackInfo &info);
Napi::Value AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
Napi::Value Seek(const Napi::CallbackInfo &info);
+ Napi::Value SeekTimestamp(const Napi::CallbackInfo &info);
Napi::Value IsConnected(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Napi::Value Unsubscribe(const Napi::CallbackInfo &info);
diff --git a/src/Reader.cc b/src/Reader.cc
index 319ccc3..74bd4b2 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -20,6 +20,7 @@
#include "Message.h"
#include "Reader.h"
#include "ReaderConfig.h"
+#include "MessageId.h"
#include "ThreadSafeDeferred.h"
#include <pulsar/c/result.h>
#include <pulsar/c/reader.h>
@@ -36,6 +37,8 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("readNext",
&Reader::ReadNext),
InstanceMethod("hasNext",
&Reader::HasNext),
InstanceMethod("isConnected",
&Reader::IsConnected),
+ InstanceMethod("seek", &Reader::Seek),
+ InstanceMethod("seekTimestamp",
&Reader::SeekTimestamp),
InstanceMethod("close",
&Reader::Close),
});
@@ -165,7 +168,7 @@ class ReaderReadNextWorker : public Napi::AsyncWorker {
result = pulsar_reader_read_next(this->cReader.get(), &rawMessage);
}
if (result != pulsar_result_Ok) {
- SetError(std::string("Failed to received message ") +
pulsar_result_str(result));
+ SetError(std::string("Failed to receive message: ") +
pulsar_result_str(result));
}
this->cMessage = std::shared_ptr<pulsar_message_t>(rawMessage,
pulsar_message_free);
}
@@ -213,6 +216,53 @@ Napi::Value Reader::IsConnected(const Napi::CallbackInfo
&info) {
return Napi::Boolean::New(env,
pulsar_reader_is_connected(this->cReader.get()));
}
+Napi::Value Reader::Seek(const Napi::CallbackInfo &info) {
+ auto obj = info[0].As<Napi::Object>();
+ auto *msgId = MessageId::Unwrap(obj);
+ auto deferred = ThreadSafeDeferred::New(Env());
+ auto ctx = new ExtDeferredContext(deferred);
+
+ pulsar_reader_seek_async(
+ this->cReader.get(), msgId->GetCMessageId().get(),
+ [](pulsar_result result, void *ctx) {
+ auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+ auto deferred = deferredContext->deferred;
+ delete deferredContext;
+
+ if (result != pulsar_result_Ok) {
+ deferred->Reject(std::string("Failed to seek message by id: ") +
pulsar_result_str(result));
+ } else {
+ deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+ }
+ },
+ ctx);
+
+ return deferred->Promise();
+}
+
+Napi::Value Reader::SeekTimestamp(const Napi::CallbackInfo &info) {
+ Napi::Number timestamp = info[0].As<Napi::Object>().ToNumber();
+ auto deferred = ThreadSafeDeferred::New(Env());
+ auto ctx = new ExtDeferredContext(deferred);
+
+ pulsar_reader_seek_by_timestamp_async(
+ this->cReader.get(), timestamp.Int64Value(),
+ [](pulsar_result result, void *ctx) {
+ auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+ auto deferred = deferredContext->deferred;
+ delete deferredContext;
+
+ if (result != pulsar_result_Ok) {
+ deferred->Reject(std::string("Failed to seek message by timestamp:
") + pulsar_result_str(result));
+ } else {
+ deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+ }
+ },
+ ctx);
+
+ return deferred->Promise();
+}
+
Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
diff --git a/src/Reader.h b/src/Reader.h
index 311be8d..7c37f7c 100644
--- a/src/Reader.h
+++ b/src/Reader.h
@@ -42,6 +42,8 @@ class Reader : public Napi::ObjectWrap<Reader> {
Napi::Value ReadNext(const Napi::CallbackInfo &info);
Napi::Value HasNext(const Napi::CallbackInfo &info);
Napi::Value IsConnected(const Napi::CallbackInfo &info);
+ Napi::Value Seek(const Napi::CallbackInfo &info);
+ Napi::Value SeekTimestamp(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
void CleanupListener();
};
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index c42200a..54bba38 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -887,5 +887,144 @@ const Pulsar = require('../index.js');
await consumer.close();
await client.close();
});
+
+ test('Consumer seek by timestamp', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/seek-by-timestamp';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: false,
+ });
+ expect(producer).not.toBeNull();
+
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ console.log(msg);
+ await producer.send({
+ data: Buffer.from(msg),
+ });
+ }
+
+ const consumer = await client.subscribe({
+ topic,
+ subscription: 'sub',
+ });
+ expect(consumer).not.toBeNull();
+
+ const currentTime = Date.now();
+ console.log(currentTime);
+
+ await consumer.seekTimestamp(currentTime);
+
+ console.log('End seek');
+
+ await expect(consumer.receive(1000)).rejects.toThrow('Failed to receive
message: TimeOut');
+
+ await consumer.seekTimestamp(currentTime - 100000);
+
+ const msg = consumer.receive(1000);
+ console.log((await msg).getMessageId().toString());
+ expect((await msg).getData().toString()).toBe('my-message-0');
+
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
+
+ test('Reader seek by message Id', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/reader-seek-by-msgid';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: false,
+ });
+ expect(producer).not.toBeNull();
+
+ const msgIds = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ console.log(msg);
+ const msgId = await producer.send({
+ data: Buffer.from(msg),
+ });
+ msgIds.push(msgId);
+ }
+
+ const reader = await client.createReader({
+ topic,
+ startMessageId: Pulsar.MessageId.latest(),
+ });
+ expect(reader).not.toBeNull();
+
+ await reader.seek(msgIds[5]);
+ expect(reader.hasNext()).toBe(true);
+ const msg = reader.readNext(1000);
+ console.log((await msg).getMessageId().toString());
+ expect((await msg).getData().toString()).toBe('my-message-6');
+
+ await producer.close();
+ await reader.close();
+ await client.close();
+ });
+
+ test('Reader seek by timestamp', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/reader-seek-timestamp';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: false,
+ });
+ expect(producer).not.toBeNull();
+
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ console.log(msg);
+ await producer.send({
+ data: Buffer.from(msg),
+ });
+ }
+
+ const reader = await client.createReader({
+ topic,
+ startMessageId: Pulsar.MessageId.latest(),
+ });
+ expect(reader).not.toBeNull();
+
+ const currentTime = Date.now();
+ console.log(currentTime);
+
+ await reader.seekTimestamp(currentTime);
+
+ console.log('End seek');
+
+ expect(reader.hasNext()).toBe(false);
+
+ await reader.seekTimestamp(currentTime - 100000);
+ console.log('Seek to previous time');
+
+ expect(reader.hasNext()).toBe(true);
+ const msg = reader.readNext(1000);
+ console.log((await msg).getMessageId().toString());
+ expect((await msg).getData().toString()).toBe('my-message-0');
+
+ await producer.close();
+ await reader.close();
+ await client.close();
+ });
});
})();
diff --git a/tmp-pulsar-client-cpp-version.txt
b/tmp-pulsar-client-cpp-version.txt
deleted file mode 100644
index fb8ae7b..0000000
--- a/tmp-pulsar-client-cpp-version.txt
+++ /dev/null
@@ -1 +0,0 @@
-v3.1.0-rc-20221026-windows