This is an automated email from the ASF dual-hosted git repository. massakam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit 196a69c30e238e1438f4964b6643bfc4b56ed903 Author: yfuruta <[email protected]> AuthorDate: Tue Mar 12 10:48:30 2019 +0900 change 4 to 2 spaces and reformat --- .clang-format | 2 +- src/Client.cc | 194 +++++++++++++++++++------------------- src/Client.h | 20 ++-- src/Consumer.cc | 243 ++++++++++++++++++++++++------------------------ src/Consumer.h | 26 +++--- src/ConsumerConfig.cc | 89 +++++++++--------- src/ConsumerConfig.h | 24 ++--- src/Message.cc | 252 +++++++++++++++++++++++++------------------------- src/Message.h | 60 ++++++------ src/MessageId.cc | 64 ++++++------- src/MessageId.h | 28 +++--- src/Producer.cc | 194 +++++++++++++++++++------------------- src/Producer.h | 22 ++--- src/ProducerConfig.cc | 181 ++++++++++++++++++------------------ src/ProducerConfig.h | 16 ++-- src/addon.cc | 10 +- 16 files changed, 707 insertions(+), 718 deletions(-) diff --git a/.clang-format b/.clang-format index cb40b50..f2d174b 100644 --- a/.clang-format +++ b/.clang-format @@ -17,7 +17,7 @@ BasedOnStyle: Google -IndentWidth: 4 +IndentWidth: 2 ColumnLimit: 110 SortIncludes: false BreakBeforeBraces: Custom diff --git a/src/Client.cc b/src/Client.cc index 97fc573..16fb13d 100644 --- a/src/Client.cc +++ b/src/Client.cc @@ -38,133 +38,133 @@ static const std::string CFG_STATS_INTERVAL = "statsIntervalInSeconds"; Napi::FunctionReference Client::constructor; Napi::Object Client::Init(Napi::Env env, Napi::Object exports) { - Napi::HandleScope scope(env); + Napi::HandleScope scope(env); - Napi::Function func = DefineClass( - env, "Client", - {InstanceMethod("createProducer", &Client::CreateProducer), - InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)}); + Napi::Function func = + DefineClass(env, "Client", + {InstanceMethod("createProducer", &Client::CreateProducer), + InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)}); - constructor = Napi::Persistent(func); - constructor.SuppressDestruct(); + constructor = Napi::Persistent(func); + constructor.SuppressDestruct(); - exports.Set("Client", func); - return exports; + exports.Set("Client", func); + return exports; } Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info) { - Napi::Env env = info.Env(); - Napi::HandleScope scope(env); - Napi::Object clientConfig = info[0].As<Napi::Object>(); - - if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString()) { - if (clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) { - Napi::Error::New(env, "Service URL is required and must be specified as a string") - .ThrowAsJavaScriptException(); - return; - } + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + Napi::Object clientConfig = info[0].As<Napi::Object>(); + + if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString()) { + if (clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) { + Napi::Error::New(env, "Service URL is required and must be specified as a string") + .ThrowAsJavaScriptException(); + return; } - Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString(); + } + Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString(); - pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create(); + pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create(); - if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) { - int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value(); - if (operationTimeoutSeconds > 0) { - pulsar_client_configuration_set_operation_timeout_seconds(cClientConfig, operationTimeoutSeconds); - } + if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) { + int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value(); + if (operationTimeoutSeconds > 0) { + pulsar_client_configuration_set_operation_timeout_seconds(cClientConfig, operationTimeoutSeconds); } + } - if (clientConfig.Has(CFG_IO_THREADS) && clientConfig.Get(CFG_IO_THREADS).IsNumber()) { - int32_t ioThreads = clientConfig.Get(CFG_IO_THREADS).ToNumber().Int32Value(); - if (ioThreads > 0) { - pulsar_client_configuration_set_io_threads(cClientConfig, ioThreads); - } + if (clientConfig.Has(CFG_IO_THREADS) && clientConfig.Get(CFG_IO_THREADS).IsNumber()) { + int32_t ioThreads = clientConfig.Get(CFG_IO_THREADS).ToNumber().Int32Value(); + if (ioThreads > 0) { + pulsar_client_configuration_set_io_threads(cClientConfig, ioThreads); } + } - if (clientConfig.Has(CFG_LISTENER_THREADS) && clientConfig.Get(CFG_LISTENER_THREADS).IsNumber()) { - int32_t messageListenerThreads = clientConfig.Get(CFG_LISTENER_THREADS).ToNumber().Int32Value(); - if (messageListenerThreads > 0) { - pulsar_client_configuration_set_message_listener_threads(cClientConfig, messageListenerThreads); - } + if (clientConfig.Has(CFG_LISTENER_THREADS) && clientConfig.Get(CFG_LISTENER_THREADS).IsNumber()) { + int32_t messageListenerThreads = clientConfig.Get(CFG_LISTENER_THREADS).ToNumber().Int32Value(); + if (messageListenerThreads > 0) { + pulsar_client_configuration_set_message_listener_threads(cClientConfig, messageListenerThreads); } + } - if (clientConfig.Has(CFG_CONCURRENT_LOOKUP) && clientConfig.Get(CFG_CONCURRENT_LOOKUP).IsNumber()) { - int32_t concurrentLookupRequest = clientConfig.Get(CFG_CONCURRENT_LOOKUP).ToNumber().Int32Value(); - if (concurrentLookupRequest > 0) { - pulsar_client_configuration_set_concurrent_lookup_request(cClientConfig, concurrentLookupRequest); - } + if (clientConfig.Has(CFG_CONCURRENT_LOOKUP) && clientConfig.Get(CFG_CONCURRENT_LOOKUP).IsNumber()) { + int32_t concurrentLookupRequest = clientConfig.Get(CFG_CONCURRENT_LOOKUP).ToNumber().Int32Value(); + if (concurrentLookupRequest > 0) { + pulsar_client_configuration_set_concurrent_lookup_request(cClientConfig, concurrentLookupRequest); } - - if (clientConfig.Has(CFG_USE_TLS) && clientConfig.Get(CFG_USE_TLS).IsBoolean()) { - Napi::Boolean useTls = clientConfig.Get(CFG_USE_TLS).ToBoolean(); - pulsar_client_configuration_set_use_tls(cClientConfig, useTls.Value()); - } - - if (clientConfig.Has(CFG_TLS_TRUST_CERT) && clientConfig.Get(CFG_TLS_TRUST_CERT).IsString()) { - Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString(); - pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig, - tlsTrustCertsFilePath.Utf8Value().c_str()); - } - - if (clientConfig.Has(CFG_TLS_VALIDATE_HOSTNAME) && - clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).IsBoolean()) { - Napi::Boolean tlsValidateHostname = clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).ToBoolean(); - pulsar_client_configuration_set_validate_hostname(cClientConfig, tlsValidateHostname.Value()); - } - - if (clientConfig.Has(CFG_TLS_ALLOW_INSECURE) && clientConfig.Get(CFG_TLS_ALLOW_INSECURE).IsBoolean()) { - Napi::Boolean tlsAllowInsecureConnection = clientConfig.Get(CFG_TLS_ALLOW_INSECURE).ToBoolean(); - pulsar_client_configuration_set_tls_allow_insecure_connection(cClientConfig, - tlsAllowInsecureConnection.Value()); - } - - if (clientConfig.Has(CFG_STATS_INTERVAL) && clientConfig.Get(CFG_STATS_INTERVAL).IsNumber()) { - uint32_t statsIntervalInSeconds = clientConfig.Get(CFG_STATS_INTERVAL).ToNumber().Uint32Value(); - if (statsIntervalInSeconds > 0) { - pulsar_client_configuration_set_stats_interval_in_seconds(cClientConfig, statsIntervalInSeconds); - } + } + + if (clientConfig.Has(CFG_USE_TLS) && clientConfig.Get(CFG_USE_TLS).IsBoolean()) { + Napi::Boolean useTls = clientConfig.Get(CFG_USE_TLS).ToBoolean(); + pulsar_client_configuration_set_use_tls(cClientConfig, useTls.Value()); + } + + if (clientConfig.Has(CFG_TLS_TRUST_CERT) && clientConfig.Get(CFG_TLS_TRUST_CERT).IsString()) { + Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString(); + pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig, + tlsTrustCertsFilePath.Utf8Value().c_str()); + } + + if (clientConfig.Has(CFG_TLS_VALIDATE_HOSTNAME) && + clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).IsBoolean()) { + Napi::Boolean tlsValidateHostname = clientConfig.Get(CFG_TLS_VALIDATE_HOSTNAME).ToBoolean(); + pulsar_client_configuration_set_validate_hostname(cClientConfig, tlsValidateHostname.Value()); + } + + if (clientConfig.Has(CFG_TLS_ALLOW_INSECURE) && clientConfig.Get(CFG_TLS_ALLOW_INSECURE).IsBoolean()) { + Napi::Boolean tlsAllowInsecureConnection = clientConfig.Get(CFG_TLS_ALLOW_INSECURE).ToBoolean(); + pulsar_client_configuration_set_tls_allow_insecure_connection(cClientConfig, + tlsAllowInsecureConnection.Value()); + } + + if (clientConfig.Has(CFG_STATS_INTERVAL) && clientConfig.Get(CFG_STATS_INTERVAL).IsNumber()) { + uint32_t statsIntervalInSeconds = clientConfig.Get(CFG_STATS_INTERVAL).ToNumber().Uint32Value(); + if (statsIntervalInSeconds > 0) { + pulsar_client_configuration_set_stats_interval_in_seconds(cClientConfig, statsIntervalInSeconds); } + } - this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig); - pulsar_client_configuration_free(cClientConfig); + this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig); + pulsar_client_configuration_free(cClientConfig); } Client::~Client() { pulsar_client_free(this->cClient); } Napi::Value Client::CreateProducer(const Napi::CallbackInfo &info) { - return Producer::NewInstance(info, this->cClient); + return Producer::NewInstance(info, this->cClient); } Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) { - return Consumer::NewInstance(info, this->cClient); + return Consumer::NewInstance(info, this->cClient); } class ClientCloseWorker : public Napi::AsyncWorker { - public: - ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cClient(cClient) {} - ~ClientCloseWorker() {} - void Execute() { - pulsar_result result = pulsar_client_close(this->cClient); - if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); - } - void OnOK() { this->deferred.Resolve(Env().Null()); } - void OnError(const Napi::Error &e) { - this->deferred.Reject( - Napi::Error::New(Env(), std::string("Failed to close client: ") + e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_client_t *cClient; + public: + ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cClient(cClient) {} + ~ClientCloseWorker() {} + void Execute() { + pulsar_result result = pulsar_client_close(this->cClient); + if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); + } + void OnOK() { this->deferred.Resolve(Env().Null()); } + void OnError(const Napi::Error &e) { + this->deferred.Reject( + Napi::Error::New(Env(), std::string("Failed to close client: ") + e.Message()).Value()); + } + + private: + Napi::Promise::Deferred deferred; + pulsar_client_t *cClient; }; Napi::Value Client::Close(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ClientCloseWorker *wk = new ClientCloseWorker(deferred, this->cClient); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ClientCloseWorker *wk = new ClientCloseWorker(deferred, this->cClient); + wk->Queue(); + return deferred.Promise(); } diff --git a/src/Client.h b/src/Client.h index bc92125..63c66f2 100644 --- a/src/Client.h +++ b/src/Client.h @@ -24,18 +24,18 @@ #include <pulsar/c/client.h> class Client : public Napi::ObjectWrap<Client> { - public: - static Napi::Object Init(Napi::Env env, Napi::Object exports); - Client(const Napi::CallbackInfo &info); - ~Client(); + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + Client(const Napi::CallbackInfo &info); + ~Client(); - private: - static Napi::FunctionReference constructor; - pulsar_client_t *cClient; + private: + static Napi::FunctionReference constructor; + pulsar_client_t *cClient; - Napi::Value CreateProducer(const Napi::CallbackInfo &info); - Napi::Value Subscribe(const Napi::CallbackInfo &info); - Napi::Value Close(const Napi::CallbackInfo &info); + Napi::Value CreateProducer(const Napi::CallbackInfo &info); + Napi::Value Subscribe(const Napi::CallbackInfo &info); + Napi::Value Close(const Napi::CallbackInfo &info); }; #endif diff --git a/src/Consumer.cc b/src/Consumer.cc index 85e0fbf..db6fab4 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -26,18 +26,18 @@ Napi::FunctionReference Consumer::constructor; void Consumer::Init(Napi::Env env, Napi::Object exports) { - Napi::HandleScope scope(env); - - Napi::Function func = DefineClass(env, "Consumer", - { - InstanceMethod("receive", &Consumer::Receive), - InstanceMethod("acknowledge", &Consumer::Acknowledge), - InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), - InstanceMethod("close", &Consumer::Close), - }); - - constructor = Napi::Persistent(func); - constructor.SuppressDestruct(); + Napi::HandleScope scope(env); + + Napi::Function func = DefineClass(env, "Consumer", + { + InstanceMethod("receive", &Consumer::Receive), + InstanceMethod("acknowledge", &Consumer::Acknowledge), + InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), + InstanceMethod("close", &Consumer::Close), + }); + + constructor = Napi::Persistent(func); + constructor.SuppressDestruct(); } void Consumer::SetCConsumer(pulsar_consumer_t *cConsumer) { this->cConsumer = cConsumer; } @@ -45,144 +45,139 @@ void Consumer::SetCConsumer(pulsar_consumer_t *cConsumer) { this->cConsumer = cC Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info) {} class ConsumerNewInstanceWorker : public Napi::AsyncWorker { - public: - ConsumerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient, - ConsumerConfig *consumerConfig) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cClient(cClient), - consumerConfig(consumerConfig) {} - ~ConsumerNewInstanceWorker() {} - void Execute() { - const std::string &topic = this->consumerConfig->GetTopic(); - if (topic.empty()) { - SetError( - std::string("Topic is required and must be specified as a string when creating consumer")); - return; - } - const std::string &subscription = this->consumerConfig->GetSubscription(); - if (subscription.empty()) { - SetError(std::string( - "Subscription is required and must be specified as a string when creating consumer")); - return; - } - int32_t ackTimeoutMs = this->consumerConfig->GetAckTimeoutMs(); - if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) { - std::string msg("Ack timeout should be 0 or greater than or equal to " + - std::to_string(MIN_ACK_TIMEOUT_MILLIS)); - SetError(msg); - return; - } - - pulsar_result result = - pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(), - this->consumerConfig->GetCConsumerConfig(), &(this->cConsumer)); - delete this->consumerConfig; - if (result != pulsar_result_Ok) { - SetError(std::string("Failed to create consumer: ") + pulsar_result_str(result)); - return; - } + public: + ConsumerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient, + ConsumerConfig *consumerConfig) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cClient(cClient), + consumerConfig(consumerConfig) {} + ~ConsumerNewInstanceWorker() {} + void Execute() { + const std::string &topic = this->consumerConfig->GetTopic(); + if (topic.empty()) { + SetError(std::string("Topic is required and must be specified as a string when creating consumer")); + return; } - void OnOK() { - Napi::Object obj = Consumer::constructor.New({}); - Consumer *consumer = Consumer::Unwrap(obj); - consumer->SetCConsumer(this->cConsumer); - this->deferred.Resolve(obj); + const std::string &subscription = this->consumerConfig->GetSubscription(); + if (subscription.empty()) { + SetError( + std::string("Subscription is required and must be specified as a string when creating consumer")); + return; } - void OnError(const Napi::Error &e) { - this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); + int32_t ackTimeoutMs = this->consumerConfig->GetAckTimeoutMs(); + if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) { + std::string msg("Ack timeout should be 0 or greater than or equal to " + + std::to_string(MIN_ACK_TIMEOUT_MILLIS)); + SetError(msg); + return; } - private: - Napi::Promise::Deferred deferred; - pulsar_client_t *cClient; - ConsumerConfig *consumerConfig; - pulsar_consumer_t *cConsumer; + pulsar_result result = + pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(), + this->consumerConfig->GetCConsumerConfig(), &(this->cConsumer)); + delete this->consumerConfig; + if (result != pulsar_result_Ok) { + SetError(std::string("Failed to create consumer: ") + pulsar_result_str(result)); + return; + } + } + void OnOK() { + Napi::Object obj = Consumer::constructor.New({}); + Consumer *consumer = Consumer::Unwrap(obj); + consumer->SetCConsumer(this->cConsumer); + this->deferred.Resolve(obj); + } + void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); } + + private: + Napi::Promise::Deferred deferred; + pulsar_client_t *cClient; + ConsumerConfig *consumerConfig; + pulsar_consumer_t *cConsumer; }; Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - Napi::Object config = info[0].As<Napi::Object>(); - ConsumerConfig *consumerConfig = new ConsumerConfig(config); - ConsumerNewInstanceWorker *wk = new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + Napi::Object config = info[0].As<Napi::Object>(); + ConsumerConfig *consumerConfig = new ConsumerConfig(config); + ConsumerNewInstanceWorker *wk = new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig); + wk->Queue(); + return deferred.Promise(); } class ConsumerReceiveWorker : public Napi::AsyncWorker { - public: - ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cConsumer(cConsumer) {} - ~ConsumerReceiveWorker() {} - void Execute() { - pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage)); - - if (result != pulsar_result_Ok) { - SetError(std::string("Failed to received message ") + pulsar_result_str(result)); - } - } - void OnOK() { - Napi::Object obj = Message::NewInstance({}, this->cMessage); - this->deferred.Resolve(obj); + public: + ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cConsumer(cConsumer) {} + ~ConsumerReceiveWorker() {} + void Execute() { + pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage)); + + if (result != pulsar_result_Ok) { + SetError(std::string("Failed to received message ") + pulsar_result_str(result)); } - void OnError(const Napi::Error &e) { - this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_consumer_t *cConsumer; - pulsar_message_t *cMessage; + } + void OnOK() { + Napi::Object obj = Message::NewInstance({}, this->cMessage); + this->deferred.Resolve(obj); + } + void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); } + + private: + Napi::Promise::Deferred deferred; + pulsar_consumer_t *cConsumer; + pulsar_message_t *cMessage; }; Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer); + wk->Queue(); + return deferred.Promise(); } void Consumer::Acknowledge(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - Message *msg = Message::Unwrap(obj); - pulsar_consumer_acknowledge_async(this->cConsumer, msg->GetCMessage(), NULL, NULL); + Napi::Object obj = info[0].As<Napi::Object>(); + Message *msg = Message::Unwrap(obj); + pulsar_consumer_acknowledge_async(this->cConsumer, msg->GetCMessage(), NULL, NULL); } void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - MessageId *msgId = MessageId::Unwrap(obj); - pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); + Napi::Object obj = info[0].As<Napi::Object>(); + MessageId *msgId = MessageId::Unwrap(obj); + pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); } class ConsumerCloseWorker : public Napi::AsyncWorker { - public: - ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cConsumer(cConsumer) {} - ~ConsumerCloseWorker() {} - void Execute() { - pulsar_result result = pulsar_consumer_close(this->cConsumer); - if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); - } - void OnOK() { this->deferred.Resolve(Env().Null()); } - void OnError(const Napi::Error &e) { - this->deferred.Reject( - Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_consumer_t *cConsumer; + public: + ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cConsumer(cConsumer) {} + ~ConsumerCloseWorker() {} + void Execute() { + pulsar_result result = pulsar_consumer_close(this->cConsumer); + if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); + } + void OnOK() { this->deferred.Resolve(Env().Null()); } + void OnError(const Napi::Error &e) { + this->deferred.Reject( + Napi::Error::New(Env(), std::string("Failed to close consumer: ") + e.Message()).Value()); + } + + private: + Napi::Promise::Deferred deferred; + pulsar_consumer_t *cConsumer; }; Napi::Value Consumer::Close(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->cConsumer); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->cConsumer); + wk->Queue(); + return deferred.Promise(); } Consumer::~Consumer() { pulsar_consumer_free(this->cConsumer); } diff --git a/src/Consumer.h b/src/Consumer.h index 8e77df3..fe68124 100644 --- a/src/Consumer.h +++ b/src/Consumer.h @@ -24,21 +24,21 @@ #include <pulsar/c/client.h> class Consumer : public Napi::ObjectWrap<Consumer> { - public: - static void Init(Napi::Env env, Napi::Object exports); - static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient); - static Napi::FunctionReference constructor; - Consumer(const Napi::CallbackInfo &info); - ~Consumer(); - void SetCConsumer(pulsar_consumer_t *cConsumer); + public: + static void Init(Napi::Env env, Napi::Object exports); + static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient); + static Napi::FunctionReference constructor; + Consumer(const Napi::CallbackInfo &info); + ~Consumer(); + void SetCConsumer(pulsar_consumer_t *cConsumer); - private: - pulsar_consumer_t *cConsumer; + private: + pulsar_consumer_t *cConsumer; - Napi::Value Receive(const Napi::CallbackInfo &info); - void Acknowledge(const Napi::CallbackInfo &info); - void AcknowledgeId(const Napi::CallbackInfo &info); - Napi::Value Close(const Napi::CallbackInfo &info); + Napi::Value Receive(const Napi::CallbackInfo &info); + void Acknowledge(const Napi::CallbackInfo &info); + void AcknowledgeId(const Napi::CallbackInfo &info); + Napi::Value Close(const Napi::CallbackInfo &info); }; #endif diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index 78f1605..a98bb69 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -37,64 +37,63 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = { ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig) : topic(""), subscription(""), ackTimeoutMs(0) { - this->cConsumerConfig = pulsar_consumer_configuration_create(); + this->cConsumerConfig = pulsar_consumer_configuration_create(); - if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) { - this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value(); - } + if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) { + this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value(); + } - if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) { - this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value(); - } + if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) { + this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value(); + } - if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) { - std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value(); - if (SUBSCRIPTION_TYPE.count(subscriptionType)) { - pulsar_consumer_configuration_set_consumer_type(this->cConsumerConfig, - SUBSCRIPTION_TYPE.at(subscriptionType)); - } + if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) { + std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value(); + if (SUBSCRIPTION_TYPE.count(subscriptionType)) { + pulsar_consumer_configuration_set_consumer_type(this->cConsumerConfig, + SUBSCRIPTION_TYPE.at(subscriptionType)); } + } - if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) { - std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value(); - if (!consumerName.empty()) - pulsar_consumer_set_consumer_name(this->cConsumerConfig, consumerName.c_str()); - } + if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) { + std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value(); + if (!consumerName.empty()) pulsar_consumer_set_consumer_name(this->cConsumerConfig, consumerName.c_str()); + } - if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) { - this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value(); - if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) { - pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig, this->ackTimeoutMs); - } + if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) { + this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value(); + if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) { + pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig, this->ackTimeoutMs); } + } - if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) { - int32_t receiverQueueSize = consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value(); - if (receiverQueueSize >= 0) { - pulsar_consumer_configuration_set_receiver_queue_size(this->cConsumerConfig, receiverQueueSize); - } + if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) { + int32_t receiverQueueSize = consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value(); + if (receiverQueueSize >= 0) { + pulsar_consumer_configuration_set_receiver_queue_size(this->cConsumerConfig, receiverQueueSize); } + } - if (consumerConfig.Has(CFG_RECV_QUEUE_ACROSS_PARTITIONS) && - consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).IsNumber()) { - int32_t receiverQueueSizeAcrossPartitions = - consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).ToNumber().Int32Value(); - if (receiverQueueSizeAcrossPartitions >= 0) { - pulsar_consumer_set_max_total_receiver_queue_size_across_partitions( - this->cConsumerConfig, receiverQueueSizeAcrossPartitions); - } + if (consumerConfig.Has(CFG_RECV_QUEUE_ACROSS_PARTITIONS) && + consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).IsNumber()) { + int32_t receiverQueueSizeAcrossPartitions = + consumerConfig.Get(CFG_RECV_QUEUE_ACROSS_PARTITIONS).ToNumber().Int32Value(); + if (receiverQueueSizeAcrossPartitions >= 0) { + pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(this->cConsumerConfig, + receiverQueueSizeAcrossPartitions); } + } - if (consumerConfig.Has(CFG_PROPS) && consumerConfig.Get(CFG_PROPS).IsObject()) { - Napi::Object propObj = consumerConfig.Get(CFG_PROPS).ToObject(); - Napi::Array arr = propObj.GetPropertyNames(); - int size = arr.Length(); - for (int i = 0; i < size; i++) { - std::string key = arr.Get(i).ToString().Utf8Value(); - std::string value = propObj.Get(key).ToString().Utf8Value(); - pulsar_consumer_configuration_set_property(this->cConsumerConfig, key.c_str(), value.c_str()); - } + if (consumerConfig.Has(CFG_PROPS) && consumerConfig.Get(CFG_PROPS).IsObject()) { + Napi::Object propObj = consumerConfig.Get(CFG_PROPS).ToObject(); + Napi::Array arr = propObj.GetPropertyNames(); + int size = arr.Length(); + for (int i = 0; i < size; i++) { + std::string key = arr.Get(i).ToString().Utf8Value(); + std::string value = propObj.Get(key).ToString().Utf8Value(); + pulsar_consumer_configuration_set_property(this->cConsumerConfig, key.c_str(), value.c_str()); } + } } ConsumerConfig::~ConsumerConfig() { pulsar_consumer_configuration_free(this->cConsumerConfig); } diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h index 00c890e..7070bf9 100644 --- a/src/ConsumerConfig.h +++ b/src/ConsumerConfig.h @@ -26,19 +26,19 @@ #define MIN_ACK_TIMEOUT_MILLIS 10000 class ConsumerConfig { - public: - ConsumerConfig(const Napi::Object &consumerConfig); - ~ConsumerConfig(); - pulsar_consumer_configuration_t *GetCConsumerConfig(); - std::string GetTopic(); - std::string GetSubscription(); - int64_t GetAckTimeoutMs(); + public: + ConsumerConfig(const Napi::Object &consumerConfig); + ~ConsumerConfig(); + pulsar_consumer_configuration_t *GetCConsumerConfig(); + std::string GetTopic(); + std::string GetSubscription(); + int64_t GetAckTimeoutMs(); - private: - pulsar_consumer_configuration_t *cConsumerConfig; - std::string topic; - std::string subscription; - int64_t ackTimeoutMs; + private: + pulsar_consumer_configuration_t *cConsumerConfig; + std::string topic; + std::string subscription; + int64_t ackTimeoutMs; }; #endif diff --git a/src/Message.cc b/src/Message.cc index 9a0129e..3c527d5 100644 --- a/src/Message.cc +++ b/src/Message.cc @@ -31,29 +31,29 @@ static const std::string CFG_REPL_CLUSTERS = "replicationClusters"; Napi::FunctionReference Message::constructor; Napi::Object Message::Init(Napi::Env env, Napi::Object exports) { - Napi::HandleScope scope(env); - - Napi::Function func = DefineClass( - env, "Message", - {InstanceMethod("getTopicName", &Message::GetTopicName), - InstanceMethod("getProperties", &Message::GetProperties), - InstanceMethod("getData", &Message::GetData), InstanceMethod("getMessageId", &Message::GetMessageId), - InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp), - InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp), - InstanceMethod("getPartitionKey", &Message::GetPartitionKey)}); - - constructor = Napi::Persistent(func); - constructor.SuppressDestruct(); - - exports.Set("Message", func); - return exports; + Napi::HandleScope scope(env); + + Napi::Function func = DefineClass( + env, "Message", + {InstanceMethod("getTopicName", &Message::GetTopicName), + InstanceMethod("getProperties", &Message::GetProperties), InstanceMethod("getData", &Message::GetData), + InstanceMethod("getMessageId", &Message::GetMessageId), + InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp), + InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp), + InstanceMethod("getPartitionKey", &Message::GetPartitionKey)}); + + constructor = Napi::Persistent(func); + constructor.SuppressDestruct(); + + exports.Set("Message", func); + return exports; } Napi::Object Message::NewInstance(Napi::Value arg, pulsar_message_t *cMessage) { - Napi::Object obj = constructor.New({}); - Message *msg = Unwrap(obj); - msg->cMessage = cMessage; - return obj; + Napi::Object obj = constructor.New({}); + Message *msg = Unwrap(obj); + msg->cMessage = cMessage; + return obj; } Message::Message(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Message>(info), cMessage(nullptr) {} @@ -61,137 +61,137 @@ Message::Message(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Message>(inf pulsar_message_t *Message::GetCMessage() { return this->cMessage; } Napi::Value Message::GetTopicName(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage)); + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return Napi::String::New(env, pulsar_message_get_topic_name(this->cMessage)); } Napi::Value Message::GetProperties(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - Napi::Array arr = Napi::Array::New(env); - pulsar_string_map_t *cProperties = pulsar_message_get_properties(this->cMessage); - int size = pulsar_string_map_size(cProperties); - for (int i = 0; i < size; i++) { - arr.Set(pulsar_string_map_get_key(cProperties, i), pulsar_string_map_get_value(cProperties, i)); - } - return arr; + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + Napi::Array arr = Napi::Array::New(env); + pulsar_string_map_t *cProperties = pulsar_message_get_properties(this->cMessage); + int size = pulsar_string_map_size(cProperties); + for (int i = 0; i < size; i++) { + arr.Set(pulsar_string_map_get_key(cProperties, i), pulsar_string_map_get_value(cProperties, i)); + } + return arr; } Napi::Value Message::GetData(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - void *data = const_cast<void *>(pulsar_message_get_data(this->cMessage)); - size_t size = (size_t)pulsar_message_get_length(this->cMessage); - return Napi::Buffer<char>::New(env, (char *)data, size); + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + void *data = const_cast<void *>(pulsar_message_get_data(this->cMessage)); + size_t size = (size_t)pulsar_message_get_length(this->cMessage); + return Napi::Buffer<char>::New(env, (char *)data, size); } Napi::Value Message::GetMessageId(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - return MessageId::NewInstanceFromMessage(info, this->cMessage); + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return MessageId::NewInstanceFromMessage(info, this->cMessage); } Napi::Value Message::GetEventTimestamp(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - return Napi::Number::New(env, pulsar_message_get_event_timestamp(this->cMessage)); + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return Napi::Number::New(env, pulsar_message_get_event_timestamp(this->cMessage)); } Napi::Value Message::GetPublishTimestamp(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - return Napi::Number::New(env, pulsar_message_get_publish_timestamp(this->cMessage)); + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return Napi::Number::New(env, pulsar_message_get_publish_timestamp(this->cMessage)); } Napi::Value Message::GetPartitionKey(const Napi::CallbackInfo &info) { - Napi::Env env = info.Env(); - if (!ValidateCMessage(env)) { - return env.Null(); - } - return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage)); + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage)); } bool Message::ValidateCMessage(Napi::Env env) { - if (this->cMessage) { - return true; - } else { - Napi::Error::New(env, "Message has not been built").ThrowAsJavaScriptException(); - return false; - } + if (this->cMessage) { + return true; + } else { + Napi::Error::New(env, "Message has not been built").ThrowAsJavaScriptException(); + return false; + } } pulsar_message_t *Message::BuildMessage(Napi::Object conf) { - pulsar_message_t *cMessage = pulsar_message_create(); - - if (conf.Has(CFG_DATA) && conf.Get(CFG_DATA).IsBuffer()) { - Napi::Buffer<char> buf = conf.Get(CFG_DATA).As<Napi::Buffer<char>>(); - char *data = buf.Data(); - pulsar_message_set_content(cMessage, data, buf.Length()); - } - - if (conf.Has(CFG_PROPS) && conf.Get(CFG_PROPS).IsObject()) { - Napi::Object propObj = conf.Get(CFG_PROPS).ToObject(); - Napi::Array arr = propObj.GetPropertyNames(); - int size = arr.Length(); - for (int i = 0; i < size; i++) { - Napi::String key = arr.Get(i).ToString(); - Napi::String value = propObj.Get(key).ToString(); - pulsar_message_set_property(cMessage, key.Utf8Value().c_str(), value.Utf8Value().c_str()); - } - } - - if (conf.Has(CFG_EVENT_TIME) && conf.Get(CFG_EVENT_TIME).IsNumber()) { - int64_t eventTimestamp = conf.Get(CFG_EVENT_TIME).ToNumber().Int64Value(); - if (eventTimestamp >= 0) { - pulsar_message_set_event_timestamp(cMessage, eventTimestamp); - } - } - - if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsNumber()) { - Napi::Number sequenceId = conf.Get(CFG_SEQUENCE_ID).ToNumber(); - pulsar_message_set_sequence_id(cMessage, sequenceId.Int64Value()); - } - - if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) { - Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString(); - pulsar_message_set_partition_key(cMessage, partitionKey.Utf8Value().c_str()); - } - - if (conf.Has(CFG_REPL_CLUSTERS) && conf.Get(CFG_REPL_CLUSTERS).IsArray()) { - Napi::Array clusters = conf.Get(CFG_REPL_CLUSTERS).As<Napi::Array>(); - // Empty list means to disable replication - int length = clusters.Length(); - if (length == 0) { - pulsar_message_disable_replication(cMessage, 1); - } else { - char **arr = NewStringArray(length); - for (int i = 0; i < length; i++) { - SetString(arr, clusters.Get(i).ToString().Utf8Value().c_str(), i); - } - // TODO: temoporalily commented out unless 2.3.1 which includes interface change of - // pulsar_message_set_replication_clusters (#3729) is released - // pulsar_message_set_replication_clusters(cMessage, (const char **)arr, length); - FreeStringArray(arr, length); - } - } - return cMessage; + pulsar_message_t *cMessage = pulsar_message_create(); + + if (conf.Has(CFG_DATA) && conf.Get(CFG_DATA).IsBuffer()) { + Napi::Buffer<char> buf = conf.Get(CFG_DATA).As<Napi::Buffer<char>>(); + char *data = buf.Data(); + pulsar_message_set_content(cMessage, data, buf.Length()); + } + + if (conf.Has(CFG_PROPS) && conf.Get(CFG_PROPS).IsObject()) { + Napi::Object propObj = conf.Get(CFG_PROPS).ToObject(); + Napi::Array arr = propObj.GetPropertyNames(); + int size = arr.Length(); + for (int i = 0; i < size; i++) { + Napi::String key = arr.Get(i).ToString(); + Napi::String value = propObj.Get(key).ToString(); + pulsar_message_set_property(cMessage, key.Utf8Value().c_str(), value.Utf8Value().c_str()); + } + } + + if (conf.Has(CFG_EVENT_TIME) && conf.Get(CFG_EVENT_TIME).IsNumber()) { + int64_t eventTimestamp = conf.Get(CFG_EVENT_TIME).ToNumber().Int64Value(); + if (eventTimestamp >= 0) { + pulsar_message_set_event_timestamp(cMessage, eventTimestamp); + } + } + + if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsNumber()) { + Napi::Number sequenceId = conf.Get(CFG_SEQUENCE_ID).ToNumber(); + pulsar_message_set_sequence_id(cMessage, sequenceId.Int64Value()); + } + + if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) { + Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString(); + pulsar_message_set_partition_key(cMessage, partitionKey.Utf8Value().c_str()); + } + + if (conf.Has(CFG_REPL_CLUSTERS) && conf.Get(CFG_REPL_CLUSTERS).IsArray()) { + Napi::Array clusters = conf.Get(CFG_REPL_CLUSTERS).As<Napi::Array>(); + // Empty list means to disable replication + int length = clusters.Length(); + if (length == 0) { + pulsar_message_disable_replication(cMessage, 1); + } else { + char **arr = NewStringArray(length); + for (int i = 0; i < length; i++) { + SetString(arr, clusters.Get(i).ToString().Utf8Value().c_str(), i); + } + // TODO: temoporalily commented out unless 2.3.1 which includes interface change of + // pulsar_message_set_replication_clusters (#3729) is released + // pulsar_message_set_replication_clusters(cMessage, (const char **)arr, length); + FreeStringArray(arr, length); + } + } + return cMessage; } Message::~Message() { - if (this->cMessage != nullptr) { - pulsar_message_free(this->cMessage); - } + if (this->cMessage != nullptr) { + pulsar_message_free(this->cMessage); + } } diff --git a/src/Message.h b/src/Message.h index 8992f9c..42aa9aa 100644 --- a/src/Message.h +++ b/src/Message.h @@ -24,41 +24,41 @@ #include <pulsar/c/message.h> class Message : public Napi::ObjectWrap<Message> { - public: - static Napi::Object Init(Napi::Env env, Napi::Object exports); - static Napi::Object NewInstance(Napi::Value arg, pulsar_message_t *cMessage); - static pulsar_message_t *BuildMessage(Napi::Object conf); - Message(const Napi::CallbackInfo &info); - ~Message(); - pulsar_message_t *GetCMessage(); + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + static Napi::Object NewInstance(Napi::Value arg, pulsar_message_t *cMessage); + static pulsar_message_t *BuildMessage(Napi::Object conf); + Message(const Napi::CallbackInfo &info); + ~Message(); + pulsar_message_t *GetCMessage(); - private: - static Napi::FunctionReference constructor; + private: + static Napi::FunctionReference constructor; - pulsar_message_t *cMessage; + pulsar_message_t *cMessage; - Napi::Value GetTopicName(const Napi::CallbackInfo &info); - Napi::Value GetProperties(const Napi::CallbackInfo &info); - Napi::Value GetData(const Napi::CallbackInfo &info); - Napi::Value GetMessageId(const Napi::CallbackInfo &info); - Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info); - Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info); - Napi::Value GetPartitionKey(const Napi::CallbackInfo &info); - bool ValidateCMessage(Napi::Env env); + Napi::Value GetTopicName(const Napi::CallbackInfo &info); + Napi::Value GetProperties(const Napi::CallbackInfo &info); + Napi::Value GetData(const Napi::CallbackInfo &info); + Napi::Value GetMessageId(const Napi::CallbackInfo &info); + Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info); + Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info); + Napi::Value GetPartitionKey(const Napi::CallbackInfo &info); + bool ValidateCMessage(Napi::Env env); - static char **NewStringArray(int size) { return (char **)calloc(sizeof(char *), size); } - static void SetString(char **array, const char *str, int n) { - char *copied = (char *)calloc(strlen(str) + 1, sizeof(char)); - strcpy(copied, str); - array[n] = copied; - } - static void FreeStringArray(char **array, int size) { - int i; - for (i = 0; i < size; i++) { - free(array[i]); - } - free(array); + static char **NewStringArray(int size) { return (char **)calloc(sizeof(char *), size); } + static void SetString(char **array, const char *str, int n) { + char *copied = (char *)calloc(strlen(str) + 1, sizeof(char)); + strcpy(copied, str); + array[n] = copied; + } + static void FreeStringArray(char **array, int size) { + int i; + for (i = 0; i < size; i++) { + free(array[i]); } + free(array); + } }; #endif diff --git a/src/MessageId.cc b/src/MessageId.cc index 3a95f43..a0b520f 100644 --- a/src/MessageId.cc +++ b/src/MessageId.cc @@ -24,64 +24,64 @@ Napi::FunctionReference MessageId::constructor; Napi::Object MessageId::Init(Napi::Env env, Napi::Object exports) { - Napi::HandleScope scope(env); + Napi::HandleScope scope(env); - Napi::Function func = DefineClass(env, "MessageId", - { - StaticMethod("earliest", &MessageId::Earliest, napi_static), - StaticMethod("latest", &MessageId::Latest, napi_static), - StaticMethod("finalize", &MessageId::Finalize, napi_static), - InstanceMethod("toString", &MessageId::ToString), - }); + Napi::Function func = DefineClass(env, "MessageId", + { + StaticMethod("earliest", &MessageId::Earliest, napi_static), + StaticMethod("latest", &MessageId::Latest, napi_static), + StaticMethod("finalize", &MessageId::Finalize, napi_static), + InstanceMethod("toString", &MessageId::ToString), + }); - constructor = Napi::Persistent(func); - constructor.SuppressDestruct(); + constructor = Napi::Persistent(func); + constructor.SuppressDestruct(); - exports.Set("MessageId", func); - return exports; + exports.Set("MessageId", func); + return exports; } MessageId::MessageId(const Napi::CallbackInfo &info) : Napi::ObjectWrap<MessageId>(info) { - Napi::Env env = info.Env(); - Napi::HandleScope scope(env); + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); } Napi::Object MessageId::NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage) { - Napi::Object obj = NewInstance(info[0]); - MessageId *msgId = Unwrap(obj); - msgId->cMessageId = pulsar_message_get_message_id(cMessage); - return obj; + Napi::Object obj = NewInstance(info[0]); + MessageId *msgId = Unwrap(obj); + msgId->cMessageId = pulsar_message_get_message_id(cMessage); + return obj; } Napi::Object MessageId::NewInstance(Napi::Value arg) { - Napi::Object obj = constructor.New({arg}); - return obj; + Napi::Object obj = constructor.New({arg}); + return obj; } void MessageId::Finalize(const Napi::CallbackInfo &info) { - Napi::Object obj = info[0].As<Napi::Object>(); - MessageId *msgId = Unwrap(obj); - pulsar_message_id_free(msgId->cMessageId); + Napi::Object obj = info[0].As<Napi::Object>(); + MessageId *msgId = Unwrap(obj); + pulsar_message_id_free(msgId->cMessageId); } Napi::Value MessageId::Earliest(const Napi::CallbackInfo &info) { - Napi::Object obj = NewInstance(info[0]); - MessageId *msgId = Unwrap(obj); - msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_earliest(); - return obj; + Napi::Object obj = NewInstance(info[0]); + MessageId *msgId = Unwrap(obj); + msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_earliest(); + return obj; } Napi::Value MessageId::Latest(const Napi::CallbackInfo &info) { - Napi::Object obj = NewInstance(info[0]); - MessageId *msgId = Unwrap(obj); - msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_latest(); - return obj; + Napi::Object obj = NewInstance(info[0]); + MessageId *msgId = Unwrap(obj); + msgId->cMessageId = (pulsar_message_id_t *)pulsar_message_id_latest(); + return obj; } pulsar_message_id_t *MessageId::GetCMessageId() { return this->cMessageId; } Napi::Value MessageId::ToString(const Napi::CallbackInfo &info) { - return Napi::String::New(info.Env(), pulsar_message_id_str(this->cMessageId)); + return Napi::String::New(info.Env(), pulsar_message_id_str(this->cMessageId)); } MessageId::~MessageId() { pulsar_message_id_free(this->cMessageId); } diff --git a/src/MessageId.h b/src/MessageId.h index 761f04b..09c59e2 100644 --- a/src/MessageId.h +++ b/src/MessageId.h @@ -25,22 +25,22 @@ #include <pulsar/c/message_id.h> class MessageId : public Napi::ObjectWrap<MessageId> { - public: - static Napi::Object Init(Napi::Env env, Napi::Object exports); - static Napi::Object NewInstance(Napi::Value arg); - static Napi::Object NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage); - static Napi::Value Earliest(const Napi::CallbackInfo &info); - static Napi::Value Latest(const Napi::CallbackInfo &info); - static void Finalize(const Napi::CallbackInfo &info); - MessageId(const Napi::CallbackInfo &info); - ~MessageId(); - pulsar_message_id_t *GetCMessageId(); + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + static Napi::Object NewInstance(Napi::Value arg); + static Napi::Object NewInstanceFromMessage(const Napi::CallbackInfo &info, pulsar_message_t *cMessage); + static Napi::Value Earliest(const Napi::CallbackInfo &info); + static Napi::Value Latest(const Napi::CallbackInfo &info); + static void Finalize(const Napi::CallbackInfo &info); + MessageId(const Napi::CallbackInfo &info); + ~MessageId(); + pulsar_message_id_t *GetCMessageId(); - private: - static Napi::FunctionReference constructor; - pulsar_message_id_t *cMessageId; + private: + static Napi::FunctionReference constructor; + pulsar_message_id_t *cMessageId; - Napi::Value ToString(const Napi::CallbackInfo &info); + Napi::Value ToString(const Napi::CallbackInfo &info); }; #endif diff --git a/src/Producer.cc b/src/Producer.cc index 9bdb5d4..a19d828 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -25,131 +25,127 @@ Napi::FunctionReference Producer::constructor; void Producer::Init(Napi::Env env, Napi::Object exports) { - Napi::HandleScope scope(env); + Napi::HandleScope scope(env); - Napi::Function func = - DefineClass(env, "Producer", - {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)}); + Napi::Function func = DefineClass( + env, "Producer", {InstanceMethod("send", &Producer::Send), InstanceMethod("close", &Producer::Close)}); - constructor = Napi::Persistent(func); - constructor.SuppressDestruct(); + constructor = Napi::Persistent(func); + constructor.SuppressDestruct(); } void Producer::SetCProducer(pulsar_producer_t *cProducer) { this->cProducer = cProducer; } class ProducerNewInstanceWorker : public Napi::AsyncWorker { - public: - ProducerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient, - ProducerConfig *producerConfig) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cClient(cClient), - producerConfig(producerConfig) {} - ~ProducerNewInstanceWorker() {} - void Execute() { - const std::string &topic = this->producerConfig->GetTopic(); - if (topic.empty()) { - SetError( - std::string("Topic is required and must be specified as a string when creating producer")); - return; - } - - pulsar_result result = pulsar_client_create_producer( - this->cClient, topic.c_str(), this->producerConfig->GetCProducerConfig(), &(this->cProducer)); - delete this->producerConfig; - if (result != pulsar_result_Ok) { - SetError(std::string("Failed to create producer: ") + pulsar_result_str(result)); - return; - } - } - void OnOK() { - Napi::Object obj = Producer::constructor.New({}); - Producer *producer = Producer::Unwrap(obj); - producer->SetCProducer(this->cProducer); - this->deferred.Resolve(obj); - } - void OnError(const Napi::Error &e) { - this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); + public: + ProducerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient, + ProducerConfig *producerConfig) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cClient(cClient), + producerConfig(producerConfig) {} + ~ProducerNewInstanceWorker() {} + void Execute() { + const std::string &topic = this->producerConfig->GetTopic(); + if (topic.empty()) { + SetError(std::string("Topic is required and must be specified as a string when creating producer")); + return; } - private: - Napi::Promise::Deferred deferred; - pulsar_client_t *cClient; - ProducerConfig *producerConfig; - pulsar_producer_t *cProducer; + pulsar_result result = pulsar_client_create_producer( + this->cClient, topic.c_str(), this->producerConfig->GetCProducerConfig(), &(this->cProducer)); + delete this->producerConfig; + if (result != pulsar_result_Ok) { + SetError(std::string("Failed to create producer: ") + pulsar_result_str(result)); + return; + } + } + void OnOK() { + Napi::Object obj = Producer::constructor.New({}); + Producer *producer = Producer::Unwrap(obj); + producer->SetCProducer(this->cProducer); + this->deferred.Resolve(obj); + } + void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); } + + private: + Napi::Promise::Deferred deferred; + pulsar_client_t *cClient; + ProducerConfig *producerConfig; + pulsar_producer_t *cProducer; }; Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - Napi::Object config = info[0].As<Napi::Object>(); - ProducerConfig *producerConfig = new ProducerConfig(config); - ProducerNewInstanceWorker *wk = new ProducerNewInstanceWorker(deferred, cClient, producerConfig); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + Napi::Object config = info[0].As<Napi::Object>(); + ProducerConfig *producerConfig = new ProducerConfig(config); + ProducerNewInstanceWorker *wk = new ProducerNewInstanceWorker(deferred, cClient, producerConfig); + wk->Queue(); + return deferred.Promise(); } Producer::Producer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Producer>(info) {} class ProducerSendWorker : public Napi::AsyncWorker { - public: - ProducerSendWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer, - pulsar_message_t *cMessage) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cProducer(cProducer), - cMessage(cMessage) {} - ~ProducerSendWorker() { pulsar_message_free(this->cMessage); } - void Execute() { - pulsar_result result = pulsar_producer_send(this->cProducer, this->cMessage); - if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); - } - void OnOK() { this->deferred.Resolve(Env().Null()); } - void OnError(const Napi::Error &e) { - this->deferred.Reject( - Napi::Error::New(Env(), std::string("Failed to send message: ") + e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_producer_t *cProducer; - pulsar_message_t *cMessage; + public: + ProducerSendWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer, + pulsar_message_t *cMessage) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cProducer(cProducer), + cMessage(cMessage) {} + ~ProducerSendWorker() { pulsar_message_free(this->cMessage); } + void Execute() { + pulsar_result result = pulsar_producer_send(this->cProducer, this->cMessage); + if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); + } + void OnOK() { this->deferred.Resolve(Env().Null()); } + void OnError(const Napi::Error &e) { + this->deferred.Reject( + Napi::Error::New(Env(), std::string("Failed to send message: ") + e.Message()).Value()); + } + + private: + Napi::Promise::Deferred deferred; + pulsar_producer_t *cProducer; + pulsar_message_t *cMessage; }; Napi::Value Producer::Send(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - pulsar_message_t *cMessage = Message::BuildMessage(info[0].As<Napi::Object>()); - ProducerSendWorker *wk = new ProducerSendWorker(deferred, this->cProducer, cMessage); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + pulsar_message_t *cMessage = Message::BuildMessage(info[0].As<Napi::Object>()); + ProducerSendWorker *wk = new ProducerSendWorker(deferred, this->cProducer, cMessage); + wk->Queue(); + return deferred.Promise(); } class ProducerCloseWorker : public Napi::AsyncWorker { - public: - ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer) - : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), - deferred(deferred), - cProducer(cProducer) {} - ~ProducerCloseWorker() {} - void Execute() { - pulsar_result result = pulsar_producer_close(this->cProducer); - if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); - } - void OnOK() { this->deferred.Resolve(Env().Null()); } - void OnError(const Napi::Error &e) { - this->deferred.Reject( - Napi::Error::New(Env(), std::string("Failed to close producer: ") + e.Message()).Value()); - } - - private: - Napi::Promise::Deferred deferred; - pulsar_producer_t *cProducer; + public: + ProducerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_producer_t *cProducer) + : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), + deferred(deferred), + cProducer(cProducer) {} + ~ProducerCloseWorker() {} + void Execute() { + pulsar_result result = pulsar_producer_close(this->cProducer); + if (result != pulsar_result_Ok) SetError(pulsar_result_str(result)); + } + void OnOK() { this->deferred.Resolve(Env().Null()); } + void OnError(const Napi::Error &e) { + this->deferred.Reject( + Napi::Error::New(Env(), std::string("Failed to close producer: ") + e.Message()).Value()); + } + + private: + Napi::Promise::Deferred deferred; + pulsar_producer_t *cProducer; }; Napi::Value Producer::Close(const Napi::CallbackInfo &info) { - Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); - ProducerCloseWorker *wk = new ProducerCloseWorker(deferred, this->cProducer); - wk->Queue(); - return deferred.Promise(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ProducerCloseWorker *wk = new ProducerCloseWorker(deferred, this->cProducer); + wk->Queue(); + return deferred.Promise(); } Producer::~Producer() { pulsar_producer_free(this->cProducer); } diff --git a/src/Producer.h b/src/Producer.h index c758ce4..5d31cfc 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -25,18 +25,18 @@ #include <pulsar/c/producer.h> class Producer : public Napi::ObjectWrap<Producer> { - public: - static void Init(Napi::Env env, Napi::Object exports); - static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient); - static Napi::FunctionReference constructor; - Producer(const Napi::CallbackInfo &info); - ~Producer(); - void SetCProducer(pulsar_producer_t *cProducer); + public: + static void Init(Napi::Env env, Napi::Object exports); + static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient); + static Napi::FunctionReference constructor; + Producer(const Napi::CallbackInfo &info); + ~Producer(); + void SetCProducer(pulsar_producer_t *cProducer); - private: - pulsar_producer_t *cProducer; - Napi::Value Send(const Napi::CallbackInfo &info); - Napi::Value Close(const Napi::CallbackInfo &info); + private: + pulsar_producer_t *cProducer; + Napi::Value Send(const Napi::CallbackInfo &info); + Napi::Value Close(const Napi::CallbackInfo &info); }; #endif diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 9b94dd2..76fd25c 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -50,106 +50,105 @@ static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {{"Zlib {"LZ4", pulsar_CompressionLZ4}}; ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { - this->cProducerConfig = pulsar_producer_configuration_create(); - - if (producerConfig.Has(CFG_TOPIC) && producerConfig.Get(CFG_TOPIC).IsString()) { - this->topic = producerConfig.Get(CFG_TOPIC).ToString().Utf8Value(); - } - - if (producerConfig.Has(CFG_PRODUCER_NAME) && producerConfig.Get(CFG_PRODUCER_NAME).IsString()) { - std::string producerName = producerConfig.Get(CFG_PRODUCER_NAME).ToString().Utf8Value(); - if (!producerName.empty()) - pulsar_producer_configuration_set_producer_name(this->cProducerConfig, producerName.c_str()); - } - - if (producerConfig.Has(CFG_SEND_TIMEOUT) && producerConfig.Get(CFG_SEND_TIMEOUT).IsNumber()) { - int32_t sendTimeoutMs = producerConfig.Get(CFG_SEND_TIMEOUT).ToNumber().Int32Value(); - if (sendTimeoutMs > 0) { - pulsar_producer_configuration_set_send_timeout(this->cProducerConfig, sendTimeoutMs); - } + this->cProducerConfig = pulsar_producer_configuration_create(); + + if (producerConfig.Has(CFG_TOPIC) && producerConfig.Get(CFG_TOPIC).IsString()) { + this->topic = producerConfig.Get(CFG_TOPIC).ToString().Utf8Value(); + } + + if (producerConfig.Has(CFG_PRODUCER_NAME) && producerConfig.Get(CFG_PRODUCER_NAME).IsString()) { + std::string producerName = producerConfig.Get(CFG_PRODUCER_NAME).ToString().Utf8Value(); + if (!producerName.empty()) + pulsar_producer_configuration_set_producer_name(this->cProducerConfig, producerName.c_str()); + } + + if (producerConfig.Has(CFG_SEND_TIMEOUT) && producerConfig.Get(CFG_SEND_TIMEOUT).IsNumber()) { + int32_t sendTimeoutMs = producerConfig.Get(CFG_SEND_TIMEOUT).ToNumber().Int32Value(); + if (sendTimeoutMs > 0) { + pulsar_producer_configuration_set_send_timeout(this->cProducerConfig, sendTimeoutMs); } + } - if (producerConfig.Has(CFG_INIT_SEQUENCE_ID) && producerConfig.Get(CFG_INIT_SEQUENCE_ID).IsNumber()) { - int64_t initialSequenceId = producerConfig.Get(CFG_INIT_SEQUENCE_ID).ToNumber().Int64Value(); - pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig, initialSequenceId); - } + if (producerConfig.Has(CFG_INIT_SEQUENCE_ID) && producerConfig.Get(CFG_INIT_SEQUENCE_ID).IsNumber()) { + int64_t initialSequenceId = producerConfig.Get(CFG_INIT_SEQUENCE_ID).ToNumber().Int64Value(); + pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig, initialSequenceId); + } - if (producerConfig.Has(CFG_MAX_PENDING) && producerConfig.Get(CFG_MAX_PENDING).IsNumber()) { - int32_t maxPendingMessages = producerConfig.Get(CFG_MAX_PENDING).ToNumber().Int32Value(); - if (maxPendingMessages > 0) { - pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, maxPendingMessages); - } + if (producerConfig.Has(CFG_MAX_PENDING) && producerConfig.Get(CFG_MAX_PENDING).IsNumber()) { + int32_t maxPendingMessages = producerConfig.Get(CFG_MAX_PENDING).ToNumber().Int32Value(); + if (maxPendingMessages > 0) { + pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, maxPendingMessages); } - - if (producerConfig.Has(CFG_MAX_PENDING_ACROSS_PARTITIONS) && - producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).IsNumber()) { - int32_t maxPendingMessagesAcrossPartitions = - producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).ToNumber().Int32Value(); - if (maxPendingMessagesAcrossPartitions > 0) { - pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, - maxPendingMessagesAcrossPartitions); - } + } + + if (producerConfig.Has(CFG_MAX_PENDING_ACROSS_PARTITIONS) && + producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).IsNumber()) { + int32_t maxPendingMessagesAcrossPartitions = + producerConfig.Get(CFG_MAX_PENDING_ACROSS_PARTITIONS).ToNumber().Int32Value(); + if (maxPendingMessagesAcrossPartitions > 0) { + pulsar_producer_configuration_set_max_pending_messages(this->cProducerConfig, + maxPendingMessagesAcrossPartitions); } - - if (producerConfig.Has(CFG_BLOCK_IF_QUEUE_FULL) && - producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).IsBoolean()) { - bool blockIfQueueFull = producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).ToBoolean().Value(); - pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig, blockIfQueueFull); - } - - if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { - std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); - if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) - pulsar_producer_configuration_set_partitions_routing_mode( - this->cProducerConfig, MESSAGE_ROUTING_MODE.at(messageRoutingMode)); + } + + if (producerConfig.Has(CFG_BLOCK_IF_QUEUE_FULL) && + producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).IsBoolean()) { + bool blockIfQueueFull = producerConfig.Get(CFG_BLOCK_IF_QUEUE_FULL).ToBoolean().Value(); + pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig, blockIfQueueFull); + } + + if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) { + std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value(); + if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) + pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig, + MESSAGE_ROUTING_MODE.at(messageRoutingMode)); + } + + if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) { + std::string hashingScheme = producerConfig.Get(CFG_HASH_SCHEME).ToString().Utf8Value(); + if (HASHING_SCHEME.count(hashingScheme)) + pulsar_producer_configuration_set_hashing_scheme(this->cProducerConfig, + HASHING_SCHEME.at(hashingScheme)); + } + + if (producerConfig.Has(CFG_COMPRESS_TYPE) && producerConfig.Get(CFG_COMPRESS_TYPE).IsString()) { + std::string compressionType = producerConfig.Get(CFG_COMPRESS_TYPE).ToString().Utf8Value(); + if (COMPRESSION_TYPE.count(compressionType)) + pulsar_producer_configuration_set_compression_type(this->cProducerConfig, + COMPRESSION_TYPE.at(compressionType)); + } + + if (producerConfig.Has(CFG_BATCH_ENABLED) && producerConfig.Get(CFG_BATCH_ENABLED).IsBoolean()) { + bool batchingEnabled = producerConfig.Get(CFG_BATCH_ENABLED).ToBoolean().Value(); + pulsar_producer_configuration_set_batching_enabled(this->cProducerConfig, batchingEnabled); + } + + if (producerConfig.Has(CFG_BATCH_MAX_DELAY) && producerConfig.Get(CFG_BATCH_MAX_DELAY).IsNumber()) { + int64_t batchingMaxPublishDelayMs = producerConfig.Get(CFG_BATCH_MAX_DELAY).ToNumber().Int64Value(); + if (batchingMaxPublishDelayMs > 0) { + pulsar_producer_configuration_set_batching_max_publish_delay_ms(this->cProducerConfig, + (long)batchingMaxPublishDelayMs); } + } - if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) { - std::string hashingScheme = producerConfig.Get(CFG_HASH_SCHEME).ToString().Utf8Value(); - if (HASHING_SCHEME.count(hashingScheme)) - pulsar_producer_configuration_set_hashing_scheme(this->cProducerConfig, - HASHING_SCHEME.at(hashingScheme)); + if (producerConfig.Has(CFG_BATCH_MAX_MSG) && producerConfig.Get(CFG_BATCH_MAX_MSG).IsNumber()) { + uint32_t batchingMaxMessages = producerConfig.Get(CFG_BATCH_MAX_MSG).ToNumber().Uint32Value(); + if (batchingMaxMessages > 0) { + pulsar_producer_configuration_set_batching_max_messages(this->cProducerConfig, batchingMaxMessages); } - - if (producerConfig.Has(CFG_COMPRESS_TYPE) && producerConfig.Get(CFG_COMPRESS_TYPE).IsString()) { - std::string compressionType = producerConfig.Get(CFG_COMPRESS_TYPE).ToString().Utf8Value(); - if (COMPRESSION_TYPE.count(compressionType)) - pulsar_producer_configuration_set_compression_type(this->cProducerConfig, - COMPRESSION_TYPE.at(compressionType)); - } - - if (producerConfig.Has(CFG_BATCH_ENABLED) && producerConfig.Get(CFG_BATCH_ENABLED).IsBoolean()) { - bool batchingEnabled = producerConfig.Get(CFG_BATCH_ENABLED).ToBoolean().Value(); - pulsar_producer_configuration_set_batching_enabled(this->cProducerConfig, batchingEnabled); - } - - if (producerConfig.Has(CFG_BATCH_MAX_DELAY) && producerConfig.Get(CFG_BATCH_MAX_DELAY).IsNumber()) { - int64_t batchingMaxPublishDelayMs = producerConfig.Get(CFG_BATCH_MAX_DELAY).ToNumber().Int64Value(); - if (batchingMaxPublishDelayMs > 0) { - pulsar_producer_configuration_set_batching_max_publish_delay_ms(this->cProducerConfig, - (long)batchingMaxPublishDelayMs); - } - } - - if (producerConfig.Has(CFG_BATCH_MAX_MSG) && producerConfig.Get(CFG_BATCH_MAX_MSG).IsNumber()) { - uint32_t batchingMaxMessages = producerConfig.Get(CFG_BATCH_MAX_MSG).ToNumber().Uint32Value(); - if (batchingMaxMessages > 0) { - pulsar_producer_configuration_set_batching_max_messages(this->cProducerConfig, - batchingMaxMessages); - } - } - - if (producerConfig.Has(CFG_PROPS) && producerConfig.Get(CFG_PROPS).IsObject()) { - Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject(); - Napi::Array arr = propObj.GetPropertyNames(); - int size = arr.Length(); - for (int i = 0; i < size; i++) { - Napi::String key = arr.Get(i).ToString(); - Napi::String value = propObj.Get(key).ToString(); - pulsar_producer_configuration_set_property(this->cProducerConfig, key.Utf8Value().c_str(), - value.Utf8Value().c_str()); - } + } + + if (producerConfig.Has(CFG_PROPS) && producerConfig.Get(CFG_PROPS).IsObject()) { + Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject(); + Napi::Array arr = propObj.GetPropertyNames(); + int size = arr.Length(); + for (int i = 0; i < size; i++) { + Napi::String key = arr.Get(i).ToString(); + Napi::String value = propObj.Get(key).ToString(); + pulsar_producer_configuration_set_property(this->cProducerConfig, key.Utf8Value().c_str(), + value.Utf8Value().c_str()); } + } } ProducerConfig::~ProducerConfig() { pulsar_producer_configuration_free(this->cProducerConfig); } diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h index e2ecad0..c18c813 100644 --- a/src/ProducerConfig.h +++ b/src/ProducerConfig.h @@ -24,15 +24,15 @@ #include <pulsar/c/producer_configuration.h> class ProducerConfig { - public: - ProducerConfig(const Napi::Object &producerConfig); - ~ProducerConfig(); - pulsar_producer_configuration_t *GetCProducerConfig(); - std::string GetTopic(); + public: + ProducerConfig(const Napi::Object &producerConfig); + ~ProducerConfig(); + pulsar_producer_configuration_t *GetCProducerConfig(); + std::string GetTopic(); - private: - pulsar_producer_configuration_t *cProducerConfig; - std::string topic; + private: + pulsar_producer_configuration_t *cProducerConfig; + std::string topic; }; #endif diff --git a/src/addon.cc b/src/addon.cc index abcbb24..9e75d3f 100644 --- a/src/addon.cc +++ b/src/addon.cc @@ -25,11 +25,11 @@ #include <napi.h> Napi::Object InitAll(Napi::Env env, Napi::Object exports) { - Message::Init(env, exports); - MessageId::Init(env, exports); - Producer::Init(env, exports); - Consumer::Init(env, exports); - return Client::Init(env, exports); + Message::Init(env, exports); + MessageId::Init(env, exports); + Producer::Init(env, exports); + Consumer::Init(env, exports); + return Client::Init(env, exports); } NODE_API_MODULE(NODE_GYP_MODULE_NAME, InitAll)
