This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 448a504 [Improve] Add error handling for the message listener (#319)
448a504 is described below
commit 448a5046c9fd70b749e8dac31f395c90498b027e
Author: Zike Yang <[email protected]>
AuthorDate: Thu Apr 27 14:33:28 2023 +0800
[Improve] Add error handling for the message listener (#319)
## Motivation
Currently, there is no error handling for the message listener. If there
are any errors thrown from the user's listener, the program will crash.
## Modification
* Add error handling for the message listener. The client won't crash the
program if there are any errors in the user function. Instead, it will log as
the error.
* Add LogUtils to the internal native code.
* Add `GetTopic` and `GetSubscriptionName` for the internal native consumer.
---
src/Client.h | 5 ++--
src/Consumer.cc | 44 +++++++++++++++++++++++++++--------
src/Consumer.h | 2 ++
src/LogUtils.h | 32 ++++++++++++++++++++++++++
tests/end_to_end.test.js | 60 ++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 132 insertions(+), 11 deletions(-)
diff --git a/src/Client.h b/src/Client.h
index bc20b5c..3755b6c 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -42,6 +42,9 @@ class Client : public Napi::ObjectWrap<Client> {
static Napi::Object Init(Napi::Env env, Napi::Object exports);
static void SetLogHandler(const Napi::CallbackInfo &info);
+ static void LogMessage(pulsar_logger_level_t level, const char *file, int
line, const char *message,
+ void *ctx);
+
Client(const Napi::CallbackInfo &info);
~Client();
@@ -51,8 +54,6 @@ class Client : public Napi::ObjectWrap<Client> {
std::shared_ptr<pulsar_client_t> cClient;
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
- static void LogMessage(pulsar_logger_level_t level, const char *file, int
line, const char *message,
- void *ctx);
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);
Napi::Value CreateReader(const Napi::CallbackInfo &info);
diff --git a/src/Consumer.cc b/src/Consumer.cc
index f5c3e04..328da89 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -22,10 +22,12 @@
#include "Message.h"
#include "MessageId.h"
#include "ThreadSafeDeferred.h"
+#include "LogUtils.h"
#include <pulsar/c/result.h>
#include <atomic>
#include <thread>
#include <future>
+#include <sstream>
Napi::FunctionReference Consumer::constructor;
@@ -63,6 +65,13 @@ struct MessageListenerProxyData {
: cMessage(cMessage), consumer(consumer), callback(callback) {}
};
+inline void logMessageListenerError(Consumer *consumer, const char *err) {
+ std::ostringstream ss;
+ ss << "[" << consumer->GetTopic() << "][" << consumer->GetSubscriptionName()
+ << "] Message listener error in processing message: " << err;
+ LOG_ERROR(ss.str().c_str());
+}
+
void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback,
MessageListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Consumer *consumer = data->consumer;
@@ -70,17 +79,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function
jsCallback, MessageListe
// `consumer` might be null in certain cases, segmentation fault might
happend without this null check. We
// need to handle this rare case in future.
if (consumer) {
- Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
+ Napi::Value ret;
+ try {
+ ret = jsCallback.Call({msg, consumer->Value()});
+ } catch (std::exception &exception) {
+ logMessageListenerError(consumer, exception.what());
+ }
+
if (ret.IsPromise()) {
Napi::Promise promise = ret.As<Napi::Promise>();
- Napi::Value thenValue = promise.Get("then");
- if (thenValue.IsFunction()) {
- Napi::Function then = thenValue.As<Napi::Function>();
- Napi::Function callback =
- Napi::Function::New(env, [data](const Napi::CallbackInfo &info) {
data->callback(); });
- then.Call(promise, {callback});
- return;
- }
+ Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
+
+ ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const
Napi::CallbackInfo &info) {
+ Napi::Error error = info[0].As<Napi::Error>();
+ logMessageListenerError(consumer, error.what());
+ })});
+
+ promise = ret.As<Napi::Promise>();
+ Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
+
+ finallyFunc.Call(
+ promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo
&info) { data->callback(); })});
+ return;
}
}
data->callback();
@@ -227,6 +247,12 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo
&info, std::shared_pt
return deferred->Promise();
}
+std::string Consumer::GetTopic() { return
{pulsar_consumer_get_topic(this->cConsumer.get())}; }
+
+std::string Consumer::GetSubscriptionName() {
+ return {pulsar_consumer_get_subscription_name(this->cConsumer.get())};
+}
+
// We still need a receive worker because the c api is missing the equivalent
async definition
class ConsumerReceiveWorker : public Napi::AsyncWorker {
public:
diff --git a/src/Consumer.h b/src/Consumer.h
index 731ec97..1574fab 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -36,6 +36,8 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
void SetListenerCallback(MessageListenerCallback *listener);
void Cleanup();
void CleanupListener();
+ std::string GetTopic();
+ std::string GetSubscriptionName();
private:
std::shared_ptr<pulsar_consumer_t> cConsumer;
diff --git a/src/LogUtils.h b/src/LogUtils.h
new file mode 100644
index 0000000..6e3180e
--- /dev/null
+++ b/src/LogUtils.h
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PULSAR_CLIENT_NODE_LOGUTILS_H
+#define PULSAR_CLIENT_NODE_LOGUTILS_H
+
+#include "Client.h"
+
+#define LOG(level, message) Client::LogMessage(pulsar_logger_level_t::level,
__FILE__, __LINE__, message, 0)
+
+#define LOG_DEBUG(message) LOG(pulsar_DEBUG, message)
+#define LOG_INFO(message) LOG(pulsar_INFO, message)
+#define LOG_WARN(message) LOG(pulsar_WARN, message)
+#define LOG_ERROR(message) LOG(pulsar_ERROR, message)
+
+#endif // PULSAR_CLIENT_NODE_LOGUTILS_H
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 7a5621d..dc382cf 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -386,6 +386,66 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('Message Listener error handling', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ });
+ let syncFinsh;
+ const syncPromise = new Promise((resolve) => {
+ syncFinsh = resolve;
+ });
+ let asyncFinsh;
+ const asyncPromise = new Promise((resolve) => {
+ asyncFinsh = resolve;
+ });
+ Pulsar.Client.setLogHandler((level, file, line, message) => {
+ if (level === 3) { // should be error level
+ if (message.includes('consumer1 callback expected error')) {
+ syncFinsh();
+ }
+ if (message.includes('consumer2 callback expected error')) {
+ asyncFinsh();
+ }
+ }
+ });
+
+ const topic = 'test-error-listener';
+ const producer = await client.createProducer({
+ topic,
+ batchingEnabled: false,
+ });
+
+ await producer.send('test-message');
+
+ const consumer1 = await client.subscribe({
+ topic,
+ subscription: 'sync',
+ subscriptionType: 'Shared',
+ subscriptionInitialPosition: 'Earliest',
+ listener: (message, messageConsumer) => {
+ throw new Error('consumer1 callback expected error');
+ },
+ });
+
+ const consumer2 = await client.subscribe({
+ topic,
+ subscription: 'async',
+ subscriptionType: 'Shared',
+ subscriptionInitialPosition: 'Earliest',
+ listener: async (message, messageConsumer) => {
+ throw new Error('consumer2 callback expected error');
+ },
+ });
+
+ await syncPromise;
+ await asyncPromise;
+
+ await consumer1.close();
+ await consumer2.close();
+ await producer.close();
+ await client.close();
+ });
+
test('acknowledgeCumulative', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',