[ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/6a45c767 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/6a45c767 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/6a45c767 Branch: refs/heads/master Commit: 6a45c7677a9be286c85e8faf314a660814c705bb Parents: 2d2a949 Author: hooligan520 <[email protected]> Authored: Fri Apr 21 18:08:56 2017 +0800 Committer: dongeforever <[email protected]> Committed: Fri Apr 21 18:08:56 2017 +0800 ---------------------------------------------------------------------- rocketmq-client4cpp/.gitignore | 30 + rocketmq-client4cpp/LICENSE | 201 ++ rocketmq-client4cpp/Makefile | 5 + rocketmq-client4cpp/NOTICE | 5 + rocketmq-client4cpp/README.md | 21 + rocketmq-client4cpp/build.sh | 38 + rocketmq-client4cpp/clean.sh | 7 + rocketmq-client4cpp/docs/roadmap.md | 0 rocketmq-client4cpp/example/Makefile | 5 + .../example/demo/AsyncProducer.cpp | 249 +++ rocketmq-client4cpp/example/demo/Common.cpp | 20 + rocketmq-client4cpp/example/demo/Common.h | 162 ++ rocketmq-client4cpp/example/demo/Makefile | 40 + rocketmq-client4cpp/example/demo/Producer.cpp | 199 ++ .../example/demo/PullConsumer.cpp | 194 ++ .../example/demo/PushConsumer.cpp | 256 +++ .../include/AllocateMessageQueueStrategy.h | 54 + rocketmq-client4cpp/include/ClientConfig.h | 75 + .../include/ConsumeMessageHook.h | 45 + rocketmq-client4cpp/include/ConsumeType.h | 56 + rocketmq-client4cpp/include/DefaultMQProducer.h | 129 ++ .../include/DefaultMQPullConsumer.h | 154 ++ .../include/DefaultMQPushConsumer.h | 181 ++ rocketmq-client4cpp/include/MQAdmin.h | 66 + rocketmq-client4cpp/include/MQClientException.h | 105 + rocketmq-client4cpp/include/MQConsumer.h | 48 + rocketmq-client4cpp/include/MQProducer.h | 71 + rocketmq-client4cpp/include/MQPullConsumer.h | 54 + rocketmq-client4cpp/include/MQPushConsumer.h | 49 + 
rocketmq-client4cpp/include/Message.h | 136 ++ rocketmq-client4cpp/include/MessageExt.h | 108 + rocketmq-client4cpp/include/MessageListener.h | 94 + rocketmq-client4cpp/include/MessageQueue.h | 70 + .../include/MessageQueueListener.h | 38 + rocketmq-client4cpp/include/OffsetStore.h | 58 + rocketmq-client4cpp/include/PullCallback.h | 39 + rocketmq-client4cpp/include/PullResult.h | 91 + rocketmq-client4cpp/include/QueryResult.h | 56 + rocketmq-client4cpp/include/RocketMQClient.h | 100 + rocketmq-client4cpp/include/SendCallback.h | 39 + rocketmq-client4cpp/include/SendMessageHook.h | 50 + rocketmq-client4cpp/include/SendResult.h | 89 + rocketmq-client4cpp/include/TopicFilterType.h | 32 + rocketmq-client4cpp/rocketmq.mk | 6 + rocketmq-client4cpp/src/ClientConfig.cpp | 168 ++ .../src/ClientRemotingProcessor.cpp | 154 ++ .../src/ClientRemotingProcessor.h | 45 + rocketmq-client4cpp/src/CommunicationMode.h | 34 + rocketmq-client4cpp/src/FindBrokerResult.h | 28 + rocketmq-client4cpp/src/MQAdminImpl.cpp | 295 +++ rocketmq-client4cpp/src/MQAdminImpl.h | 63 + rocketmq-client4cpp/src/MQClientAPIImpl.cpp | 1323 ++++++++++++ rocketmq-client4cpp/src/MQClientAPIImpl.h | 280 +++ rocketmq-client4cpp/src/MQClientFactory.cpp | 1258 +++++++++++ rocketmq-client4cpp/src/MQClientFactory.h | 214 ++ rocketmq-client4cpp/src/MQClientManager.cpp | 75 + rocketmq-client4cpp/src/MQClientManager.h | 49 + rocketmq-client4cpp/src/Makefile | 26 + rocketmq-client4cpp/src/Makefile.std | 127 ++ rocketmq-client4cpp/src/RocketMQClient.cpp | 186 ++ rocketmq-client4cpp/src/common/ConsumeStats.h | 95 + rocketmq-client4cpp/src/common/FilterAPI.h | 72 + rocketmq-client4cpp/src/common/MQVersion.cpp | 88 + rocketmq-client4cpp/src/common/MQVersion.h | 184 ++ .../src/common/MessageSysFlag.cpp | 47 + rocketmq-client4cpp/src/common/MessageSysFlag.h | 46 + rocketmq-client4cpp/src/common/MixAll.cpp | 88 + rocketmq-client4cpp/src/common/MixAll.h | 62 + rocketmq-client4cpp/src/common/NamesrvConfig.h | 72 + 
rocketmq-client4cpp/src/common/NamesrvUtil.h | 29 + rocketmq-client4cpp/src/common/PermName.cpp | 63 + rocketmq-client4cpp/src/common/PermName.h | 39 + rocketmq-client4cpp/src/common/PullSysFlag.cpp | 68 + rocketmq-client4cpp/src/common/PullSysFlag.h | 38 + rocketmq-client4cpp/src/common/SendResult.cpp | 132 ++ rocketmq-client4cpp/src/common/ServiceState.h | 31 + .../src/common/ServiceThread.cpp | 73 + rocketmq-client4cpp/src/common/ServiceThread.h | 50 + .../src/common/SubscriptionGroupConfig.h | 50 + rocketmq-client4cpp/src/common/TopAddressing.h | 54 + rocketmq-client4cpp/src/common/TopicConfig.cpp | 167 ++ rocketmq-client4cpp/src/common/TopicConfig.h | 71 + .../src/common/TopicStatsTable.h | 51 + rocketmq-client4cpp/src/common/UtilAll.h | 608 ++++++ rocketmq-client4cpp/src/common/Validators.cpp | 132 ++ rocketmq-client4cpp/src/common/Validators.h | 49 + .../src/common/VirtualEnvUtil.cpp | 66 + rocketmq-client4cpp/src/common/VirtualEnvUtil.h | 41 + .../AllocateMessageQueueStrategyInner.h | 205 ++ .../ConsumeMessageConcurrentlyService.cpp | 476 ++++ .../ConsumeMessageConcurrentlyService.h | 120 + .../consumer/ConsumeMessageOrderlyService.cpp | 574 +++++ .../src/consumer/ConsumeMessageOrderlyService.h | 122 ++ .../src/consumer/ConsumeMessageService.h | 41 + .../src/consumer/ConsumeType.cpp | 70 + .../src/consumer/ConsumerInvokeCallback.cpp | 96 + .../src/consumer/ConsumerInvokeCallback.h | 40 + .../src/consumer/ConsumerStatManage.h | 132 ++ .../src/consumer/DefaultMQPullConsumer.cpp | 309 +++ .../src/consumer/DefaultMQPullConsumerImpl.cpp | 630 ++++++ .../src/consumer/DefaultMQPullConsumerImpl.h | 174 ++ .../src/consumer/DefaultMQPushConsumer.cpp | 399 ++++ .../src/consumer/DefaultMQPushConsumerImpl.cpp | 1018 +++++++++ .../src/consumer/DefaultMQPushConsumerImpl.h | 169 ++ .../src/consumer/LocalFileOffsetStore.cpp | 257 +++ .../src/consumer/LocalFileOffsetStore.h | 61 + .../src/consumer/MQConsumerInner.h | 46 + .../src/consumer/MessageQueueLock.h | 68 + 
.../src/consumer/ProcessQueue.cpp | 445 ++++ rocketmq-client4cpp/src/consumer/ProcessQueue.h | 102 + .../src/consumer/PullAPIWrapper.cpp | 222 ++ .../src/consumer/PullAPIWrapper.h | 67 + .../src/consumer/PullMessageService.cpp | 171 ++ .../src/consumer/PullMessageService.h | 56 + .../src/consumer/PullRequest.cpp | 108 + rocketmq-client4cpp/src/consumer/PullRequest.h | 59 + .../src/consumer/PullResultExt.h | 53 + .../src/consumer/RebalanceImpl.cpp | 613 ++++++ .../src/consumer/RebalanceImpl.h | 102 + .../src/consumer/RebalancePullImpl.cpp | 79 + .../src/consumer/RebalancePullImpl.h | 56 + .../src/consumer/RebalancePushImpl.cpp | 217 ++ .../src/consumer/RebalancePushImpl.h | 55 + .../src/consumer/RebalanceService.cpp | 55 + .../src/consumer/RebalanceService.h | 44 + .../src/consumer/RemoteBrokerOffsetStore.cpp | 266 +++ .../src/consumer/RemoteBrokerOffsetStore.h | 61 + .../src/consumer/SubscriptionData.cpp | 201 ++ .../src/consumer/SubscriptionData.h | 76 + rocketmq-client4cpp/src/jsoncpp/AUTHORS | 0 rocketmq-client4cpp/src/jsoncpp/LICENSE | 1 + rocketmq-client4cpp/src/jsoncpp/README.txt | 117 + .../src/jsoncpp/json/allocator.h | 96 + .../src/jsoncpp/json/assertions.h | 54 + rocketmq-client4cpp/src/jsoncpp/json/autolink.h | 25 + rocketmq-client4cpp/src/jsoncpp/json/config.h | 182 ++ rocketmq-client4cpp/src/jsoncpp/json/features.h | 59 + rocketmq-client4cpp/src/jsoncpp/json/forwards.h | 39 + rocketmq-client4cpp/src/jsoncpp/json/json.h | 15 + rocketmq-client4cpp/src/jsoncpp/json/reader.h | 406 ++++ rocketmq-client4cpp/src/jsoncpp/json/value.h | 868 ++++++++ rocketmq-client4cpp/src/jsoncpp/json/version.h | 20 + rocketmq-client4cpp/src/jsoncpp/json/writer.h | 333 +++ rocketmq-client4cpp/src/jsoncpp/json_reader.cpp | 2042 ++++++++++++++++++ rocketmq-client4cpp/src/jsoncpp/json_tool.h | 113 + rocketmq-client4cpp/src/jsoncpp/json_value.cpp | 1612 ++++++++++++++ .../src/jsoncpp/json_valueiterator.inl | 171 ++ rocketmq-client4cpp/src/jsoncpp/json_writer.cpp | 1220 +++++++++++ 
rocketmq-client4cpp/src/jsoncpp/version | 1 + rocketmq-client4cpp/src/kpr/AtomicValue.cpp | 146 ++ rocketmq-client4cpp/src/kpr/AtomicValue.h | 200 ++ rocketmq-client4cpp/src/kpr/Condition.cpp | 158 ++ rocketmq-client4cpp/src/kpr/Condition.h | 54 + rocketmq-client4cpp/src/kpr/Epoller.cpp | 96 + rocketmq-client4cpp/src/kpr/Epoller.h | 62 + rocketmq-client4cpp/src/kpr/Exception.h | 100 + rocketmq-client4cpp/src/kpr/FileUtil.cpp | 523 +++++ rocketmq-client4cpp/src/kpr/FileUtil.h | 90 + rocketmq-client4cpp/src/kpr/KPRTypes.h | 65 + rocketmq-client4cpp/src/kpr/KPRUtil.cpp | 76 + rocketmq-client4cpp/src/kpr/KPRUtil.h | 38 + rocketmq-client4cpp/src/kpr/Monitor.cpp | 125 ++ rocketmq-client4cpp/src/kpr/Monitor.h | 48 + rocketmq-client4cpp/src/kpr/Mutex.cpp | 296 +++ rocketmq-client4cpp/src/kpr/Mutex.h | 107 + rocketmq-client4cpp/src/kpr/RefHandle.h | 328 +++ rocketmq-client4cpp/src/kpr/ScopedLock.h | 91 + rocketmq-client4cpp/src/kpr/Semaphore.cpp | 73 + rocketmq-client4cpp/src/kpr/Semaphore.h | 42 + rocketmq-client4cpp/src/kpr/Thread.cpp | 191 ++ rocketmq-client4cpp/src/kpr/Thread.h | 68 + rocketmq-client4cpp/src/kpr/ThreadLocal.cpp | 56 + rocketmq-client4cpp/src/kpr/ThreadLocal.h | 37 + rocketmq-client4cpp/src/kpr/ThreadPool.cpp | 418 ++++ rocketmq-client4cpp/src/kpr/ThreadPool.h | 124 ++ rocketmq-client4cpp/src/kpr/ThreadPoolWork.h | 34 + .../src/kpr/TimerTaskManager.cpp | 91 + rocketmq-client4cpp/src/kpr/TimerTaskManager.h | 95 + rocketmq-client4cpp/src/kpr/TimerThread.cpp | 186 ++ rocketmq-client4cpp/src/kpr/TimerThread.h | 79 + rocketmq-client4cpp/src/message/Message.cpp | 379 ++++ .../src/message/MessageDecoder.cpp | 366 ++++ .../src/message/MessageDecoder.h | 64 + rocketmq-client4cpp/src/message/MessageExt.cpp | 244 +++ rocketmq-client4cpp/src/message/MessageId.h | 59 + .../src/message/MessageQueue.cpp | 153 ++ .../src/producer/DefaultMQProducer.cpp | 277 +++ .../src/producer/DefaultMQProducerImpl.cpp | 932 ++++++++ .../src/producer/DefaultMQProducerImpl.h | 205 ++ 
.../src/producer/LocalTransactionExecuter.h | 31 + .../src/producer/MQProducerInner.h | 44 + .../src/producer/MessageQueueSelector.h | 96 + .../src/producer/ProducerInvokeCallback.cpp | 101 + .../src/producer/ProducerInvokeCallback.h | 46 + .../src/producer/TopicPublishInfo.h | 141 ++ .../src/producer/TransactionCheckListener.h | 31 + .../src/producer/TransactionMQProducer.h | 118 + .../src/protocol/CommandCustomHeader.cpp | 672 ++++++ .../src/protocol/CommandCustomHeader.h | 604 ++++++ .../src/protocol/ConsumerRunningInfo.cpp | 168 ++ .../src/protocol/ConsumerRunningInfo.h | 97 + .../GetConsumerListByGroupResponseBody.h | 97 + .../src/protocol/HeartbeatData.cpp | 52 + .../src/protocol/HeartbeatData.h | 157 ++ rocketmq-client4cpp/src/protocol/KVTable.h | 58 + .../src/protocol/LockBatchBody.cpp | 112 + .../src/protocol/LockBatchBody.h | 73 + rocketmq-client4cpp/src/protocol/MQProtos.cpp | 248 +++ rocketmq-client4cpp/src/protocol/MQProtos.h | 150 ++ .../src/protocol/OffsetSerializeWrapper.h | 135 ++ .../src/protocol/RemotingCommand.cpp | 421 ++++ .../src/protocol/RemotingCommand.h | 153 ++ .../src/protocol/RemotingSerializable.h | 33 + rocketmq-client4cpp/src/protocol/TopicList.h | 60 + .../src/protocol/TopicRouteData.h | 279 +++ .../src/transport/InvokeCallback.h | 32 + .../src/transport/RemoteClientConfig.h | 67 + .../src/transport/ResponseFuture.cpp | 183 ++ .../src/transport/ResponseFuture.h | 77 + .../src/transport/SocketUtil.cpp | 250 +++ rocketmq-client4cpp/src/transport/SocketUtil.h | 75 + .../src/transport/TcpRemotingClient.cpp | 841 ++++++++ .../src/transport/TcpRemotingClient.h | 152 ++ .../src/transport/TcpRequestProcessor.h | 32 + .../src/transport/TcpTransport.cpp | 387 ++++ .../src/transport/TcpTransport.h | 78 + 226 files changed, 39032 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/.gitignore 
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/.gitignore b/rocketmq-client4cpp/.gitignore new file mode 100644 index 0000000..c17af80 --- /dev/null +++ b/rocketmq-client4cpp/.gitignore @@ -0,0 +1,30 @@ +# Compiled Object files +*.slo +*.lo +*.o +*.d +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +# Executables +*.exe +*.out +*.app http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/LICENSE ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/LICENSE b/rocketmq-client4cpp/LICENSE new file mode 100644 index 0000000..5c304d1 --- /dev/null +++ b/rocketmq-client4cpp/LICENSE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/Makefile ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/Makefile b/rocketmq-client4cpp/Makefile new file mode 100644 index 0000000..7b31385 --- /dev/null +++ b/rocketmq-client4cpp/Makefile @@ -0,0 +1,5 @@ +all: + make -C src all + +clean: + make -C src clean http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/NOTICE ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/NOTICE b/rocketmq-client4cpp/NOTICE new file mode 100644 index 0000000..5384857 --- /dev/null +++ b/rocketmq-client4cpp/NOTICE @@ -0,0 +1,5 @@ +Apache RocketMQ (incubating) +Copyright 2016-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/README.md b/rocketmq-client4cpp/README.md new file mode 100755 index 0000000..b4fa664 --- /dev/null +++ b/rocketmq-client4cpp/README.md @@ -0,0 +1,21 @@ +Forked from [RocketMQ-Client4CPP](https://github.com/NDPMediaCorp/RocketMQ-Client4CPP) + +[RocketMQ](https://github.com/alibaba/RocketMQ) C++ Client +=================== + +### Contributors +* @[kangliqiang](https://github.com/kangliqiang) +* @[lizhanhui](https://github.com/lizhanhui) +* @[suwenkuang](https://github.com/hooligan520) + +### Current status +* Building on the original project, this fork fixes many core dumps and memory leaks (valgrind), adds a namespace, and fills in some missing features +* Only Linux is supported +* Currently supports sending messages and consuming messages in both pull mode and push mode + +### Roadmap +* Support for transaction messages +* Support more commands (such as letting the broker query the client's running information) + + + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/build.sh ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/build.sh b/rocketmq-client4cpp/build.sh new file mode 100755 index 0000000..1519780 --- /dev/null +++ b/rocketmq-client4cpp/build.sh @@ -0,0 +1,38 @@ +#!/bin/sh +VERSION=1.0.3 +BUILD_PATH=`pwd` +INSTALL_PATH=$BUILD_PATH/release +RELEASE_PATH=/data/libs/rocketmq + +echo -e "\e[33;1m# copy include files...\e[0m" +mkdir -p $INSTALL_PATH +rm -rf $INSTALL_PATH/* +cp -rf $BUILD_PATH/rocketmq.mk $INSTALL_PATH/ +cp -rf $BUILD_PATH/include $INSTALL_PATH/ +cp -rf $BUILD_PATH/example $INSTALL_PATH/ + +echo -e "\e[33;1m# build target with BIT=32...\e[0m" +cd $BUILD_PATH/ +BIT=32 make clean >/dev/null 
+BIT=32 make all >/dev/null +mkdir -p $INSTALL_PATH/lib32 +cp -rf $BUILD_PATH/src/librocketmq.a $INSTALL_PATH/lib32/librocketmq.a + +echo -e "\e[33;1m# build target with BIT=64...\e[0m" +cd $BUILD_PATH/ +BIT=64 make clean >/dev/null +BIT=64 make all >/dev/null +mkdir -p $INSTALL_PATH/lib64 +cp -rf $BUILD_PATH/src/librocketmq.a $INSTALL_PATH/lib64/librocketmq.a + +echo -e "\e[33;1m# release libs...\e[0m" +cd $BUILD_PATH/ +tar czf rocketmq-client4cpp-${VERSION}.tgz release/ + +#rm -rf $RELEASE_PATH +#cp -rf $INSTALL_PATH $RELEASE_PATH + +echo -e "\e[33;1m# build example...\e[0m" +cd $INSTALL_PATH/example +make all >/dev/null + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/clean.sh ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/clean.sh b/rocketmq-client4cpp/clean.sh new file mode 100755 index 0000000..3cf8145 --- /dev/null +++ b/rocketmq-client4cpp/clean.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +BUILD_PATH=`pwd` +BIT=32 make -C ./src/ cleanall +BIT=64 make -C ./src/ cleanall +rm -rf release + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/docs/roadmap.md ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/docs/roadmap.md b/rocketmq-client4cpp/docs/roadmap.md new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/Makefile ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/Makefile b/rocketmq-client4cpp/example/Makefile new file mode 100755 index 0000000..25a9450 --- /dev/null +++ b/rocketmq-client4cpp/example/Makefile @@ -0,0 +1,5 @@ +all: + make -C demo all + +clean: + make -C demo clean 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/AsyncProducer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/AsyncProducer.cpp b/rocketmq-client4cpp/example/demo/AsyncProducer.cpp new file mode 100644 index 0000000..045a32a --- /dev/null +++ b/rocketmq-client4cpp/example/demo/AsyncProducer.cpp @@ -0,0 +1,249 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "Common.h" +#include "SendCallback.h" +#include "DefaultMQProducer.h" +using namespace rmq; + +long long g_lastUpdateTime = 0; +volatile long long g_cnt_total = 0; +volatile long long g_cnt_last = 0; +volatile long long g_cnt_succ = 0; +volatile long long g_cnt_fail = 0; + + +void Usage(const char* program) +{ + printf("Usage:%s ip:port [-g group] [-t topic] [-n count] [-s size] [-w logpath]\n", program); + printf("\t -g group\n"); + printf("\t -t topic\n"); + printf("\t -n message count\n"); + printf("\t -s message size \n"); + printf("\t -w log path\n"); +} + +// Send callback that tallies every asynchronous result into the global counters above. +class SampleSendCallback : public SendCallback { +public: + SampleSendCallback() + { + } + + virtual ~SampleSendCallback() + { + } + + int count() + { + // Print a throughput line at most once per second; the compare-and-swap lets only one caller report per interval. + long long now = MyUtil::getNowMs(); + long long old = g_lastUpdateTime; + long long total = g_cnt_succ + g_cnt_fail; + if ((now - old) >= 1000) + { + if (__sync_bool_compare_and_swap(&g_lastUpdateTime, old, now)) + { + long long time = now - old; + int tps = (int)((total - g_cnt_last) * 1.0 / time * 1000.0); + g_cnt_last = total; + MYDEBUG("[producer]succ: %lld, fail: %lld, TPS: %d\n", + g_cnt_succ, g_cnt_fail, tps); + } + } + return 0; // carries no information; returned so a non-void function never falls off its end + } + + void onSuccess(SendResult& sendResult) + { + int cnt = __sync_fetch_and_add(&g_cnt_total, 1); + __sync_fetch_and_add(&g_cnt_succ, 1); + MYLOG("[%d]|succ|%s\n", cnt, sendResult.toString().c_str()); + } + + void onException(MQException& e) + { + int cnt = __sync_fetch_and_add(&g_cnt_total, 1); + __sync_fetch_and_add(&g_cnt_fail, 1); + + MYLOG("[%d]|fail|%s\n", cnt, e.what()); + } +}; + +int main(int argc, char *argv[]) { + if (argc < 2) + { + Usage(argv[0]); + return 0; + } + + std::string namesrv = argv[1]; + std::string group = "pg_CppClient"; // default producer group; override with -g + std::string topic = "topic_test"; + int size = 32; + int count = 1000; + + for (int i=2; i< argc; i++) + { + if (strcmp(argv[i],"-g")==0) + { + if (i+1 < argc) + { + group = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if 
(strcmp(argv[i],"-t")==0) + { + if (i+1 < argc) + { + topic = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-n")==0) + { + if (i+1 < argc) + { + count = atoi(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-s")==0) + { + if (i+1 < argc) + { + size = atoi(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-w")==0) + { + if (i+1 < argc) + { + MyUtil::initLog(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else + { + Usage(argv[0]); + return 0; + } + } + + // Initialise the client API log. This is optional: it is only needed when debugging the API, so it can be commented out. + // By default only warnings and errors are logged and the log rolls over daily. To change the level, set the environment variable: export ROCKETMQ_LOGLEVEL=<level> + // The log levels are: + // 0 - logging disabled + // 1 - errors + // 2 - errors and warnings + // 3 - errors, warnings and info + // 4 - errors, warnings, info and debug + RocketMQUtil::initLog("/tmp/rocketmq_producer.log"); + + RMQ_DEBUG("producer.new: %s", group.c_str()); + DefaultMQProducer producer(group); + + RMQ_DEBUG("producer.setNamesrvAddr: %s", namesrv.c_str()); + producer.setNamesrvAddr(namesrv); + + RMQ_DEBUG("producer.start"); + producer.start(); + + std::string tags[] = { "TagA", "TagB", "TagC", "TagD", "TagE" }; + int nNow = time(NULL); + char key[64]; + char value[1024]; + + std::string str; + for (int i = 0; i < size; i += 8) + { + str.append("hello baby"); + } + + TimeCount tcTotal; + tcTotal.begin(); + + for (int i = 0; i < count; i++) + { + try + { + snprintf(key, sizeof(key), "KEY_%d_%d", nNow, i); + snprintf(value, sizeof(value), "%011d_%s", i, str.c_str()); + Message msg(topic,// topic + tags[i % 5],// tag + key,// key + value,// body + strlen(value)+1 + ); + + // 
Send messages asynchronously + SampleSendCallback* pSendCallback = new SampleSendCallback(); + producer.send(msg, pSendCallback); + } + catch (MQClientException& e) + { + std::cout << e << std::endl; + __sync_fetch_and_add(&g_cnt_fail, 1); + MyUtil::msleep(3000); + } + } + + // Wait until every send has been reported as succeeded or failed. + // Sleep between polls rather than spinning, so the waiting thread does + // not burn a CPU core while the asynchronous sends complete. + while ((g_cnt_succ + g_cnt_fail) < count) + { + MyUtil::msleep(1); + } + + tcTotal.end(); + // The success/failure counters are long long, so they must be printed with %lld. + printf("statsics: succ=%lld, fail=%lld, total_cost=%ds, tps=%d, avg=%dms\n", + g_cnt_succ, g_cnt_fail, tcTotal.countSec(), + (int)((double)count/((double)tcTotal.countUsec()/1000/1000)), tcTotal.countMsec()/count); + + producer.shutdown(); + + return 0; +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/Common.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/Common.cpp b/rocketmq-client4cpp/example/demo/Common.cpp new file mode 100755 index 0000000..b4db55c --- /dev/null +++ b/rocketmq-client4cpp/example/demo/Common.cpp @@ -0,0 +1,20 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "Common.h" + + +std::string MyUtil::_logPath = ""; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/Common.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/Common.h b/rocketmq-client4cpp/example/demo/Common.h new file mode 100644 index 0000000..4863588 --- /dev/null +++ b/rocketmq-client4cpp/example/demo/Common.h @@ -0,0 +1,162 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <string.h> +#include <assert.h> +#include <time.h> +#include <stdarg.h> +#include <fcntl.h> +#include <errno.h> +#include <signal.h> +#include <pthread.h> + +#include <sys/time.h> +#include <sys/timeb.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/file.h> +#include <sys/syscall.h> +#include <linux/unistd.h> + +#include <cstdio> +#include <iostream> +#include <string> +#include <sstream> +#include <vector> +#include <map> +#include <set> + + +#define MYDEBUG(fmt, args...) printf(fmt, ##args) +#define MYLOG(fmt, args...) 
MyUtil::writelog("[%s]" fmt, RocketMQUtil::now2str().c_str(), ##args)
+
+/**
+ * Static helpers shared by the demo programs: sleeping, string/number
+ * conversion, wall-clock time stamps and an optional append-only log file.
+ */
+class MyUtil
+{
+public:
+    /**
+     * Suspend the calling thread via nanosleep().
+     *
+     * @param millis  duration in milliseconds
+     */
+    static void msleep(long millis)
+    {
+        struct timespec tv;
+        tv.tv_sec = millis / 1000;
+        tv.tv_nsec = (millis % 1000) * 1000000;
+        nanosleep(&tv, 0);
+    }
+
+    /**
+     * Convert a decimal string to long long with atoll().
+     *
+     * @param str  NUL-terminated text to convert
+     * @return the value atoll() produces for str
+     */
+    static long long str2ll( const char *str )
+    {
+        return atoll(str);
+    }
+
+    /**
+     * @return the gettimeofday() time stamp converted to milliseconds
+     */
+    static unsigned long long getNowMs()
+    {
+        struct timeval tv;
+        gettimeofday(&tv, 0);
+        return tv.tv_sec * 1000ULL+tv.tv_usec/1000;
+    }
+
+    /**
+     * Remember the file that writelog() should append to.
+     *
+     * @param logPath  path of the log file; an empty path keeps logging off
+     * @return always 0
+     */
+    static int initLog(const std::string& logPath)
+    {
+        _logPath = logPath;
+        // The function is declared int; flowing off the end without a
+        // return statement is undefined behaviour.
+        return 0;
+    }
+
+    /**
+     * Format a printf-style message and append it to the file set by
+     * initLog(). Does nothing while no log path is configured.
+     *
+     * The file is opened on the first call that has a path and the
+     * descriptor is kept in a function-local static for later calls.
+     *
+     * @param fmt  printf-style format string, followed by its arguments
+     */
+    static void writelog(const char* fmt, ...)
+    {
+        if (_logPath.empty())
+        {
+            return;
+        }
+
+        static int logFd = -1;
+        if (logFd < 0)
+        {
+            logFd = open(_logPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0666);
+        }
+
+        // open() reports failure with -1; descriptor 0 is a valid result.
+        if (logFd >= 0)
+        {
+            char buf[1024*128];
+
+            va_list ap;
+            va_start(ap, fmt);
+            int size = vsnprintf(buf, sizeof(buf), fmt, ap);
+            va_end(ap);
+
+            // vsnprintf returns a negative value on error and the length
+            // the text would have needed when it was truncated; neither
+            // may be handed to write() as the byte count.
+            if (size < 0)
+            {
+                return;
+            }
+            if ((size_t)size >= sizeof(buf))
+            {
+                size = (int)sizeof(buf) - 1;
+            }
+
+            write(logFd, buf, size);
+        }
+
+        return;
+    }
+public:
+    static std::string _logPath;
+};
+
+/**
+ * Stop-watch built on gettimeofday().
+ *
+ * Usage:
+ *   int test()
+ *   {
+ *       TimeCount tc;
+ *       tc.begin();
+ *       func1();
+ *       tc.end();
+ *       cout << "cost:" << tc.countSec() << endl;
+ *   }
+ */
+class TimeCount
+{
+public:
+    TimeCount()
+    {
+        m_tBegin.tv_sec = 0;
+        m_tBegin.tv_usec = 0;
+
+        m_tEnd.tv_sec = 0;
+        m_tEnd.tv_usec = 0;
+    }
+
+    ~TimeCount(){}
+public:
+    /** Record the start of the measured interval. */
+    void begin()
+    {
+        gettimeofday(&m_tBegin,0);
+    }
+
+    /** Record the end of the measured interval. */
+    void end()
+    {
+        gettimeofday(&m_tEnd, 0);
+    }
+
+    /** @return elapsed time between begin() and end() in milliseconds */
+    int countMsec() const
+    {
+        return (int)((m_tEnd.tv_sec - m_tBegin.tv_sec)*1000 + (m_tEnd.tv_usec -m_tBegin.tv_usec)/1000.0);
+    }
+
+    /** @return elapsed time between begin() and end() in microseconds */
+    int countUsec() const
+    {
+        // Do the arithmetic in long long so the multiplication cannot
+        // overflow a 32-bit long before the result is narrowed to int.
+        const long long usec = (long long)(m_tEnd.tv_sec - m_tBegin.tv_sec)*1000000
+                             + (m_tEnd.tv_usec - m_tBegin.tv_usec);
+        return (int)usec;
+    }
+
+    /** @return whole seconds between begin() and end(); microseconds are ignored */
+    int countSec() const
+    {
+        return (m_tEnd.tv_sec - m_tBegin.tv_sec);
+    }
+
+public:
+    timeval m_tBegin;
+    timeval m_tEnd;
+};
+
 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/Makefile
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/Makefile b/rocketmq-client4cpp/example/demo/Makefile new file mode 100755 index 0000000..be2d60f --- /dev/null +++ b/rocketmq-client4cpp/example/demo/Makefile @@ -0,0 +1,40 @@ +#!/bin/sh +TOPDIR=../.. + +CFLAGS=-g -ggdb -Wno-deprecated -fno-strict-aliasing -fno-omit-frame-pointer +INCLUDE=-I${TOPDIR}/include +LIB=-L${TOPDIR}/lib64 -lrocketmq -lz -lrt -lpthread +TARGET=Producer AsyncProducer PullConsumer PushConsumer +DEPEND_OBJ=Common.o + + +LOCAL_SRC += $(sort $(wildcard *.cpp *.cc *.c)) +LOCAL_OBJ += $(patsubst %.cpp,%.o, $(patsubst %.cc,%.o, $(patsubst %.c,%.o, $(LOCAL_SRC)))) + +all:${LOCAL_OBJ} ${TARGET} + @echo "build succ!" + +clean: + rm -vf ${TARGET} *.o + @echo "clean succ!" + +Producer:Producer.o ${DEPEND_OBJ} + $(CXX) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB) + +AsyncProducer:AsyncProducer.o ${DEPEND_OBJ} + $(CXX) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB) + +PullConsumer:PullConsumer.o ${DEPEND_OBJ} + $(CXX) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB) + +PushConsumer:PushConsumer.o ${DEPEND_OBJ} + $(CXX) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB) + +%.o: %.cpp + $(CXX) $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.o: %.cc + $(CXX) $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.o: %.c + $(CC) $(CFLAGS) $(INCLUDE) -o $@ -c $< http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/Producer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/Producer.cpp b/rocketmq-client4cpp/example/demo/Producer.cpp new file mode 100644 index 0000000..9905e0b --- /dev/null +++ b/rocketmq-client4cpp/example/demo/Producer.cpp @@ -0,0 +1,199 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "Common.h" +#include "DefaultMQProducer.h" +using namespace rmq; + +void Usage(const char* program) +{ + printf("Usage:%s ip:port [-g group] [-t topic] [-n count] [-s size] [-w logpath]\n", program); + printf("\t -g group\n"); + printf("\t -t topic\n"); + printf("\t -n message count\n"); + printf("\t -s message size \n"); + printf("\t -w log path\n"); +} + +int main(int argc, char* argv[]) +{ + if (argc < 2) + { + Usage(argv[0]); + return 0; + } + + std::string namesrv = argv[1]; + std::string group = "pg_test_group"; + std::string topic = "topic_test"; + int size = 32; + int count = 1000; + + for (int i=2; i< argc; i++) + { + if (strcmp(argv[i],"-g")==0) + { + if (i+1 < argc) + { + group = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-t")==0) + { + if (i+1 < argc) + { + topic = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-n")==0) + { + if (i+1 < argc) + { + count = atoi(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-s")==0) + { + if (i+1 < argc) + { + size = atoi(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-w")==0) + { + if (i+1 < argc) + { + MyUtil::initLog(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else + { + Usage(argv[0]); + return 0; + } + } + + // init client api log, here is not necessary, need to debug the api need to be initialized, you can consider comment it + // Here only the default print 
warning, error log, the log will be rolling by day, if you need to modify the log level, please set the environment variable, export ROCKETMQ_LOGLEVEL = loglevel + // The log level is as follows: + // 0 - close the log + // 1 - write error log + // 2 - write error, warning log + // 3 - write error, warning, info log + // 4 - write errors, warnings, info, debug logs + RocketMQUtil::initLog("/tmp/rocketmq_producer.log"); + + RMQ_DEBUG("producer.new: %s", group.c_str()); + DefaultMQProducer producer(group); + + RMQ_DEBUG("producer.setNamesrvAddr: %s", namesrv.c_str()); + producer.setNamesrvAddr(namesrv); + + RMQ_DEBUG("producer.start"); + producer.start(); + + std::string tags[] = { "TagA", "TagB", "TagC", "TagD", "TagE" }; + + int _cost = 0, _tps = 0, _avg = 0, _min = 0, _max = 0; + int _failCnt = 0; + TimeCount tc; + TimeCount tcTotal; + tcTotal.begin(); + + int nNow = time(NULL); + char key[64]; + char value[1024]; + + std::string str; + for (int i = 0; i < size; i += 8) + { + str.append("hello baby"); + } + + for (int i = 0; i < count; i++) + { + try + { + + tc.begin(); + + snprintf(key, sizeof(key), "KEY_%d_%d", nNow, i); + snprintf(value, sizeof(value), "%011d_%s", i, str.c_str()); + Message msg(topic,// topic + tags[i % 5],// tag + key,// key + value,// body + strlen(value)+1 + ); + + // Send messages synchronously + SendResult sendResult = producer.send(msg); + + tc.end(); + + int cost = tc.countMsec(); + _min = (_min == 0) ? cost : (std::min(cost, _min)); + _max = (_max == 0) ? 
cost : (std::max(cost, _max)); + + MYLOG("[%d]|succ|cost:%dms, result:%s\n", i, cost, sendResult.toString().c_str()); + } + catch (MQClientException& e) + { + _failCnt++; + MYLOG("[%d]|fail|%s\n", i, e.what()); + } + } + tcTotal.end(); + + MYDEBUG("statsics: num=%d, fail=%d, total_cost=%ds, tps=%d, avg=%dms, min=%dms, max=%dms\n", + count, _failCnt, tcTotal.countSec(), (int)((double)count/(tcTotal.countMsec()/1000)), + tcTotal.countMsec()/count, _min, _max); + + // 忢ç产è + producer.shutdown(); + + return 0; +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/PullConsumer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/PullConsumer.cpp b/rocketmq-client4cpp/example/demo/PullConsumer.cpp new file mode 100755 index 0000000..3fae6dc --- /dev/null +++ b/rocketmq-client4cpp/example/demo/PullConsumer.cpp @@ -0,0 +1,194 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "Common.h" +#include "DefaultMQPullConsumer.h" +using namespace rmq; + +volatile long long g_totalCnt = 0; + +void PrintResult(PullResult& result) +{ + std::list<MessageExt*>::iterator it = result.msgFoundList.begin(); + for (;it!=result.msgFoundList.end();it++) + { + MessageExt* me = *it; + std::string str; + str.assign(me->getBody(),me->getBodyLen()); + + int cnt = __sync_fetch_and_add(&g_totalCnt, 1); + MYLOG("[%d]|%s|%s\n", cnt, me->toString().c_str(), str.c_str()); + } +} + + +void Usage(const char* program) +{ + printf("Usage:%s ip:port [-g group] [-t topic] [-w logpath]\n", program); + printf("\t -g consumer group\n"); + printf("\t -t topic\n"); + printf("\t -w log path\n"); +} + + +int main(int argc, char* argv[]) +{ + if (argc<2) + { + Usage(argv[0]); + return 0; + } + + std::string namesrv = argv[1]; + std::string group = "cg_test_pull_group"; + std::string topic = "topic_test"; + + for (int i=2; i< argc; i++) + { + if (strcmp(argv[i],"-g")==0) + { + if (i+1 < argc) + { + group = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-t")==0) + { + if (i+1 < argc) + { + topic = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-w")==0) + { + if (i+1 < argc) + { + MyUtil::initLog(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else + { + Usage(argv[0]); + return 0; + } + } + + // init client api log, here is not necessary, need to debug the api need to be initialized, you can consider comment it + // Here only the default print warning, error log, the log will be rolling by day, if you need to modify the log level, please set the environment variable, export ROCKETMQ_LOGLEVEL = loglevel + // The log level is as follows: + // 0 - close the log + // 1 - write error log + // 2 - write error, warning log + // 3 - write error, warning, info log + // 4 - write errors, warnings, info, debug logs + 
RocketMQUtil::initLog("/tmp/rocketmq_pullconsumer.log"); + + RMQ_DEBUG("consumer.new: %s", group.c_str()); + DefaultMQPullConsumer consumer(group); + + RMQ_DEBUG("consumer.setNamesrvAddr: %s", namesrv.c_str()); + consumer.setNamesrvAddr(namesrv); + + RMQ_DEBUG("consumer.setMessageModel: %s", getMessageModelString(CLUSTERING)); + consumer.setMessageModel(CLUSTERING); + + consumer.setConsumerPullTimeoutMillis(4000); + consumer.setBrokerSuspendMaxTimeMillis(3000); + consumer.setConsumerTimeoutMillisWhenSuspend(5000); + + RMQ_DEBUG("consumer.start"); + consumer.start(); + + RMQ_DEBUG("consumer.fetchSubscribeMessageQueues"); + std::set<MessageQueue>* mqs = consumer.fetchSubscribeMessageQueues(topic); + + std::set<MessageQueue>::iterator it = mqs->begin(); + for (; it!=mqs->end(); it++) + { + MessageQueue mq = *it; + bool noNewMsg = false; + while (!noNewMsg) + { + try + { + RMQ_DEBUG("consumer.fetchConsumeOffset"); + long long offset = consumer.fetchConsumeOffset(mq, false); + if (offset < 0) + { + offset = consumer.maxOffset(mq); + if (offset < 0) + { + offset = LLONG_MAX; + } + } + + RMQ_DEBUG("consumer.pullBlockIfNotFound"); + //PullResult* pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32); + PullResult* pullResult = consumer.pull(mq, "*", offset, 32); + PrintResult(*pullResult); + + RMQ_DEBUG("consumer.updateConsumeOffset"); + consumer.updateConsumeOffset(mq, pullResult->nextBeginOffset); + + switch (pullResult->pullStatus) + { + case FOUND: + // TODO + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + noNewMsg = true; + break; + case OFFSET_ILLEGAL: + break; + default: + break; + } + + delete pullResult; + } + catch (MQException& e) + { + std::cout<<e<<std::endl; + } + } + } + delete mqs; + + RMQ_DEBUG("consumer.shutdown"); + consumer.shutdown(); + + return 0; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/example/demo/PushConsumer.cpp 
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/example/demo/PushConsumer.cpp b/rocketmq-client4cpp/example/demo/PushConsumer.cpp new file mode 100755 index 0000000..1a8bfed --- /dev/null +++ b/rocketmq-client4cpp/example/demo/PushConsumer.cpp @@ -0,0 +1,256 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "Common.h" +#include "DefaultMQPushConsumer.h" +using namespace rmq; + +volatile long long g_lastCnt = 0; +volatile long long g_totalCnt = 0; +long long g_lastUpdateTime = 0; + +static std::string bin2str(const std::string& strBin) +{ + if(strBin.size() == 0) + { + return ""; + } + + std::string sOut; + const char *p = (const char *)strBin.data(); + size_t len = strBin.size(); + + char sBuf[255]; + for (size_t i = 0; i < len; ++i, ++p) + { + snprintf(sBuf, sizeof(sBuf), "%02x", (unsigned char) *p); + sOut += sBuf; + } + + return sOut; +} + + +class MsgListener : public MessageListenerConcurrently +{ +public: + MsgListener() + { + consumeTimes = 0; + } + + ~MsgListener() + { + + } + + /** + * consume messages + * ï¼ï¼ï¼Noticeï¼multi-thread call, need to pay attention to dealing with multi-threaded re-entry problem + * @param msgs message list + * @param context context for consumer + * @return [CONSUME_SUCCESS- successï¼RECONSUME_LATER-consume fail and retry later] + */ + ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt*>& 
msgs, + ConsumeConcurrentlyContext& context) + { + int cnt = __sync_fetch_and_add(&g_totalCnt, 1); + long long now = MyUtil::getNowMs(); + long long old = g_lastUpdateTime; + if ((now - old) >= 1000) + { + if (__sync_bool_compare_and_swap(&g_lastUpdateTime, old, now)) + { + long long time = now - old; + int tps = (int)((g_totalCnt - g_lastCnt) * 1.0 / time * 1000.0); + g_lastCnt = g_totalCnt; + + MYDEBUG("[consume]msgcount: %lld, TPS: %d\n", g_totalCnt, tps); + } + } + + + MessageExt* msg = msgs.front(); + long long offset = msg->getQueueOffset(); + std::string maxOffset = msg->getProperty(Message::PROPERTY_MAX_OFFSET); + + long long diff = MyUtil::str2ll(maxOffset.c_str()) - offset; + if (diff > 100000) + { + if (diff % 10000 == 0) + { + MYDEBUG("overload, offset:%lld, diff:%lld\n", offset, diff); + } + // return CONSUME_SUCCESS; + } + + std::list<MessageExt*>::iterator it = msgs.begin(); + for (;it != msgs.end();it++) + { + MessageExt* me = *it; + std::string str; + str.assign(me->getBody(),me->getBodyLen()); + + MYLOG("[%d]|%s|%s\n", cnt, me->toString().c_str(), str.c_str()); + } + + consumeTimes++; + + /* + if ((consumeTimes % 2) == 0) + { + return RECONSUME_LATER; + } + else if ((consumeTimes % 3) == 0) + { + context.delayLevelWhenNextConsume = 5; + return RECONSUME_LATER; + } + */ + + // context.ackIndex = msgs.size() - 1; + return CONSUME_SUCCESS; + } + + int consumeTimes; +}; + + +void Usage(const char* program) +{ + printf("Usage:%s ip:port [-g group] [-t topic] [-w logpath]\n", program); + printf("\t -g consumer group\n"); + printf("\t -t topic\n"); + printf("\t -w log path\n"); +} + + +int main(int argc, char* argv[]) +{ + if (argc < 2) + { + Usage(argv[0]); + return 0; + } + + std::string namesrv = argv[1]; + std::string group = "cg_test_push_group"; + std::string topic = "topic_test"; + for (int i=2; i< argc; i++) + { + if (strcmp(argv[i],"-g")==0) + { + if (i+1 < argc) + { + group = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + 
} + else if (strcmp(argv[i],"-t")==0) + { + if (i+1 < argc) + { + topic = argv[i+1]; + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else if (strcmp(argv[i],"-w")==0) + { + if (i+1 < argc) + { + MyUtil::initLog(argv[i+1]); + i++; + } + else + { + Usage(argv[0]); + return 0; + } + } + else + { + Usage(argv[0]); + return 0; + } + } + + // init client api log, here is not necessary, need to debug the api need to be initialized, you can consider comment it + // Here only the default print warning, error log, the log will be rolling by day, if you need to modify the log level, please set the environment variable, export ROCKETMQ_LOGLEVEL = loglevel + // The log level is as follows: + // 0 - close the log + // 1 - write error log + // 2 - write error, warning log + // 3 - write error, warning, info log + // 4 - write errors, warnings, info, debug logs + RocketMQUtil::initLog("/tmp/rocketmq_pushconsumer.log"); + + RMQ_DEBUG("consumer.new: %s", group.c_str()); + DefaultMQPushConsumer consumer(group); + + RMQ_DEBUG("consumer.setNamesrvAddr: %s", namesrv.c_str()); + consumer.setNamesrvAddr(namesrv); + + RMQ_DEBUG("consumer.setMessageModel: %s", getMessageModelString(CLUSTERING)); + consumer.setMessageModel(CLUSTERING); + + RMQ_DEBUG("consumer.subscribe"); + consumer.subscribe(topic, "*"); + + consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET); + + // Set the number of each consumption message, the default is 1 + // consumer.setConsumeMessageBatchMaxSize(1); + + // The number of consumer thread pool, the default minimum 5, the maximum 25, the proposed set to the same, more stable + // consumer.setConsumeThreadMin(25); + // consumer.setConsumeThreadMax(25); + + // Single message consume timeout, default is 15 minutes + // When the consumption times out, message will be send back to the retry queue and re-delivered + // consumer.setConsumeTimeout(15); + + RMQ_DEBUG("consumer.registerMessageListener"); + MsgListener* listener = new MsgListener(); + 
consumer.registerMessageListener(listener); + + RMQ_DEBUG("consumer.start"); + consumer.start(); + + while(1) + { + if (getchar()=='e'&&getchar()=='x'&&getchar()=='i'&&getchar()=='t') + { + break; + } + ::sleep(1); + } + + RMQ_DEBUG("consumer.shutdown"); + consumer.shutdown(); + delete listener; + + return 0; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/AllocateMessageQueueStrategy.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/AllocateMessageQueueStrategy.h b/rocketmq-client4cpp/include/AllocateMessageQueueStrategy.h new file mode 100755 index 0000000..fc10072 --- /dev/null +++ b/rocketmq-client4cpp/include/AllocateMessageQueueStrategy.h @@ -0,0 +1,54 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __RMQ_ALLOCATEMESSAGEQUEUESTRATEGY_H__ +#define __RMQ_ALLOCATEMESSAGEQUEUESTRATEGY_H__ + +#include <vector> +#include <string> +#include <list> + +#include "RocketMQClient.h" +#include "MessageQueue.h" + +namespace rmq +{ + /** + * Consumer Queue Automatic Assignment Policy + * + */ + class AllocateMessageQueueStrategy + { + public: + virtual ~AllocateMessageQueueStrategy() {} + + /** + * Assign queues to the current ConsumerId + * + * @param [currentCID] Current ConsumerId + * @param [mqAll] All queues of the current Topic, no duplicate data, and orderly + * @param [cidAll] All subscription groups for the current subscription group, without duplication of data, and orderly + * @return allocation results, no duplicate data + */ + virtual std::vector<MessageQueue>* allocate( + const std::string& consumerGroup, + const std::string& currentCID, + std::vector<MessageQueue>& mqAll, + std::list<std::string>& cidAll)=0; + virtual std::string getName()=0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/ClientConfig.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/ClientConfig.h b/rocketmq-client4cpp/include/ClientConfig.h new file mode 100755 index 0000000..ca36830 --- /dev/null +++ b/rocketmq-client4cpp/include/ClientConfig.h @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2010-2013 kangliqiang <[email protected]> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __RMQ_CLIENTCONFIG_H__ +#define __RMQ_CLIENTCONFIG_H__ + +#include <string> +#include "RocketMQClient.h" + +namespace rmq +{ + /** + * Producer and Consumer common configuration + * + */ + class ClientConfig + { + public: + ClientConfig(); + virtual ~ClientConfig(); + + std::string buildMQClientId(); + void changeInstanceNameToPID(); + + void resetClientConfig(const ClientConfig& cc); + ClientConfig cloneClientConfig(); + + std::string getNamesrvAddr(); + void setNamesrvAddr(const std::string& namesrvAddr); + + std::string getClientIP(); + void setClientIP(const std::string& clientIP); + + std::string getInstanceName(); + void setInstanceName(const std::string& instanceName); + + int getClientCallbackExecutorThreads(); + void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads); + + int getPollNameServerInterval(); + + void setPollNameServerInterval(int pollNameServerInterval); + + int getHeartbeatBrokerInterval(); + void setHeartbeatBrokerInterval(int heartbeatBrokerInterval); + + int getPersistConsumerOffsetInterval(); + void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval); + + std::string toString() const; + + private: + int m_clientCallbackExecutorThreads; + int m_pollNameServerInterval; + int m_heartbeatBrokerInterval; + int m_persistConsumerOffsetInterval; + std::string m_clientIP; + std::string m_instanceName; + std::string m_namesrvAddr; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/ConsumeMessageHook.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/ConsumeMessageHook.h b/rocketmq-client4cpp/include/ConsumeMessageHook.h new file mode 100644 index 0000000..c13e6c7 --- /dev/null +++ b/rocketmq-client4cpp/include/ConsumeMessageHook.h @@ -0,0 +1,45 @@ +/** +* 
Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __RMQ_CONSUMEMESSAGEHOOK_H__ +#define __RMQ_CONSUMEMESSAGEHOOK_H__ + +#include <list> + +#include "RocketMQClient.h" +#include "MessageQueue.h" + +namespace rmq +{ + typedef struct + { + std::string consumerGroup; + std::list<MessageExt*> msgList; + MessageQueue mq; + bool success; + void* arg; + } ConsumeMessageContext; + + class ConsumeMessageHook + { + public: + virtual ~ConsumeMessageHook() {} + virtual std::string hookName()=0; + virtual void consumeMessageBefore(const ConsumeMessageContext& context)=0; + virtual void consumeMessageAfter(const ConsumeMessageContext& context)=0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/ConsumeType.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/ConsumeType.h b/rocketmq-client4cpp/include/ConsumeType.h new file mode 100755 index 0000000..a4748ab --- /dev/null +++ b/rocketmq-client4cpp/include/ConsumeType.h @@ -0,0 +1,56 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __RMQ_CONSUMETYPE_H__ +#define __RMQ_CONSUMETYPE_H__ + +#include "RocketMQClient.h" + +namespace rmq +{ + enum ConsumeType + { + /** + * Active comsume + */ + CONSUME_ACTIVELY, + /** + * Passive comsume + */ + CONSUME_PASSIVELY, + }; + + enum ConsumeFromWhere + { + CONSUME_FROM_LAST_OFFSET, + CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, + CONSUME_FROM_MIN_OFFSET, + CONSUME_FROM_MAX_OFFSET, + CONSUME_FROM_FIRST_OFFSET, + CONSUME_FROM_TIMESTAMP, + }; + + enum MessageModel + { + BROADCASTING, + CLUSTERING, + }; + + const char* getConsumeTypeString(ConsumeType type); + const char* getConsumeFromWhereString(ConsumeFromWhere type); + const char* getMessageModelString(MessageModel type); +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/DefaultMQProducer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/DefaultMQProducer.h b/rocketmq-client4cpp/include/DefaultMQProducer.h new file mode 100755 index 0000000..9bf2f74 --- /dev/null +++ b/rocketmq-client4cpp/include/DefaultMQProducer.h @@ -0,0 +1,129 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __RMQ_DEFAULTMQPRODUCER_H__ +#define __RMQ_DEFAULTMQPRODUCER_H__ + +#include <string> +#include <list> +#include <vector> + +#include "RocketMQClient.h" +#include "MQClientException.h" +#include "Message.h" +#include "SendResult.h" +#include "ClientConfig.h" +#include "MQProducer.h" + +namespace rmq +{ + class MessageQueue; + class MessageQueueSelector; + class MQClientException; + class Message; + class DefaultMQProducerImpl; + + /** + * Message producer + * + */ + class DefaultMQProducer : public ClientConfig ,public MQProducer + { + public: + DefaultMQProducer(); + DefaultMQProducer(const std::string& producerGroup); + ~DefaultMQProducer(); + + //begin MQProducer + void start(); + void shutdown(); + + std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic); + + SendResult send(Message& msg); + SendResult send(Message& msg, MessageQueue& mq); + SendResult send(Message& msg, MessageQueueSelector* selector, void* arg); + void send(Message& msg, SendCallback* pSendCallback); + void send(Message& msg, MessageQueue& mq, SendCallback* pSendCallback); + void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* pSendCallback); + void sendOneway(Message& msg); + void sendOneway(Message& msg, MessageQueue& mq); + void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg); + + TransactionSendResult sendMessageInTransaction(Message& msg, + LocalTransactionExecuter* pTranExecuter, + void* arg); + //end MQProducer + + void createTopic(const std::string& key, const std::string& newTopic, int 
queueNum) ; + long long searchOffset(const MessageQueue& mq, long long timestamp); + long long maxOffset(const MessageQueue& mq); + long long minOffset(const MessageQueue& mq); + long long earliestMsgStoreTime(const MessageQueue& mq); + MessageExt* viewMessage(const std::string& msgId) ; + QueryResult queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end); + + std::string getProducerGroup(); + void setProducerGroup(const std::string& producerGroup); + + std::string getCreateTopicKey(); + void setCreateTopicKey(const std::string& createTopicKey); + + int getSendMsgTimeout(); + void setSendMsgTimeout(int sendMsgTimeout) ; + + int getCompressMsgBodyOverHowmuch(); + void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch); + + int getCompressLevel(); + void setCompressLevel(int compressLevel); + + DefaultMQProducerImpl* getDefaultMQProducerImpl(); + + bool isRetryAnotherBrokerWhenNotStoreOK(); + void setRetryAnotherBrokerWhenNotStoreOK(bool retryAnotherBrokerWhenNotStoreOK); + + int getMaxMessageSize(); + void setMaxMessageSize(int maxMessageSize); + + int getDefaultTopicQueueNums(); + void setDefaultTopicQueueNums(int defaultTopicQueueNums); + + int getRetryTimesWhenSendFailed(); + void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed); + + protected: + DefaultMQProducerImpl* m_pDefaultMQProducerImpl; + + private: + std::string m_producerGroup; + std::string m_createTopicKey; + + int m_defaultTopicQueueNums; + int m_sendMsgTimeout; + int m_compressMsgBodyOverHowmuch; + int m_retryTimesWhenSendFailed; + bool m_retryAnotherBrokerWhenNotStoreOK; + int m_maxMessageSize; + int m_compressLevel; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/DefaultMQPullConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/DefaultMQPullConsumer.h 
b/rocketmq-client4cpp/include/DefaultMQPullConsumer.h new file mode 100755 index 0000000..d9952c5 --- /dev/null +++ b/rocketmq-client4cpp/include/DefaultMQPullConsumer.h @@ -0,0 +1,154 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __RMQ_DEFAULTMQPULLCONSUMER_H__ +#define __RMQ_DEFAULTMQPULLCONSUMER_H__ + +#include <list> +#include <string> + +#include "RocketMQClient.h" +#include "MQClientException.h" +#include "MessageQueue.h" +#include "MessageExt.h" +#include "ClientConfig.h" +#include "MQPullConsumer.h" + +namespace rmq +{ + class OffsetStore; + class DefaultMQPullConsumerImpl; + class AllocateMessageQueueStrategy; + + /** + * Pull Consumer + * + */ + class DefaultMQPullConsumer : public ClientConfig , public MQPullConsumer + { + public: + DefaultMQPullConsumer(); + DefaultMQPullConsumer(const std::string& consumerGroup); + ~DefaultMQPullConsumer(); + + //MQAdmin + void createTopic(const std::string& key, const std::string& newTopic, int queueNum); + long long searchOffset(const MessageQueue& mq, long long timestamp); + long long maxOffset(const MessageQueue& mq); + long long minOffset(const MessageQueue& mq); + long long earliestMsgStoreTime(const MessageQueue& mq); + MessageExt* viewMessage(const std::string& msgId); + QueryResult queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end); + // MQadmin end + + 
AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy(); + void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy); + int getBrokerSuspendMaxTimeMillis() ; + void setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis); + std::string getConsumerGroup(); + void setConsumerGroup(const std::string& consumerGroup); + int getConsumerPullTimeoutMillis(); + void setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis); + int getConsumerTimeoutMillisWhenSuspend() ; + void setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend); + MessageModel getMessageModel(); + void setMessageModel(MessageModel messageModel); + MessageQueueListener* getMessageQueueListener(); + void setMessageQueueListener(MessageQueueListener* pMessageQueueListener); + std::set<std::string> getRegisterTopics(); + void setRegisterTopics( std::set<std::string> registerTopics); + + //MQConsumer + void sendMessageBack(MessageExt& msg, int delayLevel); + void sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName); + std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); + void start(); + void shutdown() ; + //MQConsumer end + + //MQPullConsumer + void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener); + PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums); + void pull(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums, + PullCallback* pPullCallback); + + PullResult* pullBlockIfNotFound(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums); + + void pullBlockIfNotFound(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums, + PullCallback* pPullCallback); + + void updateConsumeOffset(MessageQueue& mq, long long offset); + + long long fetchConsumeOffset(MessageQueue& mq, bool fromStore); + + 
std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic); + //MQPullConsumer end + + OffsetStore* getOffsetStore(); + void setOffsetStore(OffsetStore* offsetStore); + + DefaultMQPullConsumerImpl* getDefaultMQPullConsumerImpl(); + + bool isUnitMode(); + void setUnitMode(bool isUnitMode); + + int getMaxReconsumeTimes(); + void setMaxReconsumeTimes(int maxReconsumeTimes); + + protected: + DefaultMQPullConsumerImpl* m_pDefaultMQPullConsumerImpl; + + private: + std::string m_consumerGroup; + int m_brokerSuspendMaxTimeMillis ; + + int m_consumerTimeoutMillisWhenSuspend; + int m_consumerPullTimeoutMillis; + + MessageModel m_messageModel; + MessageQueueListener* m_pMessageQueueListener; + + OffsetStore* m_pOffsetStore; + + std::set<std::string> m_registerTopics; + AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy; + + /** + * Whether the unit of subscription group + */ + bool m_unitMode; + + /** + * max retry times, default is 15 + */ + int m_maxReconsumeTimes; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/DefaultMQPushConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/DefaultMQPushConsumer.h b/rocketmq-client4cpp/include/DefaultMQPushConsumer.h new file mode 100755 index 0000000..25ef4fb --- /dev/null +++ b/rocketmq-client4cpp/include/DefaultMQPushConsumer.h @@ -0,0 +1,181 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __RMQ_DEFAULTMQPUSHCONSUMER_H__ +#define __RMQ_DEFAULTMQPUSHCONSUMER_H__ + +#include <list> +#include <string> + +#include "RocketMQClient.h" +#include "MQClientException.h" +#include "Message.h" +#include "MessageExt.h" +#include "MessageQueue.h" +#include "MessageListener.h" +#include "PullResult.h" +#include "ClientConfig.h" +#include "MQPushConsumer.h" + +namespace rmq +{ + class AllocateMessageQueueStrategy; + class DefaultMQPushConsumerImpl; + class OffsetStore; + + /** + * Push Consumer + * + */ + class DefaultMQPushConsumer : public ClientConfig ,public MQPushConsumer + { + public: + DefaultMQPushConsumer(); + DefaultMQPushConsumer(const std::string& consumerGroup); + ~DefaultMQPushConsumer(); + + //MQAdmin + void createTopic(const std::string& key, const std::string& newTopic, int queueNum); + long long searchOffset(const MessageQueue& mq, long long timestamp); + long long maxOffset(const MessageQueue& mq); + long long minOffset(const MessageQueue& mq); + long long earliestMsgStoreTime(const MessageQueue& mq); + MessageExt* viewMessage(const std::string& msgId); + QueryResult queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end); + + // MQadmin end + + AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy(); + void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy); + + int getConsumeConcurrentlyMaxSpan(); + void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan); + + ConsumeFromWhere getConsumeFromWhere(); + void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere); + + int getConsumeMessageBatchMaxSize(); + void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize); + + std::string getConsumerGroup(); + void setConsumerGroup(const std::string& consumerGroup) ; + + int getConsumeThreadMax() ; + void 
setConsumeThreadMax(int consumeThreadMax); + + int getConsumeThreadMin(); + void setConsumeThreadMin(int consumeThreadMin); + + MessageListener* getMessageListener(); + void setMessageListener(MessageListener* pMessageListener); + + MessageModel getMessageModel(); + void setMessageModel(MessageModel messageModel) ; + + int getPullBatchSize() ; + void setPullBatchSize(int pullBatchSize); + + long getPullInterval(); + void setPullInterval(long pullInterval); + + int getPullThresholdForQueue(); + void setPullThresholdForQueue(int pullThresholdForQueue); + + std::map<std::string, std::string>& getSubscription(); + void setSubscription(const std::map<std::string, std::string>& subscription); + + //MQConsumer + void sendMessageBack(MessageExt& msg, int delayLevel); + void sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName); + std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); + + void start(); + void shutdown(); + //MQConsumer end + + //MQPushConsumer + void registerMessageListener(MessageListener* pMessageListener); + + void subscribe(const std::string& topic, const std::string& subExpression); + void unsubscribe(const std::string& topic); + + void updateCorePoolSize(int corePoolSize); + + void suspend() ; + void resume(); + //MQPushConsumer end + + OffsetStore* getOffsetStore(); + void setOffsetStore(OffsetStore* offsetStore); + + std::string getConsumeTimestamp(); + void setConsumeTimestamp(std::string consumeTimestamp); + + DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl(); + + bool isPostSubscriptionWhenPull(); + void setPostSubscriptionWhenPull(bool postSubscriptionWhenPull); + + bool isUnitMode(); + void setUnitMode(bool isUnitMode); + + int getMaxReconsumeTimes(); + void setMaxReconsumeTimes(int maxReconsumeTimes); + + int getSuspendCurrentQueueTimeMillis(); + void setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis); + + int getConsumeTimeout(); + void setConsumeTimeout(int 
consumeTimeout); + + protected: + DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl; + + private: + std::string m_consumerGroup; + MessageModel m_messageModel; + ConsumeFromWhere m_consumeFromWhere; + std::string m_consumeTimestamp; + + AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy ; + std::map<std::string /* topic */, std::string /* sub expression */> m_subscription ; + + MessageListener* m_pMessageListener; + OffsetStore* m_pOffsetStore; + + int m_consumeThreadMin; + int m_consumeThreadMax; + + int m_consumeConcurrentlyMaxSpan; + int m_pullThresholdForQueue; + long m_pullInterval; + + int m_consumeMessageBatchMaxSize; + int m_pullBatchSize; + + bool m_postSubscriptionWhenPull; + bool m_unitMode; + int m_maxReconsumeTimes; + + long m_suspendCurrentQueueTimeMillis; + long m_consumeTimeout; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQAdmin.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQAdmin.h b/rocketmq-client4cpp/include/MQAdmin.h new file mode 100755 index 0000000..552a468 --- /dev/null +++ b/rocketmq-client4cpp/include/MQAdmin.h @@ -0,0 +1,66 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __RMQ_MQADMIN_H__ +#define __RMQ_MQADMIN_H__ + +#include <string> + +#include "RocketMQClient.h" +#include "MessageExt.h" + +namespace rmq +{ + class MQClientException; + class RemotingException; + class MQBrokerException; + class InterruptedException; + class MessageQueue; + class QueryResult; + + /** + * MQ Admin + * + */ + class MQAdmin + { + public: + MQAdmin() + { + } + + virtual ~MQAdmin() + { + } + + virtual void createTopic(const std::string& key, const std::string& newTopic, int queueNum)=0; + + virtual long long searchOffset(const MessageQueue& mq, long long timestamp)=0; + virtual long long maxOffset(const MessageQueue& mq)=0; + virtual long long minOffset(const MessageQueue& mq)=0; + + virtual long long earliestMsgStoreTime(const MessageQueue& mq)=0; + + virtual MessageExt* viewMessage(const std::string& msgId)=0; + virtual QueryResult queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end)=0; + }; +} + +#endif
