This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 262918a fix: ReaderListenerProxy will make a segfault (#376)
262918a is described below
commit 262918a6cbce8773b9c57adb06b8c723a61930d5
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Apr 9 17:40:25 2024 +0800
fix: ReaderListenerProxy will make a segfault (#376)
(cherry picked from commit e3bf582ea450e68379ff685598257ed6e69fb1aa)
---
src/Reader.cc | 24 +++++++++++++-----------
tests/reader.test.js | 41 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/src/Reader.cc b/src/Reader.cc
index bd6c7f0..4fe7c63 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -60,17 +60,19 @@ struct ReaderListenerProxyData {
void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback,
ReaderListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Reader *reader = data->reader;
-
- Napi::Value ret = jsCallback.Call({msg, reader->Value()});
- if (ret.IsPromise()) {
- Napi::Promise promise = ret.As<Napi::Promise>();
- Napi::Value thenValue = promise.Get("then");
- if (thenValue.IsFunction()) {
- Napi::Function then = thenValue.As<Napi::Function>();
- Napi::Function callback =
- Napi::Function::New(env, [data](const Napi::CallbackInfo &info) {
data->callback(); });
- then.Call(promise, {callback});
- return;
+ // `reader` might be null in certain cases, segmentation fault might happend
without this null check.
+ if (reader) {
+ Napi::Value ret = jsCallback.Call({msg, reader->Value()});
+ if (ret.IsPromise()) {
+ Napi::Promise promise = ret.As<Napi::Promise>();
+ Napi::Value thenValue = promise.Get("then");
+ if (thenValue.IsFunction()) {
+ Napi::Function then = thenValue.As<Napi::Function>();
+ Napi::Function callback =
+ Napi::Function::New(env, [data](const Napi::CallbackInfo &info) {
data->callback(); });
+ then.Call(promise, {callback});
+ return;
+ }
}
}
data->callback();
diff --git a/tests/reader.test.js b/tests/reader.test.js
index 673ae60..56d1b48 100644
--- a/tests/reader.test.js
+++ b/tests/reader.test.js
@@ -130,5 +130,46 @@ const baseUrl = 'http://localhost:8080';
await reader.close();
await client.close();
});
+
+ test('Reader should not throw segmentation fault when create and close',
async () => {
+ const NUM_ITS = 1000;
+ const its = Array.from({ length: NUM_ITS }, (_, i) => i);
+
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ });
+
+ const producer = await client.createProducer({
+ topic: 'persistent://public/default/my-topic',
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+
+ // Send messages
+ for (let i = 0; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ console.log(`Sent message: ${msg}`);
+ }
+ await producer.flush();
+
+ await Promise.all(
+ its.map(async () => {
+ const reader = await client.createReader({
+ topic: 'persistent://public/default/my-topic',
+ startMessageId: Pulsar.MessageId.earliest(),
+ listener: (message) => {
+ console.log(message.getData().toString());
+ },
+ });
+ await reader.close();
+ }),
+ );
+ await producer.close();
+ await client.close();
+ expect(true).toBe(true);
+ });
});
})();