This is an automated email from the ASF dual-hosted git repository. ifplusor pushed a commit to branch n-api in repository https://gitbox.apache.org/repos/asf/rocketmq-client-nodejs.git
commit d7d8287a5882b9f06035fb3b84017aea079b5db7 Author: James Yin <[email protected]> AuthorDate: Fri Mar 26 12:30:05 2021 +0800 refactor: powered by N-API and rocketmq-client-cpp@re_dev --- .clang-format | 111 ++++++ .gitignore | 6 +- .gitmodules | 5 +- README.md | 10 +- binding.gyp | 28 +- deps/rocketmq | 2 +- lib/common.js | 28 -- lib/env_init.js | 32 -- lib/producer.js | 6 +- lib/push_consumer.js | 6 +- package.json | 22 +- script/download_lib.js | 131 ------ script/get_linux_distro_route.js | 102 ----- src/consumer_ack.cpp | 86 ++-- src/consumer_ack.h | 51 +-- src/consumer_ack_inner.cpp | 77 ---- src/consumer_ack_inner.h | 42 -- src/producer.cpp | 440 +++++++++++---------- src/producer.h | 48 +-- src/push_consumer.cpp | 546 ++++++++++---------------- src/push_consumer.h | 66 ++-- src/rocketmq.cpp | 32 +- src/workers/producer/send_message.h | 80 ---- src/workers/producer/start_or_shutdown.h | 88 ----- src/workers/push_consumer/start_or_shutdown.h | 88 ----- 25 files changed, 691 insertions(+), 1442 deletions(-) diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..4aad29c --- /dev/null +++ b/.clang-format @@ -0,0 +1,111 @@ +--- +Language: Cpp +# BasedOnStyle: Google +AccessModifierOffset: -1 +AlignAfterOpenBracket: Align +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignEscapedNewlines: Right +AlignOperands: true +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: Inline +AllowShortIfStatementsOnASingleLine: true +AllowShortLoopsOnASingleLine: true +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +AlwaysBreakTemplateDeclarations: true +BinPackArguments: false +BinPackParameters: false +BraceWrapping: + AfterClass: false + AfterControlStatement: false + AfterEnum: false + AfterFunction: false + 
AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + AfterExternBlock: false + BeforeCatch: false + BeforeElse: false + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakBeforeBinaryOperators: None +BreakBeforeBraces: Attach +BreakBeforeInheritanceComma: false +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BreakConstructorInitializers: BeforeColon +BreakAfterJavaFieldAnnotations: false +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerAllOnOneLineOrOnePerLine: true +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IncludeBlocks: Preserve +IncludeCategories: + - Regex: '^<ext/.*\.h>' + Priority: 2 + - Regex: '^<.*\.h>' + Priority: 1 + - Regex: '^<.*' + Priority: 2 + - Regex: '.*' + Priority: 3 +IncludeIsMainRegex: '([-_](test|unittest))?$' +IndentCaseLabels: true +IndentPPDirectives: None +IndentWidth: 2 +IndentWrappedFunctionNames: false +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: false +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBlockIndentWidth: 2 +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: false +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 1 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 200 +PointerAlignment: Left +ReflowComments: true +SortIncludes: true +SortUsingDeclarations: true +SpaceAfterCStyleCast: false +SpaceAfterTemplateKeyword: true +SpaceBeforeAssignmentOperators: true 
+SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 2 +SpacesInAngles: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +Standard: Auto +TabWidth: 8 +UseTab: Never diff --git a/.gitignore b/.gitignore index a50fd0b..6978ed0 100644 --- a/.gitignore +++ b/.gitignore @@ -197,6 +197,8 @@ $RECYCLE.BIN/ # Windows shortcuts *.lnk -build package-lock.json -.vscode +.vscode/ +build/ +Debug/ +Release/ diff --git a/.gitmodules b/.gitmodules index c9fc461..51ac186 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,4 @@ [submodule "deps/rocketmq"] -path = deps/rocketmq -url = https://github.com/apache/rocketmq-client-cpp.git + path = deps/rocketmq + url = https://github.com/apache/rocketmq-client-cpp.git + branch = re_dev diff --git a/README.md b/README.md index cdb8f65..dd15ba7 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,12 @@ [](https://travis-ci.org/apache/rocketmq-client-nodejs) [](https://david-dm.org/apache/rocketmq-client-nodejs) -This official Node.js client is a lightweight wrapper around [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), a finely tuned CPP client. +This official Node.js client is a lightweight wrapper around [re_dev branch of rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp/tree/re_dev), a finely tuned CPP client. > **Notice 1:** This client is still in `dev` version. Use it cautiously in > production. -> **Notice 2:** This SDK is now only support macOS and Ubuntu **14.04**. Ubuntu 16+ is not supported and CentOS is not tested yet. +> **Notice 2:** This SDK is now only tested on macOS. Ubuntu and CentOS is not tested yet. 
## Installation @@ -50,7 +50,8 @@ new Producer(groupId[, instanceName][, options]); - `compressLevel`: the compress level (0-9) of this producer, default to `5` where `0` is fastest and `9` is most compressed; - `sendMessageTimeout`: send message timeout millisecond, default to `3000` and suggestion is 2000 - 3000ms; - `maxMessageSize`: max message size with unit (B), default to `1024 * 128` which means 128K; - - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`; + - `logDir`: the folder where C++ core logic log store, default log file path is `$HOME/logs/rocketmq-cpp`; + - `logFileNum`: C++ core logic log file number, default to 3; - `logFileSize`: size of each C++ core logic log file with unit (B); - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`. @@ -177,7 +178,8 @@ new PushConsumer(groupId[, instanceName][, options]); - `nameServer`: the name server of RocketMQ; - `threadCount`: the thread number of underlying C++ logic; - `maxBatchSize`: message max batch size; - - `logFileNum`: C++ core logic log file number, default to 3 and log file path is `$HOME/logs/rocketmq-cpp`; + - `logDir`: the folder where C++ core logic log store, default log file path is `$HOME/logs/rocketmq-cpp`; + - `logFileNum`: C++ core logic log file number, default to 3; - `logFileSize`: size of each C++ core logic log file with unit (B); - `logLevel`: C++ core logic log level in `"fatal"`, `"error"`, `"warn"`, `"info"`, `"debug"`, `"trace"` and `"num"`. 
diff --git a/binding.gyp b/binding.gyp index 8f4a46e..c7c1d71 100644 --- a/binding.gyp +++ b/binding.gyp @@ -21,21 +21,21 @@ "src/rocketmq.cpp", "src/producer.cpp", "src/push_consumer.cpp", - "src/consumer_ack.cpp", - "src/consumer_ack_inner.cpp" + "src/consumer_ack.cpp" ], "include_dirs": [ "deps/rocketmq/include", - "<!(node -e \"require('nan')\")" + "<!@(node -p \"require('node-addon-api').include\")" + ], + "library_dirs": [ + "<(module_root_dir)/deps/rocketmq/bin" ], "conditions": [ ["OS==\"linux\"", { - "libraries": [ - "<(module_root_dir)/deps/lib/librocketmq.a" - ], - "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ], - "cflags_cc": [ "-Wno-ignored-qualifiers" ], - "cflags": [ "-std=c++11", "-g" ] + "libraries": [ "-lrocketmq" ], + "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ], + "cflags_cc": [ "-Wall", "-std=c++11" ], + "cflags": [ "-g" ] }], ["OS==\"win\"", { "libraries": [ @@ -49,13 +49,13 @@ ] }], ["OS==\"mac\"", { + "libraries": [ "-lrocketmq" ], "xcode_settings": { - "GCC_ENABLE_CPP_EXCEPTIONS": "YES" + "GCC_ENABLE_CPP_EXCEPTIONS": "YES", + 'GCC_ENABLE_CPP_RTTI': 'YES' }, - "cflags!": [ "-fno-exceptions" ], - "cflags_cc!": [ "-fno-exceptions", "-pthread", "-Wl,--no-as-needed", "-ldl" ], - "cflags_cc": [ "-Wno-ignored-qualifiers" ], - "cflags": [ "-std=c++11", "-stdlib=libc++" ] + "cflags_cc!": [ "-fno-exceptions", "-fno-rtti", "-pthread", "-Wl,--no-as-needed", "-ldl" ], + "cflags_cc": [ "-Wall", "-std=c++11", "-stdlib=libc++" ] }] ] } diff --git a/deps/rocketmq b/deps/rocketmq index d5887b6..db0e209 160000 --- a/deps/rocketmq +++ b/deps/rocketmq @@ -1 +1 @@ -Subproject commit d5887b63ddbba16fec562f64dca7f77ce9ca0bb1 +Subproject commit db0e20972a78d70930acce5eaff1456408b72c73 diff --git a/lib/common.js b/lib/common.js deleted file mode 100644 index 2386605..0000000 --- a/lib/common.js +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -"use strict"; - -exports.requireBinding = function(name) { - let mod; - try { - mod = require(`../build/Debug/${name}`); - } catch(e) { - mod = require(`../build/Release/${name}`); - } - - return mod; -}; diff --git a/lib/env_init.js b/lib/env_init.js deleted file mode 100644 index 0c26a1d..0000000 --- a/lib/env_init.js +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -"use strict"; - -const os = require("os"); -const path = require("path"); - -const common = require("./common"); - -const binding = common.requireBinding("rocketmq"); - -switch(os.platform()) { -case "darwin": - process.env.EVENT_NOKQUEUE = "1"; - binding.macosDLOpen(path.join(__dirname, "../deps/lib/librocketmq.dylib")); - break; -default: break; -} diff --git a/lib/producer.js b/lib/producer.js index f4f5c93..02b777e 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -16,13 +16,9 @@ */ "use strict"; -require("./env_init"); - const assert = require("assert"); -const common = require("./common"); - -const binding = common.requireBinding("rocketmq"); +const binding = require("bindings")("rocketmq"); const START_OR_SHUTDOWN = Symbol("RocketMQProducer#startOrShutdown"); diff --git a/lib/push_consumer.js b/lib/push_consumer.js index 3bb51de..7cf97ab 100644 --- a/lib/push_consumer.js +++ b/lib/push_consumer.js @@ -16,14 +16,10 @@ */ "use strict"; -require("./env_init"); - const assert = require("assert"); const EventEmitter = require("events").EventEmitter; -const common = require("./common"); - -const binding = common.requireBinding("rocketmq"); +const binding = require("bindings")("rocketmq"); const START_OR_SHUTDOWN = Symbol("RocketMQPushConsumer#startOrShutdown"); const START_STATUS = { diff --git a/package.json b/package.json index 074198d..5e38e40 100644 --- a/package.json +++ b/package.json @@ -1,25 +1,23 @@ { "name": "apache-rocketmq", - "version": "1.0.0-rc1", - "cppSDKVersion": "1.2.0", + "version": "2.0.0-rc1", + "cppSDKVersion": "3.0.0", "description": "RocketMQ binding for Node.js", + "license": "Apache-2.0", + "author": "James Yin <[email protected]>", "main": "index.js", "scripts": { + "preinstall": "git submodule update --init --recommend-shallow", + "install": "deps/rocketmq/build.sh && node-gyp rebuild", "test": "npm run lint && echo 'temp example test' && node example/producer.js && node example/push_consumer.js", - "lint": "eslint .", - 
"install": "node ./script/download_lib.js && node-gyp rebuild" + "lint": "eslint ." }, - "author": "XadillaX <[email protected]>", - "license": "Apache-2.0", "dependencies": { - "co": "^4.6.0", - "destroy": "^1.0.4", - "getos": "^3.1.1", - "mkdirp": "^0.5.1", - "nan": "^2.11.1", - "urllib": "^2.31.3" + "bindings": "^1.5.0", + "node-addon-api": "^3.1.0" }, "devDependencies": { + "co": "^4.6.0", "eslint": "^5.9.0", "eslint-config-rocketmq-style": "^1.0.0" } diff --git a/script/download_lib.js b/script/download_lib.js deleted file mode 100644 index 5cfebfd..0000000 --- a/script/download_lib.js +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -"use strict"; - -const fs = require("fs"); -const os = require("os"); -const path = require("path"); - -const co = require("co"); -const destroy = require("destroy"); -const _mkdirp = require("mkdirp"); -const urllib = require("urllib"); - -const getLinuxDistroRoute = require("./get_linux_distro_route"); -const pkg = require("../package"); - -let REGISTRY_MIRROR = - process.env.NODE_ROCKETMQ_REGISTRY || - "https://opensource-rocketmq-client.oss-cn-hangzhou.aliyuncs.com"; -if(!REGISTRY_MIRROR.endsWith("/")) REGISTRY_MIRROR += "/"; - -const CPP_SDK_VERSION = pkg.cppSDKVersion; -const LIB_DIR = path.join(__dirname, "..", "deps", "lib"); -const URL_ROOT = `${REGISTRY_MIRROR}cpp-client`; - -function mkdirp(dir) { - return new Promise((resolve, reject) => { - _mkdirp(dir, null, err => { - if(err) reject(err); - else resolve(); - }); - }); -} - -function *getUrlArray() { - const platform = os.platform(); - const ret = []; - let distro; - - switch(platform) { - case "win32": - ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.dll`); - ret.push(`${URL_ROOT}/windows/${CPP_SDK_VERSION}/rocketmq-client-cpp.lib`); - break; - - case "darwin": - ret.push(`${URL_ROOT}/mac/${CPP_SDK_VERSION}/librocketmq.dylib`); - break; - - case "linux": - distro = yield getLinuxDistroRoute(); - ret.push(`${URL_ROOT}/linux/${CPP_SDK_VERSION}/${distro}/librocketmq.a`); - break; - - default: throw new Error(`Unsupported platform ${platform}`); - } - - return ret; -} - -co(function *() { - let urls; - try { - urls = yield getUrlArray(); - } catch(e) { - console.error(`[rocketmq sdk] [error] ${e.message}`); - process.exit(4); - } - - yield mkdirp(LIB_DIR); - let writeTimes = 0; - for(const url of urls) { - console.log(`[rocketmq sdk] [info] downloading [${url}]...`); - - const resp = yield urllib.request(url, { - timeout: 60000 * 5, - followRedirect: true, - streaming: true - }); - - if(resp.status !== 200) { - destroy(resp.res); - console.error(`[rocketmq sdk] [error] error 
status ${resp.status} while downloading [${url}].`); - process.exit(4); - } - - const readStream = resp.res; - const filename = path.join(LIB_DIR, path.basename(url)); - const writeStream = fs.createWriteStream(filename, { - encoding: "binary" - }); - - // eslint-disable-next-line - function handleDownladCallback(err) { - if(err) { - console.error(`[rocketmq sdk] [error] error occurred while downloading [${url}] to [${filename}].`); - console.error(err.stack); - process.exit(4); - } - - writeTimes++; - destroy(resp.res); - - console.log(`[rocketmq sdk] [info] downloaded library [${url}].`); - if(writeTimes === urls.length) { - console.log("[rocketmq sdk] [info] all libraries have been written to disk."); - process.exit(0); - } - } - - readStream.on("error", handleDownladCallback); - writeStream.on("error", handleDownladCallback); - writeStream.on("finish", handleDownladCallback); - - readStream.pipe(writeStream); - } -}); diff --git a/script/get_linux_distro_route.js b/script/get_linux_distro_route.js deleted file mode 100644 index f03c827..0000000 --- a/script/get_linux_distro_route.js +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -"use strict"; - -const assert = require("assert"); - -const getos = require("getos"); - -const RHEL_MAP_ARRAY = [ - "Centos", - "Red Hat Linux", - "RHEL", - "Scientific Linux", - "ScientificSL", - "ScientificCERNSLC", - "ScientificFermiLTS", - "ScientificSLF" -]; -const NORMAL_MAP_ARRAY = [ - "Alpine Linux", - "Amazon Linux", - "Arch Linux", - "Chakra", - "Debian", - "elementary OS", - "IYCC", - "Linux Mint", - "Manjaro Linux", - "Ubuntu Linux" -]; - -function realGetLinuxDistroRoute(dist, release) { - let major; - if(release) { - major = Number(release.split(".")[0]); - } - - // RHEL Distros - if(RHEL_MAP_ARRAY.includes(dist)) { - assert(major >= 5 && major <= 7, `Only support ${dist} 5-7.`); - return `RHEL${major}.x`; - } - - // Fedora - if("Fedora" === dist) { - if(major <= 18) { - return "RHEL6.x"; - } else if(major === 19) { - return "RHEL7.x"; - } - } - - if("Ubuntu Linux" === dist) { - assert([ 14, 16, 18 ].includes(major)); - return `UBUNTU/${major}.04`; - } - - if("Debian" === dist) { - assert(major >= 8 && major <= 10); - return `UBUNTU/${(major + 2) / 2}.04`; - } - - // Ubuntu Distros - if(!NORMAL_MAP_ARRAY.includes(dist)) { - console.error(`[rocketmq sdk] [warn] ${dist} may not supported, fallback to use Ubuntu library.`); - } - - return "UBUNTU/14.04"; -} - -function getLinuxDistroRoute() { - return new Promise((resolve, reject) => { - getos(function(err, ret) { - if(err) return reject(err); - - let route; - try { - route = realGetLinuxDistroRoute(ret.dist, ret.release); - } catch(e) { - return reject(e); - } - - resolve(route); - }); - }); -} - -module.exports = getLinuxDistroRoute; diff --git a/src/consumer_ack.cpp b/src/consumer_ack.cpp index e2f5c09..8de4aaf 100644 --- a/src/consumer_ack.cpp +++ b/src/consumer_ack.cpp @@ -15,76 +15,52 @@ * limitations under the License. 
*/ #include "consumer_ack.h" +#include <exception> +#include "napi.h" namespace __node_rocketmq__ { -Nan::Persistent<Function> ConsumerAck::constructor; +Napi::Object ConsumerAck::Init(Napi::Env env, Napi::Object exports) { + Napi::Function func = DefineClass( + env, "ConsumerAck", {InstanceMethod<&ConsumerAck::Done>("done")}); -ConsumerAck::ConsumerAck() : - inner(NULL) -{ -} + Napi::FunctionReference* constructor = new Napi::FunctionReference(); + *constructor = Napi::Persistent(func); + env.SetInstanceData<Napi::FunctionReference>(constructor); -ConsumerAck::~ConsumerAck() -{ - inner = NULL; + exports.Set("ConsumerAck", func); + return exports; } -NAN_MODULE_INIT(ConsumerAck::Init) -{ - Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); - tpl->SetClassName(Nan::New("ConsumerAck").ToLocalChecked()); - tpl->InstanceTemplate()->SetInternalFieldCount(1); - - Nan::SetPrototypeMethod(tpl, "done", Done); - - constructor.Reset(tpl->GetFunction()); - Nan::Set(target, Nan::New("ConsumerAck").ToLocalChecked(), tpl->GetFunction()); +Napi::Object ConsumerAck::NewInstance(Napi::Env env) { + Napi::Object obj = env.GetInstanceData<Napi::FunctionReference>()->New({}); + return obj; } -NAN_METHOD(ConsumerAck::New) -{ - Isolate* isolate = info.GetIsolate(); - Local<Context> context = Context::New(isolate); - - if(!info.IsConstructCall()) - { - Local<Function> _constructor = Nan::New<Function>(constructor); - info.GetReturnValue().Set(_constructor->NewInstance(context, 0, 0).ToLocalChecked()); - return; - } +ConsumerAck::ConsumerAck(const Napi::CallbackInfo& info) + : Napi::ObjectWrap<ConsumerAck>(info) {} - ConsumerAck* producer = new ConsumerAck(); - producer->Wrap(info.This()); - info.GetReturnValue().Set(info.This()); +void ConsumerAck::SetPromise(std::promise<bool>&& promise) { + promise_ = std::move(promise); } -NAN_METHOD(ConsumerAck::Done) -{ - ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(info.Holder()); - bool succ = true; - - if(info.Length() >= 1) - { - 
succ = (info[0]->IsUndefined() || info[0]->IsNull() || Nan::To<bool>(info[0]).FromJust()); - } +void ConsumerAck::Done(bool ack) { + promise_.set_value(ack); +} - // call inner ack's `Ack` function to emit the true `Acker`'s `Ack` function - // and finish waiting of consume thread - CConsumeStatus status = succ ? - CConsumeStatus::E_CONSUME_SUCCESS : - CConsumeStatus::E_RECONSUME_LATER; - ack->Ack(status); +void ConsumerAck::Done(std::exception_ptr exception) { + promise_.set_exception(exception); } -void ConsumerAck::Ack(CConsumeStatus status) -{ - if(inner) - { - // call inner ack in the main event loop - inner->Ack(status); - inner = NULL; +Napi::Value ConsumerAck::Done(const Napi::CallbackInfo& info) { + if (info.Length() >= 1) { + Napi::Value ack = info[0]; + if (ack.IsBoolean() && !ack.ToBoolean()) { + Done(false); } + } + Done(true); + return info.Env().Undefined(); } -} +} // namespace __node_rocketmq__ diff --git a/src/consumer_ack.h b/src/consumer_ack.h index 682d85e..ff0ebd1 100644 --- a/src/consumer_ack.h +++ b/src/consumer_ack.h @@ -17,50 +17,31 @@ #ifndef __ROCKETMQ_CONSUMER_ACK_H__ #define __ROCKETMQ_CONSUMER_ACK_H__ -#include "consumer_ack_inner.h" -#include <nan.h> +#include <future> -namespace __node_rocketmq__ { - -using v8::Context; -using v8::Function; -using v8::FunctionTemplate; -using v8::Isolate; -using v8::Local; -using v8::Object; -using v8::String; -using v8::Value; +#include <napi.h> -class ConsumerAck : public Nan::ObjectWrap { -public: - static NAN_MODULE_INIT(Init); - -private: - explicit ConsumerAck(); - ~ConsumerAck(); +namespace __node_rocketmq__ { - static NAN_METHOD(New); - static NAN_METHOD(Done); +class ConsumerAck : public Napi::ObjectWrap<ConsumerAck> { + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + static Napi::Object NewInstance(Napi::Env env); - void Ack(CConsumeStatus status); + ConsumerAck(const Napi::CallbackInfo& info); - static Nan::Persistent<v8::Function> constructor; + void 
SetPromise(std::promise<bool>&& promise); -public: - void SetInner(ConsumerAckInner* _inner) - { - inner = _inner; - } + void Done(bool ack); + void Done(std::exception_ptr exception); - static Nan::Persistent<v8::Function>& GetConstructor() - { - return constructor; - } + private: + Napi::Value Done(const Napi::CallbackInfo& info); -private: - ConsumerAckInner* inner; + private: + std::promise<bool> promise_; }; -} +} // namespace __node_rocketmq__ #endif diff --git a/src/consumer_ack_inner.cpp b/src/consumer_ack_inner.cpp deleted file mode 100644 index 47b1a07..0000000 --- a/src/consumer_ack_inner.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "consumer_ack_inner.h" - -namespace __node_rocketmq__ { - -ConsumerAckInner::ConsumerAckInner() : - acked(false) -{ - uv_cond_init(&cond); - uv_mutex_init(&mutex); -} - -ConsumerAckInner::~ConsumerAckInner() -{ - uv_mutex_destroy(&mutex); - uv_cond_destroy(&cond); -} - -void ConsumerAckInner::Ack(CConsumeStatus _status) -{ - uv_mutex_lock(&mutex); - bool _acked = acked; - - if(_acked) - { - uv_mutex_unlock(&mutex); - return; - } - - status = _status; - acked = true; - - // tell `this->WaitResult()` to continue - uv_cond_signal(&cond); - uv_mutex_unlock(&mutex); -} - -CConsumeStatus ConsumerAckInner::WaitResult() -{ - uv_mutex_lock(&mutex); - - // if `cond signal` send before `WaitResult()`, - // `uv_cond_wait` will be blocked and never continue - // - // so we have to return result directly without `uv_cond_wait` - if(acked) - { - CConsumeStatus _status = status; - uv_mutex_unlock(&mutex); - return _status; - } - - // wait for `this->Ack()` and that will emit `uv_cond_signal` to let it stop wait - uv_cond_wait(&cond, &mutex); - - CConsumeStatus _status = status; - uv_mutex_unlock(&mutex); - - return _status; -} - -} diff --git a/src/consumer_ack_inner.h b/src/consumer_ack_inner.h deleted file mode 100644 index 9caa105..0000000 --- a/src/consumer_ack_inner.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __ROCKETMQ_CONSUMER_ACK_INNER_H__ -#define __ROCKETMQ_CONSUMER_ACK_INNER_H__ - -#include <uv.h> -#include <CPushConsumer.h> - -namespace __node_rocketmq__ { - -class ConsumerAckInner { -public: - ConsumerAckInner(); - ~ConsumerAckInner(); - - void Ack(CConsumeStatus _status); - CConsumeStatus WaitResult(); - -private: - bool acked; - CConsumeStatus status; - uv_mutex_t mutex; - uv_cond_t cond; -}; - -} - -#endif diff --git a/src/producer.cpp b/src/producer.cpp index 3ae4047..45dfe1f 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -15,253 +15,277 @@ * limitations under the License. 
*/ #include "producer.h" -#include "workers/producer/send_message.h" -#include "workers/producer/start_or_shutdown.h" -#include <MQClientException.h> +#include <cstddef> +#include <exception> #include <string> -using namespace std; -namespace __node_rocketmq__ { - -#define NAN_GET_CPRODUCER() \ - RocketMQProducer* _v8_producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); \ - CProducer* producer_ptr = _v8_producer->GetProducer(); - -Nan::Persistent<Function> RocketMQProducer::constructor; +#include <napi.h> -RocketMQProducer::RocketMQProducer(const char* group_id, const char* instance_name) -{ - producer_ptr = CreateProducer(group_id); - if(instance_name) - { - SetProducerInstanceName(producer_ptr, instance_name); - } -} +#include <ClientRPCHook.h> +#include <LoggerConfig.h> +#include <MQException.h> +#include <MQMessage.h> +#include <SendCallback.h> -RocketMQProducer::~RocketMQProducer() -{ - try - { - ShutdownProducer(producer_ptr); - } - catch (...) - { - // - } +namespace __node_rocketmq__ { - DestroyProducer(producer_ptr); +Napi::Object RocketMQProducer::Init(Napi::Env env, Napi::Object exports) { + Napi::Function func = + DefineClass(env, + "RocketMQProducer", + { + InstanceMethod<&RocketMQProducer::Start>("start"), + InstanceMethod<&RocketMQProducer::Shutdown>("shutdown"), + InstanceMethod<&RocketMQProducer::Send>("send"), + InstanceMethod<&RocketMQProducer::SetSessionCredentials>( + "setSessionCredentials"), + }); + + Napi::FunctionReference* constructor = new Napi::FunctionReference(); + *constructor = Napi::Persistent(func); + env.SetInstanceData<Napi::FunctionReference>(constructor); + + exports.Set("Producer", func); + return exports; } -void RocketMQProducer::SetOptions(Local<Object> options) -{ - // set name server - Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked(); - if(_name_server_v->IsString()) - { - Nan::Utf8String namesrv(_name_server_v); - 
SetProducerNameServerAddress(producer_ptr, *namesrv); - } - - // set group name - Local<Value> _group_name_v = Nan::Get(options, Nan::New<String>("groupName").ToLocalChecked()).ToLocalChecked(); - if(_group_name_v->IsString()) - { - Nan::Utf8String group_name(_group_name_v); - SetProducerGroupName(producer_ptr, *group_name); - } +RocketMQProducer::RocketMQProducer(const Napi::CallbackInfo& info) + : Napi::ObjectWrap<RocketMQProducer>(info), producer_("") { + const Napi::Value group_name = info[0]; + if (group_name.IsString()) { + producer_.set_group_name(group_name.ToString()); + } - // set log num & single log size - int file_num = 3; - int64 file_size = 104857600; - Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked(); - Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked(); - if(_log_file_num_v->IsNumber()) - { - file_num = _log_file_num_v->Int32Value(); - } - if(_log_file_size_v->IsNumber()) - { - file_size = _log_file_size_v->Int32Value(); - } - SetProducerLogFileNumAndSize(producer_ptr, file_num, file_size); - - // set log level - Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked(); - if(_log_level_v->IsNumber()) - { - int level = _log_level_v->Int32Value(); - SetProducerLogLevel(producer_ptr, (CLogLevel)level); - } + const Napi::Value instance_name = info[1]; + if (instance_name.IsString()) { + producer_.set_instance_name(instance_name.ToString()); + } - // set compress level - Local<Value> _compress_level_v = Nan::Get(options, Nan::New<String>("compressLevel").ToLocalChecked()).ToLocalChecked(); - if(_compress_level_v->IsNumber()) { - int level = _compress_level_v->Int32Value(); - SetProducerCompressLevel(producer_ptr, level); - } + const Napi::Value options = info[2]; + if (options.IsObject()) { + // try to set options + SetOptions(options.ToObject()); + } +} - // set send 
message timeout - Local<Value> _send_message_timeout_v = Nan::Get(options, Nan::New<String>("sendMessageTimeout").ToLocalChecked()).ToLocalChecked(); - if(_send_message_timeout_v->IsNumber()) - { - int timeout = _send_message_timeout_v->Int32Value(); - SetProducerSendMsgTimeout(producer_ptr, timeout); - } +RocketMQProducer::~RocketMQProducer() { + producer_.shutdown(); +} - // set max message size - Local<Value> _max_message_size_v = Nan::Get(options, Nan::New<String>("maxMessageSize").ToLocalChecked()).ToLocalChecked(); - if(_max_message_size_v->IsNumber()) - { - int size = _max_message_size_v->Int32Value(); - SetProducerMaxMessageSize(producer_ptr, size); +void RocketMQProducer::SetOptions(const Napi::Object& options) { + // set name server + Napi::Value name_server = options.Get("nameServer"); + if (name_server.IsString()) { + producer_.set_namesrv_addr(name_server.ToString()); + } + + // set group name + Napi::Value group_name = options.Get("groupName"); + if (group_name.IsString()) { + producer_.set_group_name(group_name.ToString()); + } + + // set max message size + Napi::Value max_message_size = options.Get("maxMessageSize"); + if (max_message_size.IsNumber()) { + producer_.set_max_message_size(max_message_size.ToNumber()); + } + + // set compress level + Napi::Value compress_level = options.Get("compressLevel"); + if (compress_level.IsNumber()) { + producer_.set_compress_level(compress_level.ToNumber()); + } + + // set send message timeout + Napi::Value send_message_timeout = options.Get("sendMessageTimeout"); + if (send_message_timeout.IsNumber()) { + producer_.set_send_msg_timeout(send_message_timeout.ToNumber()); + } + + // set log level + Napi::Value log_level = options.Get("logLevel"); + if (log_level.IsNumber()) { + int32_t level = log_level.ToNumber(); + if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) { + rocketmq::GetDefaultLoggerConfig().set_level( + static_cast<rocketmq::LogLevel>(level)); } + } + + // set log directory + 
Napi::Value log_dir = options.Get("logDir"); + if (log_dir.IsString()) { + rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString()); + } + + // set log file size + Napi::Value log_file_size = options.Get("logFileSize"); + if (log_file_size.IsNumber()) { + rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber()); + } + + // set log file num + Napi::Value log_file_num = options.Get("logFileNum"); + if (log_file_num.IsNumber()) { + rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber()); + } } -NAN_MODULE_INIT(RocketMQProducer::Init) -{ - Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); - tpl->SetClassName(Nan::New("RocketMQProducer").ToLocalChecked()); - tpl->InstanceTemplate()->SetInternalFieldCount(1); +Napi::Value RocketMQProducer::SetSessionCredentials( + const Napi::CallbackInfo& info) { + Napi::String access_key = info[0].As<Napi::String>(); + Napi::String secret_key = info[1].As<Napi::String>(); + Napi::String ons_channel = info[2].As<Napi::String>(); - Nan::SetPrototypeMethod(tpl, "start", Start); - Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown); - Nan::SetPrototypeMethod(tpl, "send", Send); - Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials); + auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>( + rocketmq::SessionCredentials(access_key, secret_key, ons_channel)); + producer_.setRPCHook(rpc_hook); - constructor.Reset(tpl->GetFunction()); - Nan::Set(target, Nan::New("Producer").ToLocalChecked(), tpl->GetFunction()); + return info.Env().Undefined(); } -NAN_METHOD(RocketMQProducer::New) -{ - Isolate* isolate = info.GetIsolate(); - Local<Context> context = Context::New(isolate); - - if(!info.IsConstructCall()) - { - const int argc = 3; - Local<Value> argv[argc] = { info[0], info[1], info[2] }; - Local<Function> _constructor = Nan::New<v8::Function>(constructor); - info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked()); - return; - 
} +class ProducerStartWorker : public Napi::AsyncWorker { + public: + ProducerStartWorker(const Napi::Function& callback, + const rocketmq::DefaultMQProducer& producer) + : Napi::AsyncWorker(callback), producer_(producer) {} - Nan::Utf8String group_id(info[0]); - Nan::Utf8String instance_name(info[1]); - Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked(); - RocketMQProducer* producer = new RocketMQProducer(*group_id, info[1]->IsNull() ? NULL : *instance_name); + void Execute() override { producer_.start(); } - producer->Wrap(info.This()); + private: + rocketmq::DefaultMQProducer producer_; +}; - // try to set options - try - { - producer->SetOptions(options); - } - catch (runtime_error e) - { - Nan::ThrowError(e.what()); - return; - } - - info.GetReturnValue().Set(info.This()); +Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) { + Napi::Function callback = info[0].As<Napi::Function>(); + auto* worker = new ProducerStartWorker(callback, producer_); + worker->Queue(); + return info.Env().Undefined(); } -NAN_METHOD(RocketMQProducer::Start) -{ - NAN_GET_CPRODUCER(); - - Nan::Callback* callback = (info[0]->IsFunction()) ? - new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) : - NULL; - - Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::START_PRODUCER)); -} +class ProducerShutdownWorker : public Napi::AsyncWorker { + public: + ProducerShutdownWorker(const Napi::Function& callback, + const rocketmq::DefaultMQProducer& producer) + : Napi::AsyncWorker(callback), producer_(producer) {} -NAN_METHOD(RocketMQProducer::Shutdown) -{ - NAN_GET_CPRODUCER(); + void Execute() override { producer_.shutdown(); } - Nan::Callback* callback = (info[0]->IsFunction()) ? 
- new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) : - NULL; + private: + rocketmq::DefaultMQProducer producer_; +}; - Nan::AsyncQueueWorker(new ProducerStartOrShutdownWorker(callback, producer_ptr, ProducerWorkerType::SHUTDOWN_PRODUCER)); +Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) { + Napi::Function callback = info[0].As<Napi::Function>(); + auto* worker = new ProducerShutdownWorker(callback, producer_); + worker->Queue(); + return info.Env().Undefined(); } -NAN_METHOD(RocketMQProducer::SetSessionCredentials) -{ - NAN_GET_CPRODUCER(); - - Nan::Utf8String access_key(info[0]); - Nan::Utf8String secret_key(info[1]); - Nan::Utf8String ons_channel(info[2]); - - int ret; - try - { - ret = SetProducerSessionCredentials(producer_ptr, *access_key, *secret_key, *ons_channel); +class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback { + private: + struct ResultOrException { + std::unique_ptr<rocketmq::SendResult> result; + std::exception_ptr exception; + }; + + public: + ProducerSendCallback(Napi::Env&& env, Napi::Function&& callback) + : callback_( + Callback::New(env, callback, "RocketMQ Send Callback", 0, 1)) {} + + ~ProducerSendCallback() { callback_.Release(); } + + void onSuccess(rocketmq::SendResult& send_result) override { + auto* data = + new ResultOrException{std::unique_ptr<rocketmq::SendResult>( + new rocketmq::SendResult(send_result)), + nullptr}; + napi_status status = callback_.BlockingCall(data); + if (status != napi_ok) { + // TODO: Handle error + std::exit(-1); } - catch(runtime_error e) - { - Nan::ThrowError(e.what()); + } + + void onException(rocketmq::MQException& exception) noexcept override { + auto* data = + new ResultOrException{nullptr, std::make_exception_ptr(exception)}; + napi_status status = callback_.BlockingCall(data); + if (status != napi_ok) { + // TODO: Handle error + std::exit(-1); } - catch(std::exception& e) - { - Nan::ThrowError(e.what()); + } + + static void CallJs(Napi::Env env, 
+ Napi::Function callback, + std::nullptr_t*, + ResultOrException* data) { + std::unique_ptr<ResultOrException> data_guard(data); + if (env != nullptr) { + if (callback != nullptr) { + if (data->exception) { + try { + std::rethrow_exception(data->exception); + } catch (const std::exception& e) { + callback.Call(Napi::Object::New(callback.Env()), + {Napi::Error::New(env, e.what()).Value()}); + } + } else { + callback.Call(Napi::Object::New(callback.Env()), + {env.Undefined(), + Napi::Number::New(env, data->result->send_status()), + Napi::String::New(env, data->result->msg_id()), + Napi::Number::New(env, data->result->queue_offset())}); + } + } } - catch(rocketmq::MQException& e) - { - Nan::ThrowError(e.what()); + } + + private: + using Callback = Napi::TypedThreadSafeFunction<std::nullptr_t, + ResultOrException, + &ProducerSendCallback::CallJs>; + + Callback callback_; +}; + +Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) { + rocketmq::MQMessage message = [&]() { + Napi::String topic = info[0].As<Napi::String>(); + Napi::Value body = info[1]; + if (body.IsString()) { + return rocketmq::MQMessage(topic, body.ToString()); + } else { + Napi::Buffer<char> buffer = body.As<Napi::Buffer<char>>(); + return rocketmq::MQMessage(topic, + std::string(buffer.Data(), buffer.Length())); } - info.GetReturnValue().Set(ret); -} - -NAN_METHOD(RocketMQProducer::Send) -{ - Nan::Utf8String topic(info[0]); - Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked(); - - CMessage* msg = CreateMessage(*topic); + }(); - Local<Value> _tags_to_be_checked = Nan::Get(options, Nan::New<String>("tags").ToLocalChecked()).ToLocalChecked(); - Local<Value> _keys_to_be_checked = Nan::Get(options, Nan::New<String>("keys").ToLocalChecked()).ToLocalChecked(); + const Napi::Value options_v = info[2]; + if (options_v.IsObject()) { + const Napi::Object options = options_v.ToObject(); - if(_tags_to_be_checked->IsString()) - { - Nan::Utf8String tags(_tags_to_be_checked); - 
SetMessageTags(msg, *tags); + Napi::Value tags = options.Get("tags"); + if (tags.IsString()) { + message.set_tags(tags.ToString()); } - if(_keys_to_be_checked->IsString()) - { - Nan::Utf8String keys(_keys_to_be_checked); - SetMessageKeys(msg, *keys); + Napi::Value keys = options.Get("keys"); + if (keys.IsString()) { + message.set_keys(keys.ToString()); } + } - // set message body: - // 1. if it's a string, call `SetMessageBody`; - // 2. if it's a buffer, call `SetByteMessageBody`. - if(info[1]->IsString()) - { - Nan::Utf8String body(info[1]); - SetMessageBody(msg, *body); - } - else - { - Local<Object> node_buff_object = Nan::To<Object>(info[1]).ToLocalChecked(); - unsigned int length = node::Buffer::Length(node_buff_object); - const char* buff = node::Buffer::Data(node_buff_object); - SetByteMessageBody(msg, buff, length); - } + auto* send_callback = + new ProducerSendCallback(info.Env(), info[3].As<Napi::Function>()); + producer_.send(message, send_callback); - Nan::Callback* callback = (info[3]->IsFunction()) ? 
- new Nan::Callback(Nan::To<Function>(info[3]).ToLocalChecked()) : - NULL; - - RocketMQProducer* producer = ObjectWrap::Unwrap<RocketMQProducer>(info.Holder()); - Nan::AsyncQueueWorker(new ProducerSendMessageWorker(callback, producer, msg)); + return info.Env().Undefined(); } -} +} // namespace __node_rocketmq__ diff --git a/src/producer.h b/src/producer.h index d20cd29..780efa4 100644 --- a/src/producer.h +++ b/src/producer.h @@ -17,46 +17,34 @@ #ifndef __ROCKETMQ_PRODUCER_H__ #define __ROCKETMQ_PRODUCER_H__ -#include <CProducer.h> -#include <nan.h> +#include <napi.h> -namespace __node_rocketmq__ { +#include <DefaultMQProducer.h> -using v8::Context; -using v8::Function; -using v8::FunctionTemplate; -using v8::Isolate; -using v8::Local; -using v8::Object; -using v8::String; -using v8::Value; +namespace __node_rocketmq__ { -class RocketMQProducer : public Nan::ObjectWrap { -public: - static NAN_MODULE_INIT(Init); +class RocketMQProducer : public Napi::ObjectWrap<RocketMQProducer> { + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); -public: - CProducer* GetProducer() { return producer_ptr; } + RocketMQProducer(const Napi::CallbackInfo& info); + ~RocketMQProducer(); -private: - explicit RocketMQProducer(const char* group_id, const char* instance_name); - ~RocketMQProducer(); + private: + Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info); - static NAN_METHOD(New); - static NAN_METHOD(Start); - static NAN_METHOD(Shutdown); - static NAN_METHOD(Send); - static NAN_METHOD(SetSessionCredentials); + Napi::Value Start(const Napi::CallbackInfo& info); + Napi::Value Shutdown(const Napi::CallbackInfo& info); - static Nan::Persistent<Function> constructor; + Napi::Value Send(const Napi::CallbackInfo& info); -private: - void SetOptions(Local<Object> options); + private: + void SetOptions(const Napi::Object& options); -private: - CProducer* producer_ptr; + private: + rocketmq::DefaultMQProducer producer_; }; -} +} // namespace 
__node_rocketmq__ #endif diff --git a/src/push_consumer.cpp b/src/push_consumer.cpp index 074a64d..63a786f 100644 --- a/src/push_consumer.cpp +++ b/src/push_consumer.cpp @@ -14,381 +14,259 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <map> #include "push_consumer.h" -#include "consumer_ack.h" -#include "workers/push_consumer/start_or_shutdown.h" - -using namespace std; - -namespace __node_rocketmq__ { -struct MessageHandlerParam -{ - RocketMQPushConsumer* consumer; - ConsumerAckInner* ack; - CMessageExt* msg; -}; -char message_handler_param_keys[5][8] = { "topic", "tags", "keys", "body", "msgId" }; - -uv_mutex_t _get_msg_ext_column_lock; - -map<CPushConsumer*, RocketMQPushConsumer*> _push_consumer_map; +#include <exception> +#include <future> -#define NAN_GET_CPUSHCONSUMER() \ - RocketMQPushConsumer* _v8_consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); \ - CPushConsumer* consumer_ptr = _v8_consumer->GetConsumer(); +#include <napi.h> -Nan::Persistent<Function> RocketMQPushConsumer::constructor; +#include <ClientRPCHook.h> +#include <LoggerConfig.h> +#include <MQMessageListener.h> -RocketMQPushConsumer::RocketMQPushConsumer(const char* group_id, const char* instance_name) : - consumer_ptr(NULL) -{ - consumer_ptr = CreatePushConsumer(group_id); +#include "consumer_ack.h" - if(instance_name) - { - SetPushConsumerInstanceName(consumer_ptr, instance_name); - } +using namespace std; - _push_consumer_map[consumer_ptr] = this; +namespace __node_rocketmq__ { - RegisterMessageCallback(consumer_ptr, RocketMQPushConsumer::OnMessage); +Napi::Object RocketMQPushConsumer::Init(Napi::Env env, Napi::Object exports) { + Napi::Function func = DefineClass( + env, + "RocketMQPushConsumer", + { + InstanceMethod<&RocketMQPushConsumer::Start>("start"), + InstanceMethod<&RocketMQPushConsumer::Shutdown>("shutdown"), + InstanceMethod<&RocketMQPushConsumer::Subscribe>("subscribe"), + 
InstanceMethod<&RocketMQPushConsumer::SetListener>("setListener"), + InstanceMethod<&RocketMQPushConsumer::SetSessionCredentials>( + "setSessionCredentials"), + }); + + Napi::FunctionReference* constructor = new Napi::FunctionReference(); + *constructor = Napi::Persistent(func); + env.SetInstanceData<Napi::FunctionReference>(constructor); + + exports.Set("PushConsumer", func); + return exports; } -RocketMQPushConsumer::~RocketMQPushConsumer() -{ - try - { - ShutdownPushConsumer(consumer_ptr); - auto it = _push_consumer_map.find(consumer_ptr); - if(it != _push_consumer_map.end()) - { - _push_consumer_map.erase(consumer_ptr); - } - } - catch(...) - { - // - } +RocketMQPushConsumer::RocketMQPushConsumer(const Napi::CallbackInfo& info) + : Napi::ObjectWrap<RocketMQPushConsumer>(info), consumer_("") { + const Napi::Value group_name = info[0]; + if (group_name.IsString()) { + consumer_.set_group_name(group_name.ToString()); + } - DestroyPushConsumer(consumer_ptr); - consumer_ptr = NULL; -} - -void RocketMQPushConsumer::SetOptions(Local<Object> options) -{ - // set name server - Local<Value> _name_server_v = Nan::Get(options, Nan::New<String>("nameServer").ToLocalChecked()).ToLocalChecked(); - if(_name_server_v->IsString()) - { - Nan::Utf8String namesrv(_name_server_v); - SetPushConsumerNameServerAddress(consumer_ptr, *namesrv); - } + const Napi::Value instance_name = info[1]; + if (instance_name.IsString()) { + consumer_.set_instance_name(instance_name.ToString()); + } - // set thread count - Local<Value> _thread_count_v = Nan::Get(options, Nan::New<String>("threadCount").ToLocalChecked()).ToLocalChecked(); - if(_thread_count_v->IsNumber()) - { - int thread_count = Nan::To<int32_t>(_thread_count_v).FromJust(); - if(thread_count > 0) - { - SetPushConsumerThreadCount(consumer_ptr, thread_count); - } - } - - // set message batch max size - Local<Value> _max_batch_size_v = Nan::Get(options, Nan::New<String>("maxBatchSize").ToLocalChecked()).ToLocalChecked(); - 
if(_max_batch_size_v->IsNumber()) - { - int max_batch_size = Nan::To<int32_t>(_max_batch_size_v).FromJust(); - if(max_batch_size > 0) - { - SetPushConsumerMessageBatchMaxSize(consumer_ptr, max_batch_size); - } - } - - // set log num & single log size - int file_num = 3; - int64 file_size = 104857600; - Local<Value> _log_file_num_v = Nan::Get(options, Nan::New<String>("logFileNum").ToLocalChecked()).ToLocalChecked(); - Local<Value> _log_file_size_v = Nan::Get(options, Nan::New<String>("logFileSize").ToLocalChecked()).ToLocalChecked(); - if(_log_file_num_v->IsNumber()) - { - file_num = _log_file_num_v->Int32Value(); - } - if(_log_file_size_v->IsNumber()) - { - file_size = _log_file_size_v->Int32Value(); - } - SetPushConsumerLogFileNumAndSize(consumer_ptr, file_num, file_size); - - // set log level - Local<Value> _log_level_v = Nan::Get(options, Nan::New<String>("logLevel").ToLocalChecked()).ToLocalChecked(); - if(_log_level_v->IsNumber()) - { - int level = _log_level_v->Int32Value(); - SetPushConsumerLogLevel(consumer_ptr, (CLogLevel) level); - } + const Napi::Value options = info[2]; + if (options.IsObject()) { + // try to set options + SetOptions(options.ToObject()); + } } -NAN_MODULE_INIT(RocketMQPushConsumer::Init) -{ - uv_mutex_init(&_get_msg_ext_column_lock); - Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); - tpl->SetClassName(Nan::New("RocketMQPushConsumer").ToLocalChecked()); - tpl->InstanceTemplate()->SetInternalFieldCount(1); - - Nan::SetPrototypeMethod(tpl, "start", Start); - Nan::SetPrototypeMethod(tpl, "shutdown", Shutdown); - Nan::SetPrototypeMethod(tpl, "subscribe", Subscribe); - Nan::SetPrototypeMethod(tpl, "setListener", SetListener); - Nan::SetPrototypeMethod(tpl, "setSessionCredentials", SetSessionCredentials); - - constructor.Reset(tpl->GetFunction()); - Nan::Set(target, Nan::New("PushConsumer").ToLocalChecked(), tpl->GetFunction()); +RocketMQPushConsumer::~RocketMQPushConsumer() { + consumer_.shutdown(); } 
-NAN_METHOD(RocketMQPushConsumer::New) -{ - Isolate* isolate = info.GetIsolate(); - Local<Context> context = Context::New(isolate); - - if(!info.IsConstructCall()) - { - const int argc = 3; - Local<Value> argv[argc] = { info[0], info[1], info[2] }; - Local<Function> _constructor = Nan::New<v8::Function>(constructor); - info.GetReturnValue().Set(_constructor->NewInstance(context, argc, argv).ToLocalChecked()); - return; - } - - Nan::Utf8String v8_group_id(info[0]); - Nan::Utf8String v8_instance_name(info[1]); - string group_id = *v8_group_id; - string instance_name = *v8_instance_name; - Local<Object> options = Nan::To<Object>(info[2]).ToLocalChecked(); - RocketMQPushConsumer* consumer = new RocketMQPushConsumer(group_id.c_str(), info[1]->IsNull() ? NULL : instance_name.c_str()); - - consumer->Wrap(info.This()); - - // try to set options - try - { - consumer->SetOptions(options); - } - catch(const runtime_error e) - { - Nan::ThrowError(e.what()); - return; - } - catch(const std::exception& e) - { - Nan::ThrowError(e.what()); - return; +void RocketMQPushConsumer::SetOptions(const Napi::Object& options) { + // set name server + Napi::Value name_server = options.Get("nameServer"); + if (name_server.IsString()) { + consumer_.set_namesrv_addr(name_server.ToString()); + } + + // set group name + Napi::Value group_name = options.Get("groupName"); + if (group_name.IsString()) { + consumer_.set_group_name(group_name.ToString()); + } + + // set thread count + Napi::Value thread_count = options.Get("threadCount"); + if (thread_count.IsNumber()) { + consumer_.set_consume_thread_nums(thread_count.ToNumber()); + } + + // set message batch max size + Napi::Value max_batch_size = options.Get("maxBatchSize"); + if (max_batch_size.IsNumber()) { + consumer_.set_consume_message_batch_max_size(max_batch_size.ToNumber()); + } + + // set log level + Napi::Value log_level = options.Get("logLevel"); + if (log_level.IsNumber()) { + int32_t level = log_level.ToNumber(); + if (level >= 0 && 
level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) { + rocketmq::GetDefaultLoggerConfig().set_level( + static_cast<rocketmq::LogLevel>(level)); } - - info.GetReturnValue().Set(info.This()); -} - -NAN_METHOD(RocketMQPushConsumer::Start) -{ - NAN_GET_CPUSHCONSUMER(); - - Nan::Callback* callback = (info[0]->IsFunction()) ? - new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) : - NULL; - - Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::START_PUSH_CONSUMER)); + } + + // set log directory + Napi::Value log_dir = options.Get("logDir"); + if (log_dir.IsString()) { + rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString()); + } + + // set log file size + Napi::Value log_file_size = options.Get("logFileSize"); + if (log_file_size.IsNumber()) { + rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber()); + } + + // set log file num + Napi::Value log_file_num = options.Get("logFileNum"); + if (log_file_num.IsNumber()) { + rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber()); + } } -NAN_METHOD(RocketMQPushConsumer::Shutdown) -{ - NAN_GET_CPUSHCONSUMER(); +Napi::Value RocketMQPushConsumer::SetSessionCredentials( + const Napi::CallbackInfo& info) { + Napi::String access_key = info[0].As<Napi::String>(); + Napi::String secret_key = info[1].As<Napi::String>(); + Napi::String ons_channel = info[2].As<Napi::String>(); - Nan::Callback* callback = (info[0]->IsFunction()) ? 
- new Nan::Callback(Nan::To<Function>(info[0]).ToLocalChecked()) : - NULL; + auto rpc_hook = std::make_shared<rocketmq::ClientRPCHook>( + rocketmq::SessionCredentials(access_key, secret_key, ons_channel)); + consumer_.setRPCHook(rpc_hook); - Nan::AsyncQueueWorker(new PushConsumerStartOrShutdownWorker(callback, consumer_ptr, PushConsumerWorkerType::SHUTDOWN_PUSH_CONSUMER)); + return info.Env().Undefined(); } -NAN_METHOD(RocketMQPushConsumer::Subscribe) -{ - NAN_GET_CPUSHCONSUMER(); +class ConsumerStartWorker : public Napi::AsyncWorker { + public: + ConsumerStartWorker(const Napi::Function& callback, + const rocketmq::DefaultMQPushConsumer& consumer) + : Napi::AsyncWorker(callback), consumer_(consumer) {} - Nan::Utf8String v8_topic(info[0]); - Nan::Utf8String v8_expression(info[1]); - string topic = *v8_topic; - string expression = *v8_expression; + void Execute() override { consumer_.start(); } - int ret; - try - { - ret = ::Subscribe(consumer_ptr, topic.c_str(), expression.c_str()); - } - catch(const runtime_error e) - { - Nan::ThrowError(e.what()); - return; - } - catch(const std::exception& e) - { - Nan::ThrowError(e.what()); - return; - } - - info.GetReturnValue().Set(ret); -} - -NAN_METHOD(RocketMQPushConsumer::SetListener) -{ - RocketMQPushConsumer* consumer = ObjectWrap::Unwrap<RocketMQPushConsumer>(info.Holder()); - if(!consumer->listener_func.IsEmpty()) - { - consumer->listener_func.Reset(); - } + private: + rocketmq::DefaultMQPushConsumer consumer_; +}; - consumer->listener_func.Reset(Nan::To<Function>(info[0]).ToLocalChecked()); +Napi::Value RocketMQPushConsumer::Start(const Napi::CallbackInfo& info) { + Napi::Function callback = info[0].As<Napi::Function>(); + auto* worker = new ConsumerStartWorker(callback, consumer_); + worker->Queue(); + return info.Env().Undefined(); } -NAN_METHOD(RocketMQPushConsumer::SetSessionCredentials) -{ - NAN_GET_CPUSHCONSUMER(); +class ConsumerShutdownWorker : public Napi::AsyncWorker { + public: + 
ConsumerShutdownWorker(const Napi::Function& callback, + const rocketmq::DefaultMQPushConsumer& consumer) + : Napi::AsyncWorker(callback), consumer_(consumer) {} - Nan::Utf8String access_key(info[0]); - Nan::Utf8String secret_key(info[1]); - Nan::Utf8String ons_channel(info[2]); + void Execute() override { consumer_.shutdown(); } - int ret; - try - { - ret = SetPushConsumerSessionCredentials(consumer_ptr, *access_key, *secret_key, *ons_channel); - } - catch(const runtime_error e) - { - Nan::ThrowError(e.what()); - return; - } - catch(const std::exception& e) - { - Nan::ThrowError(e.what()); - return; - } + private: + rocketmq::DefaultMQPushConsumer consumer_; +}; - info.GetReturnValue().Set(ret); +Napi::Value RocketMQPushConsumer::Shutdown(const Napi::CallbackInfo& info) { + Napi::Function callback = info[0].As<Napi::Function>(); + auto* worker = new ConsumerShutdownWorker(callback, consumer_); + worker->Queue(); + return info.Env().Undefined(); } -string RocketMQPushConsumer::GetMessageColumn(char* name, CMessageExt* msg) -{ - const char* orig = NULL; - - uv_mutex_lock(&_get_msg_ext_column_lock); - switch(name[0]) - { - // topic / tags - case 't': - orig = name[1] == 'o' ? 
GetMessageTopic(msg) : GetMessageTags(msg); - break; - - // keys - case 'k': - orig = GetMessageKeys(msg); - break; - - // body - case 'b': - orig = GetMessageBody(msg); - break; - - // msgId - case 'm': - orig = GetMessageId(msg); - break; - - default: - orig = NULL; - break; - } +Napi::Value RocketMQPushConsumer::Subscribe(const Napi::CallbackInfo& info) { + Napi::String topic = info[0].As<Napi::String>(); + Napi::String expression = info[1].As<Napi::String>(); - uv_mutex_unlock(&_get_msg_ext_column_lock); + consumer_.subscribe(topic, expression); - if(!orig) return ""; - return orig; + return info.Env().Undefined(); } -void close_async_done(uv_handle_t* handle) -{ - free(handle); -} - -void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async) -{ - Nan::HandleScope scope; - - Isolate* isolate = Isolate::GetCurrent(); - Local<Context> context = isolate->GetCurrentContext(); - - MessageHandlerParam* param = (MessageHandlerParam*)(async->data); - RocketMQPushConsumer* consumer = param->consumer; - ConsumerAckInner* ack_inner = param->ack; - CMessageExt* msg = param->msg; - - // create the JavaScript ack object and then set inner ack object - Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor()); - Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked(); - ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj); - ack->SetInner(ack_inner); - - // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key); - Local<Object> result = Nan::New<Object>(); - for(int i = 0; i < 5; i++) - { - Nan::Set( - result, - Nan::New(message_handler_param_keys[i]).ToLocalChecked(), - Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked()); +class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently { + private: + struct MessageAndPromise { + rocketmq::MQMessageExt message; + std::promise<bool> promise; + }; + + public: + 
ConsumerMessageListener(Napi::Env& env, Napi::Function&& callback) + : listener_( + Listener::New(env, callback, "RocketMQ Message Listener", 0, 1)) {} + + ~ConsumerMessageListener() { listener_.Release(); } + + rocketmq::ConsumeStatus consumeMessage( + std::vector<rocketmq::MQMessageExt>& msgs) override { + for (auto& msg : msgs) { + MessageAndPromise data{msg, std::promise<bool>()}; + auto future = data.promise.get_future(); + listener_.BlockingCall(&data); + try { + if (!future.get()) { + return rocketmq::ConsumeStatus::RECONSUME_LATER; + } + } catch (const std::exception& e) { + return rocketmq::ConsumeStatus::RECONSUME_LATER; + } } - - Local<Value> argv[2] = { - result, - ack_obj - }; - Nan::Callback* callback = consumer->GetListenerFunction(); - callback->Call(2, argv); - - uv_close((uv_handle_t*)async, close_async_done); -} - -int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext) -{ - RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr]; - if (!consumer) - { - // TODO: error handle - return CConsumeStatus::E_RECONSUME_LATER; + return rocketmq::ConsumeStatus::CONSUME_SUCCESS; + }; + + static void CallJs(Napi::Env env, + Napi::Function listener, + std::nullptr_t*, + MessageAndPromise* data) { + if (env != nullptr) { + if (listener != nullptr) { + Napi::Object message = Napi::Object::New(env); + message.Set("topic", data->message.topic()); + message.Set("tags", data->message.tags()); + message.Set("keys", data->message.keys()); + message.Set("body", data->message.body()); + message.Set("msgId", data->message.msg_id()); + + Napi::Object ack = ConsumerAck::NewInstance(env); + ConsumerAck* consumer_ack = Napi::ObjectWrap<ConsumerAck>::Unwrap(ack); + consumer_ack->SetPromise(std::move(data->promise)); + + try { + listener.Call(Napi::Object::New(listener.Env()), {message, ack}); + } catch (const Napi::Error& e) { + try { + consumer_ack->Done(std::current_exception()); + } catch (const std::future_error&) { + // ignore + 
} + } + return; + } } + data->promise.set_value(false); + } - ConsumerAckInner ack_inner; - - // create async parameter - MessageHandlerParam param; - param.consumer = consumer; - param.ack = &ack_inner; - param.msg = msg_ext; + private: + using Listener = + Napi::TypedThreadSafeFunction<std::nullptr_t, + MessageAndPromise, + &ConsumerMessageListener::CallJs>; - // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop` - uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t)); - uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop); - async->data = (void*)(¶m); - - // send async handler - uv_async_send(async); - - // wait for result - CConsumeStatus status = ack_inner.WaitResult(); + Listener listener_; +}; - return status; +Napi::Value RocketMQPushConsumer::SetListener(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + listener_.reset( + new ConsumerMessageListener(env, info[0].As<Napi::Function>())); + consumer_.registerMessageListener(listener_.get()); + return env.Undefined(); } -} +} // namespace __node_rocketmq__ diff --git a/src/push_consumer.h b/src/push_consumer.h index 6250d4e..47b3d14 100644 --- a/src/push_consumer.h +++ b/src/push_consumer.h @@ -17,62 +17,40 @@ #ifndef __ROCKETMQ_PUSH_CONSUMER_H__ #define __ROCKETMQ_PUSH_CONSUMER_H__ -#include <CPushConsumer.h> -#include <uv.h> -#include <nan.h> #include <string> -namespace __node_rocketmq__ { +#include <napi.h> + +#include <DefaultMQPushConsumer.h> -using v8::Context; -using v8::Function; -using v8::FunctionTemplate; -using v8::Isolate; -using v8::Local; -using v8::Object; -using v8::String; -using v8::Value; +namespace __node_rocketmq__ { -class RocketMQPushConsumer : public Nan::ObjectWrap { -public: - static NAN_MODULE_INIT(Init); - static int OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext); - static std::string GetMessageColumn(char* name, CMessageExt* msg); +class ConsumerMessageListener; -private: 
- explicit RocketMQPushConsumer(const char* group_id, const char* instance_name); - ~RocketMQPushConsumer(); +class RocketMQPushConsumer : public Napi::ObjectWrap<RocketMQPushConsumer> { + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); - static NAN_METHOD(New); - static NAN_METHOD(Start); - static NAN_METHOD(Shutdown); - static NAN_METHOD(Subscribe); - static NAN_METHOD(SetListener); - static NAN_METHOD(SetSessionCredentials); + RocketMQPushConsumer(const Napi::CallbackInfo& info); + ~RocketMQPushConsumer(); - static Nan::Persistent<v8::Function> constructor; + private: + Napi::Value SetSessionCredentials(const Napi::CallbackInfo& info); - void SetOptions(Local<Object>); - static void HandleMessageInEventLoop(uv_async_t* async); + Napi::Value Start(const Napi::CallbackInfo& info); + Napi::Value Shutdown(const Napi::CallbackInfo& info); -protected: - CPushConsumer* GetConsumer() - { - return consumer_ptr; - } + Napi::Value Subscribe(const Napi::CallbackInfo& info); + Napi::Value SetListener(const Napi::CallbackInfo& info); - Nan::Callback* GetListenerFunction() - { - Nan::Callback* cb; - cb = &listener_func; - return cb; - } + private: + void SetOptions(const Napi::Object& options); -private: - CPushConsumer* consumer_ptr; - Nan::Callback listener_func; + private: + rocketmq::DefaultMQPushConsumer consumer_; + std::unique_ptr<ConsumerMessageListener> listener_; }; -} +} // namespace __node_rocketmq__ #endif diff --git a/src/rocketmq.cpp b/src/rocketmq.cpp index 3b426fb..63fd41f 100644 --- a/src/rocketmq.cpp +++ b/src/rocketmq.cpp @@ -14,35 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include <nan.h> +#include <napi.h> +#include "consumer_ack.h" #include "producer.h" #include "push_consumer.h" -#include "consumer_ack.h" namespace __node_rocketmq__ { -#if defined(__APPLE__) -uv_lib_t lib; - -NAN_METHOD(DLOpen) -{ - Nan::Utf8String filename(info[0]); - uv_dlopen(*filename, &lib); +Napi::Object Init(Napi::Env env, Napi::Object exports) { + RocketMQProducer::Init(env, exports); + RocketMQPushConsumer::Init(env, exports); + ConsumerAck::Init(env, exports); + return exports; } -#endif -NAN_MODULE_INIT(Init) -{ - RocketMQProducer::Init(target); - RocketMQPushConsumer::Init(target); - ConsumerAck::Init(target); +NODE_API_MODULE(rocketmq, Init) -#if defined(__APPLE__) - Nan::Set(target, Nan::New("macosDLOpen").ToLocalChecked(), Nan::New<v8::FunctionTemplate>(DLOpen)->GetFunction()); -#endif -} - -NODE_MODULE(rocketmq, Init) - -} +} // namespace __node_rocketmq__ diff --git a/src/workers/producer/send_message.h b/src/workers/producer/send_message.h deleted file mode 100644 index 6974e5a..0000000 --- a/src/workers/producer/send_message.h +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef __ROCKETMQ_SEND_MESSAGE_H__ -#define __ROCKETMQ_SEND_MESSAGE_H__ - -#include <nan.h> -#include <CProducer.h> -#include <MQClientException.h> - -namespace __node_rocketmq__ { - -using namespace std; - -class ProducerSendMessageWorker : public Nan::AsyncWorker { -public: - ProducerSendMessageWorker(Nan::Callback* callback, RocketMQProducer* producer, CMessage* msg) : - AsyncWorker(callback), - msg(msg), - producer(producer) - { - } - - ~ProducerSendMessageWorker() - { - DestroyMessage(msg); - } - - void Execute() - { - try - { - SendMessageSync(producer->GetProducer(), msg, &send_ret); - } - catch(const runtime_error e) - { - SetErrorMessage(e.what()); - } - catch(const std::exception& e) - { - SetErrorMessage(e.what()); - } - } - - void HandleOKCallback() - { - Nan::HandleScope scope; - - Local<Value> argv[] = { - Nan::Undefined(), - Nan::New<v8::Number>((unsigned int)send_ret.sendStatus), - Nan::New<v8::String>(send_ret.msgId).ToLocalChecked(), - Nan::New<v8::Number>((long long)send_ret.offset) - }; - callback->Call(4, argv); - } - -private: - CMessage* msg; - RocketMQProducer* producer; - - CSendResult send_ret; -}; - -} - -#endif diff --git a/src/workers/producer/start_or_shutdown.h b/src/workers/producer/start_or_shutdown.h deleted file mode 100644 index dde99a6..0000000 --- a/src/workers/producer/start_or_shutdown.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__ -#define __ROCKETMQ_PRODUCER_START_OR_SHUTDOWN_H__ - -#include <nan.h> -#include <CProducer.h> -#include <MQClientException.h> - -namespace __node_rocketmq__ { - -using namespace std; - -enum ProducerWorkerType { - START_PRODUCER = 0, - SHUTDOWN_PRODUCER -}; - -class ProducerStartOrShutdownWorker : public Nan::AsyncWorker { -public: - ProducerStartOrShutdownWorker(Nan::Callback* callback, CProducer* producer_ptr, ProducerWorkerType type) : - AsyncWorker(callback), - producer_ptr(producer_ptr), - ret(0), - type(type) - { - } - - ~ProducerStartOrShutdownWorker() - { - } - - void Execute() - { - try - { - switch(type) { - case START_PRODUCER: - ret = StartProducer(producer_ptr); break; - case SHUTDOWN_PRODUCER: - ret = ShutdownProducer(producer_ptr); break; - default: break; - } - } - catch(const runtime_error e) - { - SetErrorMessage(e.what()); - } - catch(const exception& e) - { - SetErrorMessage(e.what()); - } - } - - void HandleOKCallback() - { - Nan::HandleScope scope; - - Local<Value> argv[] = { - Nan::Undefined(), - Nan::New<v8::Number>((int)ret), - }; - callback->Call(2, argv); - } - -private: - CProducer* producer_ptr; - int ret; - ProducerWorkerType type; -}; - -} - -#endif diff --git a/src/workers/push_consumer/start_or_shutdown.h b/src/workers/push_consumer/start_or_shutdown.h deleted file mode 100644 index 1169059..0000000 --- a/src/workers/push_consumer/start_or_shutdown.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__ -#define __ROCKETMQ_PUSH_CONSUMER_START_OR_SHUTDOWN_H__ - -#include <nan.h> -#include <CPushConsumer.h> -#include <MQClientException.h> - -namespace __node_rocketmq__ { - -using namespace std; - -enum PushConsumerWorkerType { - START_PUSH_CONSUMER = 0, - SHUTDOWN_PUSH_CONSUMER -}; - -class PushConsumerStartOrShutdownWorker : public Nan::AsyncWorker { -public: - PushConsumerStartOrShutdownWorker(Nan::Callback* callback, CPushConsumer* consumer_ptr, PushConsumerWorkerType type) : - AsyncWorker(callback), - consumer_ptr(consumer_ptr), - ret(0), - type(type) - { - } - - ~PushConsumerStartOrShutdownWorker() - { - } - - void Execute() - { - try - { - switch(type) { - case START_PUSH_CONSUMER: - ret = StartPushConsumer(consumer_ptr); break; - case SHUTDOWN_PUSH_CONSUMER: - ret = ShutdownPushConsumer(consumer_ptr); break; - default: break; - } - } - catch(const runtime_error e) - { - SetErrorMessage(e.what()); - } - catch(const exception& e) - { - SetErrorMessage(e.what()); - } - } - - void HandleOKCallback() - { - Nan::HandleScope scope; - - Local<Value> argv[] = { - Nan::Undefined(), - Nan::New<v8::Number>((int)ret), - }; - callback->Call(2, argv); - } - -private: - CPushConsumer* consumer_ptr; - 
int ret; - PushConsumerWorkerType type; -}; - -} - -#endif
