This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-ons-cpp.git
commit e3411416a516d652b591450abc0441e21216e7af Author: ShannonDing <[email protected]> AuthorDate: Tue Jul 23 21:19:24 2019 +0800 Add ONS API --- src/main/c/native/rocketmq.h | 63 +++++++ src/main/cpp/demos/CMakeLists.Release | 39 ++++ src/main/cpp/demos/CMakeLists.txt | 17 ++ src/main/cpp/demos/ConsumerDemo.cpp | 63 +++++++ src/main/cpp/demos/MultiThreadProducerDemo.cpp | 75 ++++++++ src/main/cpp/demos/OrderConsumerDemo.cpp | 59 ++++++ src/main/cpp/demos/OrderProducerDemo.cpp | 54 ++++++ src/main/cpp/demos/ProducerAsyncDemo.cpp | 93 ++++++++++ src/main/cpp/demos/ProducerDemo.cpp | 56 ++++++ src/main/cpp/demos/ProducerOnewayDemo.cpp | 49 +++++ src/main/cpp/demos/TransactionProducerDemo.cpp | 75 ++++++++ src/main/cpp/include/Action.h | 14 ++ src/main/cpp/include/ConsumeContext.h | 13 ++ src/main/cpp/include/ConsumeOrderContext.h | 13 ++ src/main/cpp/include/LocalTransactionChecker.h | 16 ++ src/main/cpp/include/LocalTransactionExecuter.h | 16 ++ src/main/cpp/include/Message.h | 100 ++++++++++ src/main/cpp/include/MessageListener.h | 19 ++ src/main/cpp/include/MessageOrderListener.h | 20 ++ src/main/cpp/include/MessageQueueONS.h | 39 ++++ src/main/cpp/include/MessageQueueSelectorONS.h | 17 ++ src/main/cpp/include/ONSCallback.h | 16 ++ src/main/cpp/include/ONSChannel.h | 15 ++ src/main/cpp/include/ONSClient.h | 20 ++ src/main/cpp/include/ONSClientException.h | 25 +++ src/main/cpp/include/ONSFactory.h | 103 ++++++++++ src/main/cpp/include/OrderAction.h | 11 ++ src/main/cpp/include/OrderConsumer.h | 19 ++ src/main/cpp/include/OrderProducer.h | 23 +++ src/main/cpp/include/Producer.h | 35 ++++ src/main/cpp/include/PullConsumer.h | 38 ++++ src/main/cpp/include/PullResultONS.h | 40 ++++ src/main/cpp/include/PushConsumer.h | 20 ++ src/main/cpp/include/SendResultONS.h | 19 ++ src/main/cpp/include/TransactionProducer.h | 29 +++ src/main/cpp/include/TransactionStatus.h | 13 ++ src/test/cpp/MessageTest.cpp | 237 ++++++++++++++++++++++++ 37 files changed, 1573 insertions(+) diff 
--git a/src/main/c/native/rocketmq.h b/src/main/c/native/rocketmq.h
new file mode 100644
index 0000000..ad48a98
--- /dev/null
+++ b/src/main/c/native/rocketmq.h
@@ -0,0 +1,63 @@
+#ifndef __ROCKETMQ_H__
+#define __ROCKETMQ_H__
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+typedef struct message_struct {
+    char *topic;
+    char *tags;
+    char *body;
+    unsigned int body_size;
+    char *key;
+    char *user_prop;
+    char *system_prop;
+} message;
+
+typedef struct send_result_struct {
+    char *message_id;
+    int error_no;
+    char *error_msg;
+} send_result;
+
+typedef struct factory_property_struct {
+    char *group_id;
+    char *access_key;
+    char *access_secret;
+    char *name_srv_addr;
+    char *name_srv_domain;
+    char *message_model;
+    char *send_msg_timeout_millis;
+    char *consume_thread_nums;
+    char *ons_channel;
+    char *max_msg_cache_size;
+    char *ons_trace_switch;
+    char *consumer_instance_name;
+    char *language_identifier;
+    char *instance_id;
+    int use_domain;
+} factory_property;
+
+typedef struct callback_func_struct {
+    char *send_callback_ons;
+
+    void (*on_success)(void *thread, char *message_id, char *send_callback_ons);
+
+    void (*on_exception)(void *thread, char *m_msg, int m_error, char *send_callback_ons);
+} callback_func;
+
+typedef struct subscription_struct {
+    char *topic;
+    char *sub_expression;
+
+    int (*on_message)(void *thread, void *opaque, char *topic, char *user_props, char *sys_props, char *body, int len);
+
+    void *opaque;
+} subscription;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //__ROCKETMQ_H__
\ No newline at end of file
diff --git a/src/main/cpp/demos/CMakeLists.Release b/src/main/cpp/demos/CMakeLists.Release
new file mode 100644
index 0000000..7f47ee5
--- /dev/null
+++ b/src/main/cpp/demos/CMakeLists.Release
@@ -0,0 +1,39 @@
+cmake_minimum_required(VERSION 3.0)
+project(onsclient4cpp_demo VERSION 1.0
+        LANGUAGES C CXX)
+set(CMAKE_CXX_STANDARD 11)
+include_directories(../include)
+
+find_library(ROCKETMQ_CLIENT_CORE
+        NAMES rocketmq_client_core
+        HINTS ../lib)
+
+if (NOT ROCKETMQ_CLIENT_CORE) # on failure find_library() stores "<VAR>-NOTFOUND", which if(NOT <VAR>) treats as false
+    message("find_library for rocketmq_client_core failed")
+endif ()
+
+find_library(ONS_CLIENT
+        NAMES onsclient4cpp
+        HINTS ../lib)
+
+if (NOT ONS_CLIENT) # same check for the ONS client library; the message names the library actually searched for
+    message("find_library for onsclient4cpp failed")
+endif ()
+
+macro(add_demo name source_file)
+    add_executable(${name} ${source_file})
+    target_link_libraries(${name} pthread ${ROCKETMQ_CLIENT_CORE} ${ONS_CLIENT})
+    set_target_properties(${name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin)
+endmacro()
+
+add_demo(producer_demo ProducerDemo.cpp)
+add_demo(order_producer_demo OrderProducerDemo.cpp)
+add_demo(multi_thread_producer_demo MultiThreadProducerDemo.cpp)
+add_demo(producer_async_demo ProducerAsyncDemo.cpp)
+add_demo(producer_oneway_demo ProducerOnewayDemo.cpp)
+add_demo(consumer_demo ConsumerDemo.cpp)
+add_demo(order_consumer_demo OrderConsumerDemo.cpp)
+add_demo(transaction_producer_demo TransactionProducerDemo.cpp)
+
+
+
diff --git a/src/main/cpp/demos/CMakeLists.txt b/src/main/cpp/demos/CMakeLists.txt
new file mode 100644
index 0000000..6ffd443
--- /dev/null
+++ b/src/main/cpp/demos/CMakeLists.txt
@@ -0,0 +1,17 @@
+macro(add_demo name source_file)
+    add_executable(${name} ${source_file})
+    target_link_libraries(${name} pthread ${CMAKE_PROJECT_NAME})
+    set_target_properties(${name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin/demos)
+endmacro()
+
+add_demo(producer_demo ProducerDemo.cpp)
+add_demo(order_producer_demo OrderProducerDemo.cpp)
+add_demo(multi_thread_producer_demo MultiThreadProducerDemo.cpp)
+add_demo(producer_async_demo ProducerAsyncDemo.cpp)
+add_demo(producer_oneway_demo ProducerOnewayDemo.cpp)
+add_demo(consumer_demo ConsumerDemo.cpp)
+add_demo(order_consumer_demo OrderConsumerDemo.cpp)
+add_demo(transaction_producer_demo TransactionProducerDemo.cpp)
+
+
+
diff --git a/src/main/cpp/demos/ConsumerDemo.cpp b/src/main/cpp/demos/ConsumerDemo.cpp
new file mode 100644
index 0000000..0df672a
--- /dev/null +++ b/src/main/cpp/demos/ConsumerDemo.cpp @@ -0,0 +1,63 @@ +#include "ONSFactory.h" + +#include <iostream> +#include <thread> +#include <mutex> + +using namespace ons; + +std::mutex console_mtx; + +class ExampleMessageListener : public MessageListener { +public: + Action consume(Message& message, ConsumeContext& context) { + //此处为具体的消息处理过程,确认消息被处理成功请返回 CommitMessage, + //如果有消费异常,或者期望重新消费,可以返回 ReconsumeLater,消息将会在一段时间后重新投递 + std::lock_guard<std::mutex> lk(console_mtx); + std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " + << message.getMsgID() << std::endl; + return CommitMessage; + } +}; + +int main(int argc, char* argv[]) { + std::cout << "=======Before consuming messages=======" << std::endl; + ONSFactoryProperty factoryInfo; + //Request from ONS console, this should be GroupID here. + //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。 + factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX"); + //Request from ONS console + //请填写阿里云ONS控制台上申请的topic + factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic"); + //Your Access Key from your account. + //请填写你的账户的AK + factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key"); + //Your Secret Key from your account. + //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + + PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo); + + std::string topic(factoryInfo.getPublishTopics()); + std::string tag("Your Tag"); + + //register your own listener here to handle the messages received. 
+ //请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。 + ExampleMessageListener *messageListener = new ExampleMessageListener(); + consumer->subscribe(topic.c_str(), tag.c_str(), messageListener); + + //Start this consumer + //准备工作完成,必须调用启动函数,才可以正常工作。 + consumer->start(); + + //Keep main thread running until process finished. + //请保持线程常驻,不要执行shutdown操作 + std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000)); + consumer->shutdown(); + std::cout << "=======After consuming messages======" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/main/cpp/demos/MultiThreadProducerDemo.cpp b/src/main/cpp/demos/MultiThreadProducerDemo.cpp new file mode 100644 index 0000000..867a736 --- /dev/null +++ b/src/main/cpp/demos/MultiThreadProducerDemo.cpp @@ -0,0 +1,75 @@ +#include <iostream> +#include <chrono> +#include <thread> +#include <mutex> +#include <vector> + +#include "ONSFactory.h" + +using namespace std; +using namespace ons; + +int main() { + std::cout << "=======Before sending messages=======" << std::endl; + ONSFactoryProperty factoryInfo; + //Request from ONS console, this should be GroupID here. + //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。 + factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX"); + //Request from ONS console + //请填写阿里云ONS控制台上申请的topic + factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic"); + //Your Access Key from your account. + //请填写你的账户的AK + factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key"); + //Your Secret Key from your account. 
+ //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + Producer *producer = nullptr; + producer = ONSFactory::getInstance()->createProducer(factoryInfo); + producer->start(); + Message msg( + factoryInfo.getPublishTopics(), + "Your Tag", + "Your Key", + "This message body." + ); + + std::mutex console_mutex; + + auto lambda = [&]() { + auto start = std::chrono::system_clock::now(); + int count = 32; + for (int i = 0; i < count; ++i) { + SendResultONS sendResult = producer->send(msg); + std::cout << "Message ID: " << sendResult.getMessageId() << std::endl; + } + auto interval = std::chrono::system_clock::now() - start; + { + std::lock_guard<std::mutex> lk(console_mutex); + std::cout << "Send " << count << " messages OK, costs " + << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; + } + }; + + std::vector<std::thread> threads; + int thread_num = 3; + + threads.reserve(thread_num); + for (int i = 0; i < thread_num; ++i) { + threads.emplace_back(lambda); + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + producer->shutdown(); + std::cout << "=======After sending messages=======" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/main/cpp/demos/OrderConsumerDemo.cpp b/src/main/cpp/demos/OrderConsumerDemo.cpp new file mode 100644 index 0000000..28f5ee5 --- /dev/null +++ b/src/main/cpp/demos/OrderConsumerDemo.cpp @@ -0,0 +1,59 @@ +#include "ONSFactory.h" + +#include <iostream> +#include <thread> +#include <mutex> + +using namespace ons; + +std::mutex console_mtx; + +class ExampleMessageListener : public MessageOrderListener { +public: + OrderAction consume(Message &message, ConsumeOrderContext &context) { + //此处为具体的消息处理过程,确认消息被处理成功请返回 Success, + 
//如果有消费异常,或者期望重新消费,可以返回 Suspend,消息将会在一段时间后重新投递 + std::lock_guard<std::mutex> lk(console_mtx); + std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " + << message.getMsgID() << std::endl; + return Success; + } +}; + +int main(int argc, char *argv[]) { + std::cout << "=======Before consuming messages=======" << std::endl; + ONSFactoryProperty factoryInfo; + //Request from ONS console, this should be GroupID here. + //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。 + factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX"); + //Request from ONS console + //请填写阿里云ONS控制台上申请的topic + factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic"); + //Your Access Key from your account. + //请填写你的账户的AK + factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key"); + //Your Secret Key from your account. + //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + OrderConsumer *consumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo); + + std::string topic(factoryInfo.getPublishTopics()); + std::string tag("Your Tag"); + ExampleMessageListener *messageListener = new ExampleMessageListener(); + consumer->subscribe(topic.c_str(), tag.c_str(), messageListener); + + //Start this consumer + //准备工作完成,必须调用启动函数,才可以正常工作。 + consumer->start(); + + //Keep main thread running until process finished. 
+ //请保持线程常驻,不要执行shutdown操作 + std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000)); + consumer->shutdown(); + std::cout << "=======After consuming messages======" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/main/cpp/demos/OrderProducerDemo.cpp b/src/main/cpp/demos/OrderProducerDemo.cpp new file mode 100644 index 0000000..6bf6ed4 --- /dev/null +++ b/src/main/cpp/demos/OrderProducerDemo.cpp @@ -0,0 +1,54 @@ +#include <iostream> +#include <chrono> +#include "ONSFactory.h" + +using namespace std; +using namespace ons; + +int main() { + std::cout << "=======Before sending messages=======" << std::endl; + ONSFactoryProperty factoryInfo; + //Request from ONS console, this should be GroupID here. + //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。 + factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX"); + //Request from ONS console + //请填写阿里云ONS控制台上申请的topic + factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic"); + //Your Access Key from your account. + //请填写你的账户的AK + factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key"); + //Your Secret Key from your account. + //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + OrderProducer *producer = nullptr; + producer = ONSFactory::getInstance()->createOrderProducer(factoryInfo); + producer->start(); + Message msg( + factoryInfo.getPublishTopics(), + "Your Tag", + "Your Key", + "This message body." 
+    );
+
+    auto start = std::chrono::system_clock::now();
+    int count = 32;
+    for (int i = 0; i < count; ++i) {
+        try {
+            SendResultONS sendResult = producer->send(msg,"Your Sharding Key");
+            std::cout << "Topic: " << msg.getTopic() << ", Message ID: " << sendResult.getMessageId() << std::endl;
+        } catch (ONSClientException &e) { // catch by reference: no copy, no slicing of a derived exception
+            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+        }
+    }
+    auto interval = std::chrono::system_clock::now() - start;
+    std::cout << "Send " << count << " messages OK, costs "
+              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+    producer->shutdown();
+    std::cout << "=======After sending messages=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerAsyncDemo.cpp b/src/main/cpp/demos/ProducerAsyncDemo.cpp
new file mode 100644
index 0000000..451ab8a
--- /dev/null
+++ b/src/main/cpp/demos/ProducerAsyncDemo.cpp
@@ -0,0 +1,93 @@
+#include <iostream>
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
+#include "ONSFactory.h"
+#include"ONSCallback.h"
+
+using namespace std;
+using namespace ons;
+
+std::mutex m1;
+std::mutex m2;
+std::condition_variable cv;
+
+class MyCallback : public SendCallbackONS {
+public:
+
+    void onSuccess(SendResultONS &sendResult) override {
+        std::lock_guard<std::mutex> lg(m2);
+        success_num++;
+        std::cout << "send success, message_id: " << sendResult.getMessageId() << ", total: " << success_num
+                  << std::endl;
+        if (success_num + failed_num == total) {
+            cv.notify_all();
+        }
+    }
+
+    void onException(ONSClientException &e) override {
+        std::lock_guard<std::mutex> lg(m2);
+        failed_num++;
+        std::cout << "send failure, total: " << failed_num << std::endl;
+        std::cout << e.what() << std::endl;
+        if (success_num + failed_num == total) {
+            cv.notify_all();
+        }
+    }
+
+    static int success_num;
+    static int failed_num;
+    static int total;
+};
+
+int MyCallback::success_num =
0; +int MyCallback::failed_num = 0; +int MyCallback::total = 32; + + +int main() { + std::cout << "=======before send message=======" << std::endl; + ONSFactoryProperty factoryInfo; + //Request from ONS console, this should be GroupID here. + //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。 + factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX"); + //Request from ONS console + //请填写阿里云ONS控制台上申请的topic + factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic"); + //Your Access Key from your account. + //请填写你的账户的AK + factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key"); + //Your Secret Key from your account. + //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + Producer *producer = nullptr; + producer = ONSFactory::getInstance()->createProducer(factoryInfo); + producer->start(); + Message msg( + factoryInfo.getPublishTopics(), + "Your Tag", + "Your Key", + "This message body." 
+    );
+
+    auto start = std::chrono::system_clock::now();
+    MyCallback m_callback;
+    for (int i = 0; i < MyCallback::total; ++i) {
+        producer->sendAsync(msg, &m_callback);
+    }
+
+    {
+        std::unique_lock<std::mutex> lk(m2); // m2 is the mutex the callbacks hold while updating the counters and notifying
+        cv.wait(lk, [] { return MyCallback::success_num + MyCallback::failed_num >= MyCallback::total; }); // predicate: immune to early completion and spurious wakeups
+    }
+
+    producer->shutdown();
+    std::cout << "=======after sending messages=======" << std::endl;
+
+
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerDemo.cpp b/src/main/cpp/demos/ProducerDemo.cpp
new file mode 100644
index 0000000..0b19174
--- /dev/null
+++ b/src/main/cpp/demos/ProducerDemo.cpp
@@ -0,0 +1,56 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+    std::cout << "=======Before sending messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Request from ONS console, this should be GroupID here.
+    //Fill in the GroupID requested in the Alibaba Cloud ONS console. Since the instance-based release, ProducerId and ConsumerId are unified; this setting is kept so the API stays backward compatible.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Request from ONS console
+    //Fill in the topic requested in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Your Access Key from your account.
+    //Fill in the AccessKey (AK) of your account.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Your Secret Key from your account.
+    //Fill in the SecretKey (SK) of your account.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //This is the endpoint from ONS console
+    //Fill in the access point of the corresponding instance shown in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    Producer *producer = nullptr;
+    producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+
+    auto start = std::chrono::system_clock::now();
+    int count = 32;
+    for (int i = 0; i < count; ++i) {
+        try {
+            SendResultONS sendResult = producer->send(msg);
+            std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+        } catch (ONSClientException &e) { // catch by reference: no copy, no slicing of a derived exception
+            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+        }
+    }
+    auto interval = std::chrono::system_clock::now() - start;
+    std::cout << "Send " << count << " messages OK, costs "
+              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+    //Every send() above returned its result (or threw) before the loop moved on,
+    //so no send is still pending here and the producer is shut down before exiting.
+    producer->shutdown();
+    std::cout << "=======After sending messages=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerOnewayDemo.cpp b/src/main/cpp/demos/ProducerOnewayDemo.cpp
new file mode 100644
index 0000000..031bff3
--- /dev/null
+++ b/src/main/cpp/demos/ProducerOnewayDemo.cpp
@@ -0,0 +1,49 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+    std::cout << "=======Before sending messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Request from ONS console, this should be GroupID here.
+    //Fill in the GroupID requested in the Alibaba Cloud ONS console. Since the instance-based release, ProducerId and ConsumerId are unified; this setting is kept so the API stays backward compatible.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Request from ONS console
+    //Fill in the topic requested in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Your Access Key from your account.
+    //Fill in the AccessKey (AK) of your account.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Your Secret Key from your account.
+ //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + Producer *producer = nullptr; + producer = ONSFactory::getInstance()->createProducer(factoryInfo); + producer->start(); + Message msg( + factoryInfo.getPublishTopics(), + "Your Tag", + "Your Key", + "This message body." + ); + + auto start = std::chrono::system_clock::now(); + int count = 32; + for (int i = 0; i < count; ++i) { + producer->sendOneway(msg); + } + auto interval = std::chrono::system_clock::now() - start; + std::cout << "Send " << count << " messages OK, costs " + << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; + + producer->shutdown(); + std::cout << "=======After sending messages=======" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/main/cpp/demos/TransactionProducerDemo.cpp b/src/main/cpp/demos/TransactionProducerDemo.cpp new file mode 100644 index 0000000..6044589 --- /dev/null +++ b/src/main/cpp/demos/TransactionProducerDemo.cpp @@ -0,0 +1,75 @@ +#include <iostream> +#include <chrono> +#include "ONSFactory.h" +#include"LocalTransactionChecker.h" +#include "LocalTransactionExecuter.h" + + +using namespace std; +using namespace ons; + +class LocalTransactionCheckerImpl : public LocalTransactionChecker { + virtual TransactionStatus check(Message &msg) { + cout << "checker::commit transaction" << endl; + return CommitTransaction; + } +}; + +class LocalTransactionExecuterImpl : public LocalTransactionExecuter { + virtual TransactionStatus execute(Message &msg) { + cout << "executer::commit transaction of msgid: " << msg.getMsgID() << endl; + return CommitTransaction; + } +}; + + +int main() { + std::cout << "=======Before sending message=======" << std::endl; + ONSFactoryProperty factoryInfo; + 
//Request from ONS console, this should be GroupID here. + //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。 + factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX"); + //Request from ONS console + //请填写阿里云ONS控制台上申请的topic + factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic"); + //Your Access Key from your account. + //请填写你的账户的AK + factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key"); + //Your Secret Key from your account. + //请填写你的账户的SK + factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key"); + //This is the endpoint from ONS console + //请填写阿里云ONS控制台上对应实例的接入点 + factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, + "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80"); + TransactionProducer *producer = nullptr; + LocalTransactionCheckerImpl *checker = new LocalTransactionCheckerImpl(); + producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, checker); + producer->start(); + Message msg( + factoryInfo.getPublishTopics(), + "Your Tag", + "Your Key", + "This message body." 
+ ); + auto start = std::chrono::system_clock::now(); + int count = 32; + LocalTransactionExecuterImpl executer; + for (int i = 0; i < count; ++i) { + try { + SendResultONS sendResult = producer->send(msg, &executer); + std::cout << "Message ID: " << sendResult.getMessageId() << std::endl; + } + catch (ONSClientException &e) { + cout << e.GetMsg() << endl; + } + cout << endl; + } + auto interval = std::chrono::system_clock::now() - start; + std::cout << "Send " << count << " messages OK, costs " + << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; + +// producer->shutdown(); + std::cout << "=======After sending message=======" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/main/cpp/include/Action.h b/src/main/cpp/include/Action.h new file mode 100644 index 0000000..289f440 --- /dev/null +++ b/src/main/cpp/include/Action.h @@ -0,0 +1,14 @@ +#ifndef __ACTION_H__ +#define __ACTION_H__ + +#include "ONSClient.h" + +// consuming result +enum Action { + // consume success, application could continue to consume next message + CommitMessage, + // consume fail, server will deliver this message later, application could + // continue to consume next message + ReconsumeLater +}; +#endif diff --git a/src/main/cpp/include/ConsumeContext.h b/src/main/cpp/include/ConsumeContext.h new file mode 100644 index 0000000..abb59de --- /dev/null +++ b/src/main/cpp/include/ConsumeContext.h @@ -0,0 +1,13 @@ +#ifndef __CONSUMECONTEXT_H__ +#define __CONSUMECONTEXT_H__ +#include "ONSClient.h" + +namespace ons { + +class ONSCLIENT_API ConsumeContext { + public: + ConsumeContext() {} + virtual ~ConsumeContext() {} +}; +} +#endif diff --git a/src/main/cpp/include/ConsumeOrderContext.h b/src/main/cpp/include/ConsumeOrderContext.h new file mode 100644 index 0000000..a196d5d --- /dev/null +++ b/src/main/cpp/include/ConsumeOrderContext.h @@ -0,0 +1,13 @@ +#ifndef __CONSUMEORDERLYCONTEXT_H__ +#define __CONSUMEORDERLYCONTEXT_H__ 
+#include "ONSClient.h" + +namespace ons { + +class ONSCLIENT_API ConsumeOrderContext { + public: + ConsumeOrderContext() {} + virtual ~ConsumeOrderContext() {} +}; +} +#endif diff --git a/src/main/cpp/include/LocalTransactionChecker.h b/src/main/cpp/include/LocalTransactionChecker.h new file mode 100644 index 0000000..8601671 --- /dev/null +++ b/src/main/cpp/include/LocalTransactionChecker.h @@ -0,0 +1,16 @@ +#ifndef __LOCALTRANSACTIONCHECKER_H__ +#define __LOCALTRANSACTIONCHECKER_H__ + +#include "Message.h" +#include "TransactionStatus.h" + +namespace ons { +class LocalTransactionChecker { + public: + LocalTransactionChecker() {} + virtual TransactionStatus check(Message& msg) = 0; + virtual ~LocalTransactionChecker() {} +}; +} + +#endif diff --git a/src/main/cpp/include/LocalTransactionExecuter.h b/src/main/cpp/include/LocalTransactionExecuter.h new file mode 100644 index 0000000..1d7ea4f --- /dev/null +++ b/src/main/cpp/include/LocalTransactionExecuter.h @@ -0,0 +1,16 @@ +#ifndef __LOCALTRANSACTIONEXECUTER_H__ +#define __LOCALTRANSACTIONEXECUTER_H__ + +#include "Message.h" +#include "TransactionStatus.h" + +namespace ons { +class LocalTransactionExecuter { + public: + LocalTransactionExecuter() {} + virtual TransactionStatus execute(Message& msg) = 0; + virtual ~LocalTransactionExecuter() {} +}; +} + +#endif diff --git a/src/main/cpp/include/Message.h b/src/main/cpp/include/Message.h new file mode 100755 index 0000000..8038e29 --- /dev/null +++ b/src/main/cpp/include/Message.h @@ -0,0 +1,100 @@ +#ifndef __MESSAGE_H__ +#define __MESSAGE_H__ + +#include <map> +#include <sstream> +#include <vector> +#include "ONSClient.h" + +namespace ons { + +class SystemPropKey { + public: + SystemPropKey() {} + ~SystemPropKey() {} + static const char* TAG; + static const char* KEY; + static const char* MSGID; + static const char* RECONSUMETIMES; + static const char* STARTDELIVERTIME; +}; + +class ONSCLIENT_API Message { + public: + Message(); + Message(const std::string& topic, 
const std::string& tags, const std::string& byte_body); + Message(const char* topic, const char* tags, const char* byte_body); + Message(const char* topic, size_t topic_size, const char* tags, size_t tags_size, const char* body, size_t body_size); + Message(const char* topic, const char* tags, const char* keys, const char* body); + + virtual ~Message(); + + Message(const Message& other); + Message& operator=(const Message& other); + + // userProperties was used to save user specific parameters which doesn't + // belong to SystemPropKey + void putUserProperties(const char* key, const char* value); + const char* getUserProperties(const char* key) const; + void setUserProperties(std::map<std::string, std::string>& userProperty); + std::map<std::string, std::string> getUserProperties() const; + + // systemProperties only save parameters defined in SystemPropKey, please do + // not add other parameters into systemProperties, else it was not saved. + void putSystemProperties(const char* key, const char* value); + const char* getSystemProperties(const char* key) const; + void setSystemProperties(std::map<std::string, std::string>& systemProperty); + std::map<std::string, std::string> getSystemProperties() const; + + const char* getTopic() const; + void setTopic(const char* topic); + + const char* getTag() const; + void setTag(const char* tags); + + const char* getKey() const; + void setKey(const char* keys); + + const char* getMsgID() const; + void setMsgID(const char* msgId); + + const long long getStartDeliverTime() const; + void setStartDeliverTime(long long level); + + const char* getBody() const; + const char* getByteBody(int *len) const; + const std::string getMsgBody() const; + const size_t getBodySize() const; + void setMsgBody(const std::string msgbody); + void setBody(unsigned char* byte_msgbody, int len); + + const int getReconsumeTimes() const; + void setReconsumeTimes(int reconsumeTimes); + + long long getStoreTimestamp() const; + void setStoreTimestamp(long 
long storeTimestamp); + + const std::string toString() const; + + const std::string toSystemString() const; + + const std::string toUserString() const; + + long long getQueueOffset() const; + void setQueueOffset(long long queueOffset); + protected: + void Init(const std::string& topic, const std::string& tags, + const std::string& keys, const std::string& body); + + private: + std::string topic; + std::string body; + size_t body_size; + long long m_storeTimestamp; + long long m_queueOffset; + std::map<std::string, std::string> systemProperties; + std::map<std::string, std::string> userProperties; +}; + +} //<!end namespace; +#endif diff --git a/src/main/cpp/include/MessageListener.h b/src/main/cpp/include/MessageListener.h new file mode 100644 index 0000000..5733d2f --- /dev/null +++ b/src/main/cpp/include/MessageListener.h @@ -0,0 +1,19 @@ +#ifndef __MESSAGELISTENER_H__ +#define __MESSAGELISTENER_H__ + +#include "Action.h" +#include "ConsumeContext.h" +#include "Message.h" + +namespace ons { + +class ONSCLIENT_API MessageListener { + public: + MessageListener() {} + virtual ~MessageListener() {} + + // interface of consuming message, should be realized by application + virtual Action consume(Message& message, ConsumeContext& context) = 0; +}; +} +#endif diff --git a/src/main/cpp/include/MessageOrderListener.h b/src/main/cpp/include/MessageOrderListener.h new file mode 100644 index 0000000..848dd20 --- /dev/null +++ b/src/main/cpp/include/MessageOrderListener.h @@ -0,0 +1,20 @@ +#ifndef __MESSAGEORDERLYLISTENER_H__ +#define __MESSAGEORDERLYLISTENER_H__ + +#include "ConsumeOrderContext.h" +#include "Message.h" +#include "OrderAction.h" + +namespace ons { + +class MessageOrderListener { + public: + MessageOrderListener() {} + virtual ~MessageOrderListener() {} + + // interface of consuming message, should be realized by application + virtual OrderAction consume(Message& message, + ConsumeOrderContext& context) = 0; +}; +} +#endif diff --git 
a/src/main/cpp/include/MessageQueueONS.h b/src/main/cpp/include/MessageQueueONS.h new file mode 100755 index 0000000..bd8a8dc --- /dev/null +++ b/src/main/cpp/include/MessageQueueONS.h @@ -0,0 +1,39 @@ +#ifndef __MESSAGEQUEUEONS_H__ +#define __MESSAGEQUEUEONS_H__ + +#include <iomanip> +#include <sstream> +#include <string> + +using namespace std; + +namespace ons { +class MessageQueueONS { + public: + MessageQueueONS(); + MessageQueueONS(const string& topic, const string& brokerName, int queueId); + + MessageQueueONS(const MessageQueueONS& other); + MessageQueueONS& operator=(const MessageQueueONS& other); + + string getTopic() const; + void setTopic(const string& topic); + + string getBrokerName() const; + void setBrokerName(const string& brokerName); + + int getQueueId() const; + void setQueueId(int queueId); + + bool operator==(const MessageQueueONS& mq) const; + bool operator<(const MessageQueueONS& mq) const; + int compareTo(const MessageQueueONS& mq) const; + + private: + string m_topic; + string m_brokerName; + int m_queueId; +}; + +} //<!end namespace; +#endif diff --git a/src/main/cpp/include/MessageQueueSelectorONS.h b/src/main/cpp/include/MessageQueueSelectorONS.h new file mode 100644 index 0000000..a82a285 --- /dev/null +++ b/src/main/cpp/include/MessageQueueSelectorONS.h @@ -0,0 +1,17 @@ +#ifndef _MESSAGEQUEUESELECTOR_H_ +#define _MESSAGEQUEUESELECTOR_H_ + +#include "Message.h" +#include "MessageQueueONS.h" + +namespace ons { + +class MessageQueueSelectorONS { + public: + virtual ~MessageQueueSelectorONS() {} + virtual MessageQueueONS select(const vector<MessageQueueONS>& mqs, + const Message& msg, void* arg) = 0; +}; + +} //<!end namespace; +#endif //<! 
_MQSELECTOR_H_ diff --git a/src/main/cpp/include/ONSCallback.h b/src/main/cpp/include/ONSCallback.h new file mode 100644 index 0000000..d529426 --- /dev/null +++ b/src/main/cpp/include/ONSCallback.h @@ -0,0 +1,16 @@ +#ifndef __ONSCALLBACK_H__ +#define __ONSCALLBACK_H__ + +#include "ONSClientException.h" +#include "SendResultONS.h" + +namespace ons { + class SendCallbackONS { + public: + virtual ~SendCallbackONS() {} + virtual void onSuccess(SendResultONS& sendResult) {}; + virtual void onException(ONSClientException& e) {}; + }; + +} // end of namespace SendResultONS +#endif // end of _SENDCALLBACK_H_ diff --git a/src/main/cpp/include/ONSChannel.h b/src/main/cpp/include/ONSChannel.h new file mode 100644 index 0000000..6f538af --- /dev/null +++ b/src/main/cpp/include/ONSChannel.h @@ -0,0 +1,15 @@ +#ifndef __ONSCHANNEL_H__ +#define __ONSCHANNEL_H__ + +namespace ons { + + enum ONSChannel { + CLOUD, + ALIYUN, + ALL, + LOCAL, + INNER + }; +} + +#endif diff --git a/src/main/cpp/include/ONSClient.h b/src/main/cpp/include/ONSClient.h new file mode 100644 index 0000000..4ebc595 --- /dev/null +++ b/src/main/cpp/include/ONSClient.h @@ -0,0 +1,20 @@ +#ifndef __ONSCLIENT_H__ +#define __ONSCLIENT_H__ + +#ifdef WIN32 +#ifdef ONSCLIENT_EXPORTS + +#ifndef SWIG +#define ONSCLIENT_API __declspec(dllexport) +#else +#define ONSCLIENT_API +#endif + +#else +#define ONSCLIENT_API __declspec(dllimport) +#endif +#else +#define ONSCLIENT_API +#endif + +#endif diff --git a/src/main/cpp/include/ONSClientException.h b/src/main/cpp/include/ONSClientException.h new file mode 100644 index 0000000..76922f3 --- /dev/null +++ b/src/main/cpp/include/ONSClientException.h @@ -0,0 +1,25 @@ +#ifndef __ONSCLIENTEXCEPTION_H__ +#define __ONSCLIENTEXCEPTION_H__ + +#include <exception> +#include <string> +#include "ONSClient.h" + +namespace ons { + +class ONSCLIENT_API ONSClientException : public std::exception { + public: + ONSClientException() throw(); + virtual ~ONSClientException() throw(); + 
ONSClientException(std::string msg, int error) throw(); + const char* GetMsg() const throw(); + const char* what() const throw(); + int GetError() const throw(); + + private: + std::string m_msg; + int m_error; +}; +} + +#endif diff --git a/src/main/cpp/include/ONSFactory.h b/src/main/cpp/include/ONSFactory.h new file mode 100755 index 0000000..ecb531a --- /dev/null +++ b/src/main/cpp/include/ONSFactory.h @@ -0,0 +1,103 @@ +#ifndef __ONSFACTORY_H_ +#define __ONSFACTORY_H_ + +#include "LocalTransactionChecker.h" +#include "ONSChannel.h" +#include "ONSClientException.h" +#include "OrderConsumer.h" +#include "OrderProducer.h" +#include "Producer.h" +#include "PullConsumer.h" +#include "PushConsumer.h" +#include "TransactionProducer.h" + +namespace ons { +class ONSCLIENT_API ONSFactoryProperty { + public: + ONSFactoryProperty(); + virtual ~ONSFactoryProperty(); + bool checkValidityOfFactoryProperties(const std::string& key, + const std::string& value) throw(ons::ONSClientException); + const char* getLogPath() const; + void setSendMsgTimeout(const int value); + void setSendMsgRetryTimes(const int value); + void setMaxMsgCacheSize(const int size); + void setOnsTraceSwitch(bool onswitch); + void setOnsChannel(ONSChannel onsChannel) throw(ons::ONSClientException); + void setFactoryProperty(const char* key, const char* value) throw(ons::ONSClientException); + void setFactoryProperties(std::map<std::string, std::string> factoryProperties); + std::map<std::string, std::string> getFactoryProperties() const; + const char* getProducerId() const; + const char* getConsumerId() const; + const char* getGroupId() const; + const char* getPublishTopics() const; + const char* getMessageModel() const; + const int getSendMsgTimeout() const; + const int getSendMsgRetryTimes() const; + const int getConsumeThreadNums() const; + const int getMaxMsgCacheSize() const; + const ONSChannel getOnsChannel() const; + const char* getChannel() const; + const char* getMessageContent() const; + const 
char* getNameSrvAddr() const; + const char* getNameSrvDomain() const; + const char* getAccessKey() const; + const char* getSecretKey() const; + const char* getConsumerInstanceName() const; + bool getOnsTraceSwitch() const; + const char* getInstanceId() const; + + public: + static const char* LogPath; + static const char* ProducerId; + static const char* ConsumerId; + static const char* GroupId; + static const char* PublishTopics; + static const char* MsgContent; + static const char* ONSAddr; + static const char* AccessKey; + static const char* SecretKey; + static const char* MessageModel; + static const char* BROADCASTING; + static const char* CLUSTERING; + static const char* SendMsgTimeoutMillis; + static const char* NAMESRV_ADDR; + static const char* ConsumeThreadNums; + static const char* OnsChannel; + static const char* MaxMsgCacheSize; + static const char* OnsTraceSwitch; + static const char* SendMsgRetryTimes; + static const char* ConsumerInstanceName; + static const char* InstanceId; + + private: + std::map<std::string, std::string> m_onsFactoryProperties; +}; + +class ONSCLIENT_API ONSFactoryAPI { + public: + ONSFactoryAPI(); + virtual ~ONSFactoryAPI(); + + virtual ons::Producer* createProducer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException); + virtual ons::OrderProducer* createOrderProducer(ons::ONSFactoryProperty factoryProperty) throw( + ons::ONSClientException); + virtual ons::OrderConsumer* createOrderConsumer(ons::ONSFactoryProperty factoryProperty) throw( + ons::ONSClientException); + virtual ons::TransactionProducer* createTransactionProducer( + ons::ONSFactoryProperty factoryProperty, ons::LocalTransactionChecker* checker) throw(ons::ONSClientException); + virtual ons::PullConsumer* createPullConsumer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException); + virtual ons::PushConsumer* createPushConsumer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException); +}; + +class ONSCLIENT_API 
ONSFactory { + public: + virtual ~ONSFactory(); + static ons::ONSFactoryAPI* getInstance(); + + private: + ONSFactory(); + static ons::ONSFactoryAPI* onsFactoryInstance; +}; +} // namespace ons +#endif diff --git a/src/main/cpp/include/OrderAction.h b/src/main/cpp/include/OrderAction.h new file mode 100644 index 0000000..297d285 --- /dev/null +++ b/src/main/cpp/include/OrderAction.h @@ -0,0 +1,11 @@ +#ifndef __ORDERACTION_H__ +#define __ORDERACTION_H__ + +// order consuming result +enum OrderAction { + // consume success, application could continue to consume next message + Success, + // consume fail, suspends the current queue + Suspend, +}; +#endif diff --git a/src/main/cpp/include/OrderConsumer.h b/src/main/cpp/include/OrderConsumer.h new file mode 100644 index 0000000..cce53b6 --- /dev/null +++ b/src/main/cpp/include/OrderConsumer.h @@ -0,0 +1,19 @@ +#ifndef __ORDERCONSUMER_H__ +#define __ORDERCONSUMER_H__ + +#include "MessageOrderListener.h" + +namespace ons { + +class ONSCLIENT_API OrderConsumer { + public: + OrderConsumer() {} + virtual ~OrderConsumer() {} + + virtual void start() = 0; + virtual void shutdown() = 0; + virtual void subscribe(const char* topic, const char* subExpression, + MessageOrderListener* listener) = 0; +}; +} +#endif diff --git a/src/main/cpp/include/OrderProducer.h b/src/main/cpp/include/OrderProducer.h new file mode 100644 index 0000000..8924996 --- /dev/null +++ b/src/main/cpp/include/OrderProducer.h @@ -0,0 +1,23 @@ +#ifndef __ORDERLYPRODUCER_H__ +#define __ORDERLYPRODUCER_H__ + +#include "Message.h" +#include "SendResultONS.h" + +namespace ons { + +class ONSCLIENT_API OrderProducer { + public: + OrderProducer() {} + virtual ~OrderProducer() {} + + // before send msg, start must be called to allocate resources. + virtual void start() = 0; + // before exit ons, shutdown must be called to release all resources allocated + // by ons internally. 
+ virtual void shutdown() = 0; + + virtual SendResultONS send(Message& msg, std::string shardingKey) = 0; +}; +} +#endif diff --git a/src/main/cpp/include/Producer.h b/src/main/cpp/include/Producer.h new file mode 100644 index 0000000..2daf759 --- /dev/null +++ b/src/main/cpp/include/Producer.h @@ -0,0 +1,35 @@ +#ifndef __PRODUCER_H__ +#define __PRODUCER_H__ + +#include "Message.h" +#include "SendResultONS.h" +#include "ONSClientException.h" +#include "MessageQueueONS.h" +#include"ONSCallback.h" + +namespace ons { + +class ONSCLIENT_API Producer { + public: + Producer() {} + virtual ~Producer() {} + + // before send msg, start must be called to allocate resources. + virtual void start() = 0; + // before exit ons, shutdown must be called to release all resources allocated + // by ons internally. + virtual void shutdown() = 0; + // retry max 3 times if send failed. if no exception throwed, it sends + // success; + virtual ons::SendResultONS send(Message& msg) throw(ons::ONSClientException) = 0; + virtual ons::SendResultONS send(Message& msg, + const MessageQueueONS& mq) throw(ons::ONSClientException) = 0; + + // async send + virtual void sendAsync(Message& msg, ons::SendCallbackONS* callback) throw(ons::ONSClientException) = 0; + + // oneway send + virtual void sendOneway(Message& msg) throw(ons::ONSClientException) = 0; +}; +} +#endif diff --git a/src/main/cpp/include/PullConsumer.h b/src/main/cpp/include/PullConsumer.h new file mode 100755 index 0000000..5f94e5a --- /dev/null +++ b/src/main/cpp/include/PullConsumer.h @@ -0,0 +1,38 @@ +#ifndef __PULLCONSUMER_H__ +#define __PULLCONSUMER_H__ + +#include <string> +#include <vector> +#include "MessageQueueONS.h" +#include "PullResultONS.h" + +namespace ons { + +class ONSFactoryProperty; + +class ONSCLIENT_API PullConsumer { + public: + PullConsumer() {} + virtual ~PullConsumer() {} + + virtual void start() = 0; + virtual void shutdown() = 0; + virtual void fetchSubscribeMessageQueues( + const std::string& topic, 
std::vector<MessageQueueONS>& mqs) = 0; + virtual PullResultONS pull(const MessageQueueONS& mq, + const std::string& subExpression, long long offset, + int maxNums) = 0; + virtual long long searchOffset(const MessageQueueONS& mq, long long timestamp) = 0; + virtual long long maxOffset(const MessageQueueONS& mq) = 0; + virtual long long minOffset(const MessageQueueONS& mq) = 0; + virtual void updateConsumeOffset(const MessageQueueONS& mq, + long long offset) = 0; + virtual void removeConsumeOffset(const MessageQueueONS& mq) = 0; + virtual long long fetchConsumeOffset(const MessageQueueONS& mq, + bool fromStore) = 0; + virtual void persistConsumerOffset4PullConsumer(const MessageQueueONS& mq) + throw(ons::ONSClientException) = 0; +}; + +} +#endif diff --git a/src/main/cpp/include/PullResultONS.h b/src/main/cpp/include/PullResultONS.h new file mode 100644 index 0000000..2d939ea --- /dev/null +++ b/src/main/cpp/include/PullResultONS.h @@ -0,0 +1,40 @@ +#ifndef __PULLRESULTONS_H__ +#define __PULLRESULTONS_H__ + +#include <vector> +#include "Message.h" +#include "ONSClient.h" + +namespace ons { + +enum ONSPullStatus { + ONS_FOUND, + ONS_NO_NEW_MSG, + ONS_NO_MATCHED_MSG, + ONS_OFFSET_ILLEGAL, + ONS_BROKER_TIMEOUT // indicate pull request timeout or received NULL response +}; + +class ONSCLIENT_API PullResultONS { + public: + PullResultONS(ONSPullStatus status) + : pullStatus(status), nextBeginOffset(0), minOffset(0), maxOffset(0) {} + + PullResultONS(ONSPullStatus pullStatus, long long nextBeginOffset, + long long minOffset, long long maxOffset) + : pullStatus(pullStatus), + nextBeginOffset(nextBeginOffset), + minOffset(minOffset), + maxOffset(maxOffset) {} + + virtual ~PullResultONS() {} + + public: + ONSPullStatus pullStatus; + long long nextBeginOffset; + long long minOffset; + long long maxOffset; + std::vector<Message> msgFoundList; +}; +} +#endif diff --git a/src/main/cpp/include/PushConsumer.h b/src/main/cpp/include/PushConsumer.h new file mode 100644 index 
0000000..925858c --- /dev/null +++ b/src/main/cpp/include/PushConsumer.h @@ -0,0 +1,20 @@ +#ifndef __PUSHCONSUMER_H__ +#define __PUSHCONSUMER_H__ + +#include "MessageListener.h" + +namespace ons { + +class ONSCLIENT_API PushConsumer { + public: + PushConsumer() {} + virtual ~PushConsumer() {} + + virtual void start() = 0; + virtual void shutdown() = 0; + virtual void subscribe(const char* topic, const char* subExpression, + MessageListener* listener) = 0; + // virtual void setNamesrvAddr(const std::string& nameSrvAddr) = 0; +}; +} +#endif diff --git a/src/main/cpp/include/SendResultONS.h b/src/main/cpp/include/SendResultONS.h new file mode 100644 index 0000000..d21ec12 --- /dev/null +++ b/src/main/cpp/include/SendResultONS.h @@ -0,0 +1,19 @@ +#ifndef __SENDRESULTONS_H__ +#define __SENDRESULTONS_H__ +#include <string> +#include "ONSClient.h" + +namespace ons { + +class ONSCLIENT_API SendResultONS { + public: + SendResultONS(); + virtual ~SendResultONS(); + void setMessageId(const std::string& msgId); + const char* getMessageId() const; + + private: + std::string messageId; +}; +} +#endif diff --git a/src/main/cpp/include/TransactionProducer.h b/src/main/cpp/include/TransactionProducer.h new file mode 100644 index 0000000..bceadb0 --- /dev/null +++ b/src/main/cpp/include/TransactionProducer.h @@ -0,0 +1,29 @@ +#ifndef __TRANSACTIONPRODUCER_H__ +#define __TRANSACTIONPRODUCER_H__ + +#include "LocalTransactionExecuter.h" +#include "ONSClient.h" +#include "SendResultONS.h" + +namespace ons { + class ONSCLIENT_API TransactionProducer { + public: + TransactionProducer() {} + + virtual ~TransactionProducer() {} + + // before send msg, start must be called to allocate resources. + virtual void start() = 0; + + // before exit ons, shutdown must be called to release all resources allocated + // by ons internally. + virtual void shutdown() = 0; + + // retry max 3 times if send failed. 
if no exception throwed, it sends + // success; + virtual SendResultONS send(Message &msg, + LocalTransactionExecuter *executer) = 0; + }; +} + +#endif diff --git a/src/main/cpp/include/TransactionStatus.h b/src/main/cpp/include/TransactionStatus.h new file mode 100644 index 0000000..c47cd99 --- /dev/null +++ b/src/main/cpp/include/TransactionStatus.h @@ -0,0 +1,13 @@ +#ifndef __TRANSACTIONSTATUS_H__ +#define __TRANSACTIONSTATUS_H__ + +namespace ons { + +enum TransactionStatus { + CommitTransaction = 0, + RollbackTransaction = 1, + Unknow = 2, +}; +} + +#endif diff --git a/src/test/cpp/MessageTest.cpp b/src/test/cpp/MessageTest.cpp new file mode 100644 index 0000000..8b54067 --- /dev/null +++ b/src/test/cpp/MessageTest.cpp @@ -0,0 +1,237 @@ +#include <iostream> +#include <chrono> +#include <gtest/gtest.h> +#include <memory> + +#include "Message.h" +#include "ONSFactory.h" + +class MessageTest : public testing::Test { +protected: + void SetUp() override { + ubody_ = new unsigned char[10]; + memset(ubody_, 0, 10); + strcpy(reinterpret_cast<char *>(ubody_), "RocketMQ"); + sbody = std::string("RocketMQ"); + } + + void TearDown() override { + delete ubody_; + } + + const char *topic_ = "Topic"; + const char *tag_ = "Tag"; + const char *key_ = "Key"; + const char *body_ = "RocketMQ"; + const char *user_key_ = "UserKey"; + const char *user_value_ = "UserValue"; + const char *sys_key_ = "SysKey"; + const char *sys_value_ = "SysValue"; + unsigned char *ubody_; + int bodylen = strlen("RocketMQ"); + std::string sbody; +}; + +TEST_F(MessageTest, testMessage_TopicBeingNULL) { + EXPECT_THROW( + ons::Message msg(NULL, tag_, key_, body_);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageWithoutKey_TopicBeingNULL) { + EXPECT_THROW( + ons::Message msg(NULL, tag_, body_);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessage_TopicLenBeingZero) { + EXPECT_THROW( + ons::Message msg(topic_, 0, tag_, strlen(tag_), body_, strlen(body_));, + 
ons::ONSClientException); +} + +TEST_F(MessageTest, testMessage_BodyBeingNULL) { + EXPECT_THROW( + ons::Message msg(topic_, tag_, key_, NULL);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageWithoutKey_BodyBeingNULL) { + EXPECT_THROW( + ons::Message msg(topic_, tag_, NULL);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessage_BodyLenBeingZero) { + EXPECT_THROW( + ons::Message msg(topic_, strlen(topic_), tag_, strlen(tag_), body_, 0);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageSysPerperty_KeyBeingNULL) { + ons::Message msg(topic_, tag_, key_, body_); + EXPECT_THROW( + msg.putSystemProperties(NULL, sys_value_);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageSysPerperty_ValueBeingNULL) { + ons::Message msg(topic_, tag_, key_, body_); + EXPECT_THROW( + msg.putSystemProperties(sys_key_, NULL);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageUserPerperty_KeyBeingNULL) { + ons::Message msg(topic_, tag_, key_, body_); + EXPECT_THROW( + msg.putSystemProperties(NULL, user_value_);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageUserPerperty_ValueBeingNULL) { + ons::Message msg(topic_, tag_, key_, body_); + EXPECT_THROW( + msg.putSystemProperties(user_key_, NULL);, + ons::ONSClientException); +} + +TEST_F(MessageTest, testMessageGetTopic_ValueBeingNormal) { + ons::Message msg(topic_, tag_, key_, body_); + ASSERT_STREQ(topic_, msg.getTopic()); +} + +TEST_F(MessageTest, testMessageetTopic_ValueBeingNormal) { + ons::Message msg("", tag_, key_, body_); + msg.setTopic(topic_); + ASSERT_STREQ(topic_, msg.getTopic()); +} + +TEST_F(MessageTest, testMessageGetTopic_ValueBeingNull) { + ons::Message msg("", tag_, key_, body_); + ASSERT_STREQ("", msg.getTopic()); +} + +TEST_F(MessageTest, testMessageSetTopic_ValueBeingNull) { + ons::Message msg("", tag_, key_, body_); + msg.setTopic(NULL); + ASSERT_STREQ("", msg.getTopic()); +} + +TEST_F(MessageTest, 
testMessageGetTag_ValueBeingNormal) { + ons::Message msg(topic_, tag_, key_, body_); + ASSERT_STREQ(tag_, msg.getTag()); +} + +TEST_F(MessageTest, testMessageSetTag_ValueBeingNormal) { + ons::Message msg(topic_, "", key_, body_); + msg.setTag(tag_); + ASSERT_STREQ(tag_, msg.getTag()); +} + +TEST_F(MessageTest, testMessageGetTag_ValueBeingNull) { + ons::Message msg(topic_, "", key_, body_); + ASSERT_STREQ("", msg.getTag()); +} + +TEST_F(MessageTest, testMessageSetTag_ValueBeingNull) { + ons::Message msg(topic_, "", key_, body_); + msg.setTag(NULL); + ASSERT_STREQ("", msg.getTag()); +} + +TEST_F(MessageTest, testMessageGetBody_ValueBeingNormal) { + ons::Message msg(topic_, tag_, key_, body_); + ASSERT_STREQ(body_, msg.getBody()); +} + +TEST_F(MessageTest, testMessageSetBody_ValueBeingNormal) { + ons::Message msg(topic_, tag_, key_, ""); + msg.setBody(ubody_, bodylen); + ASSERT_STREQ(body_, msg.getBody()); +} + +TEST_F(MessageTest, testMessageGetByteBody_ValueBeingNormal) { + int len = 0; + ons::Message msg(topic_, tag_, key_, body_); + const char *strBody = msg.getByteBody(&len); + ASSERT_STREQ(body_, strBody); + ASSERT_EQ(strlen(body_), len); +} + +TEST_F(MessageTest, testMessageSetBodyWithLen_ValueBeingNormal) { + int len = 0; + ons::Message msg(topic_, tag_, key_, ""); + msg.setBody(ubody_, bodylen); + const char *strBody = msg.getByteBody(&len); + ASSERT_STREQ(body_, strBody); + ASSERT_EQ(bodylen, len); +} + +TEST_F(MessageTest, testMessageGetBody_ValueBeingNull) { + ons::Message msg(topic_, tag_, key_, ""); + ASSERT_STREQ("", msg.getBody()); +} + +TEST_F(MessageTest, testMessageSetBody_ValueBeingNull) { + ons::Message msg(topic_, tag_, key_, ""); + msg.setBody(NULL, strlen(body_)); + ASSERT_STREQ("", msg.getBody()); +} + +TEST_F(MessageTest, testMessageGetMsgBody_ValueBeingString) { + ons::Message msg(topic_, tag_, key_, body_); + ASSERT_STREQ(sbody.c_str(), msg.getMsgBody().c_str()); +} + +TEST_F(MessageTest, testMessageGetMsgBodyLen_ValueBeingString) { + 
ons::Message msg(topic_, tag_, key_, body_); + ASSERT_EQ(sbody.length(), msg.getBodySize()); +} + +TEST_F(MessageTest, testMessageSetMsgBody_ValueBeingString) { + ons::Message msg(topic_, tag_, key_, ""); + msg.setMsgBody(sbody); + ASSERT_STREQ(sbody.c_str(), msg.getMsgBody().c_str()); +} + +TEST_F(MessageTest, testMessageGetUserPerperty_KeyBeingNull) { + ons::Message msg(topic_, tag_, key_, ""); + ASSERT_TRUE(NULL == msg.getUserProperties(NULL)); +} + +TEST_F(MessageTest, testMessageGetUserPerperty_KeyNotExsit) { + ons::Message msg(topic_, tag_, key_, ""); + ASSERT_STREQ("", msg.getUserProperties("NONEEXISTKEY")); +} + +TEST_F(MessageTest, testMessageGetSysPerperty_KeyBeingNull) { + ons::Message msg(topic_, tag_, key_, ""); + ASSERT_TRUE(NULL == msg.getSystemProperties(NULL)); +} + +TEST_F(MessageTest, testMessageGetSysPerperty_KeyNotExsit) { + ons::Message msg(topic_, tag_, key_, ""); + ASSERT_STREQ("", msg.getUserProperties("NONEEXISTKEY")); +} + +TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingNormal) { + ons::Message msg(topic_, tag_, key_, body_); + long long now = 123456789L; + msg.setStartDeliverTime(now); + ASSERT_EQ(now, msg.getStartDeliverTime()); +} + +TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingZero) { + ons::Message msg(topic_, tag_, key_, body_); + long long now = 0; + msg.setStartDeliverTime(now); + ASSERT_EQ(now, msg.getStartDeliverTime()); +} + +TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingNagetive) { + ons::Message msg(topic_, tag_, key_, body_); + long long now = -10000; + msg.setStartDeliverTime(now); + ASSERT_EQ(now, msg.getStartDeliverTime()); +}
