This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new e628502 Realization C asynSend (#65)
e628502 is described below
commit e628502f0b36ee073ecc938bb4e7b6f25c65b277
Author: githublaohu <[email protected]>
AuthorDate: Wed Jan 16 13:48:44 2019 +0800
Realization C asynSend (#65)
Realization C asynSend
---
example/{Producer.c => CAsyncProducer.c} | 44 ++++++++++++++++-------
example/Producer.c | 3 +-
example/PushConsumer.cpp | 2 +-
include/CCommon.h | 1 +
include/CMQException.h | 41 +++++++++++++++++++++
include/CProducer.h | 5 ++-
include/MQClientException.h | 10 ++++++
src/extern/CProducer.cpp | 62 +++++++++++++++++++++++++++++++-
test/src/UrlTest.cpp | 15 +++++++-
9 files changed, 165 insertions(+), 18 deletions(-)
diff --git a/example/Producer.c b/example/CAsyncProducer.c
similarity index 57%
copy from example/Producer.c
copy to example/CAsyncProducer.c
index cef8383..56496b5 100644
--- a/example/Producer.c
+++ b/example/CAsyncProducer.c
@@ -39,6 +39,17 @@ void thread_sleep(unsigned milliseconds) {
#endif
}
+void sendSuccessCallback(CSendResult result){
+ printf("Msg Send ID:%s\n", result.msgId);
+}
+
+void sendExceptionCallback(CMQException e){
+ printf("asyn send exception error : %d\n" , e.error);
+ printf("asyn send exception msg : %s\n" , e.msg);
+ printf("asyn send exception file : %s\n" , e.file);
+ printf("asyn send exception line : %d\n" , e.line);
+}
+
void startSendMessage(CProducer *producer) {
int i = 0;
char DestMsg[256];
@@ -49,27 +60,36 @@ void startSendMessage(CProducer *producer) {
for (i = 0; i < 10; i++) {
printf("send one message : %d\n", i);
memset(DestMsg, 0, sizeof(DestMsg));
- snprintf(DestMsg, 255, "New message body: index %d", i);
+ snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
SetMessageBody(msg, DestMsg);
- SendMessageSync(producer, msg, &result);
- printf("Msg Send ID:%s\n", result.msgId);
+ int code = SendMessageAsync(producer, msg, sendSuccessCallback ,
sendExceptionCallback);
+ printf("Async send return code: %d\n", code);
thread_sleep(1000);
}
}
+void CreateProducerAndStartSendMessage(int i){
+ printf("Producer Initializing.....\n");
+ CProducer *producer = CreateProducer("Group_producer");
+ SetProducerNameServerAddress(producer, "127.0.0.1:9876");
+ if(i == 1){
+ SetProducerSendMsgTimeout(producer , 3);
+ }
+ StartProducer(producer);
+ printf("Producer start.....\n");
+ startSendMessage(producer);
+ ShutdownProducer(producer);
+ DestroyProducer(producer);
+ printf("Producer Shutdown!\n");
+}
int main(int argc, char *argv[]) {
- printf("Producer Initializing.....\n");
+ printf("Send Async successCallback.....\n");
+ CreateProducerAndStartSendMessage(0);
- CProducer *producer = CreateProducer("Group_producer");
- SetProducerNameServerAddress(producer, "172.17.0.2:9876");
- StartProducer(producer);
- printf("Producer start.....\n");
- startSendMessage(producer);
+ printf("Send Async exceptionCallback.....\n");
+ CreateProducerAndStartSendMessage(1);
- ShutdownProducer(producer);
- DestroyProducer(producer);
- printf("Producer Shutdown!\n");
return 0;
}
diff --git a/example/Producer.c b/example/Producer.c
index cef8383..5feedd6 100644
--- a/example/Producer.c
+++ b/example/Producer.c
@@ -62,11 +62,10 @@ int main(int argc, char *argv[]) {
printf("Producer Initializing.....\n");
CProducer *producer = CreateProducer("Group_producer");
- SetProducerNameServerAddress(producer, "172.17.0.2:9876");
+ SetProducerNameServerAddress(producer, "127.0.0.1:9876");
StartProducer(producer);
printf("Producer start.....\n");
startSendMessage(producer);
-
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
diff --git a/example/PushConsumer.cpp b/example/PushConsumer.cpp
index d5ce021..119b7f2 100755
--- a/example/PushConsumer.cpp
+++ b/example/PushConsumer.cpp
@@ -81,7 +81,7 @@ int main(int argc, char *argv[]) {
if (info.syncpush) consumer.setAsyncPull(false); // set sync pull
if (info.broadcasting) {
- consumer.setMessageModel(BROADCASTING);
+ consumer.setMessageModel(rocketmq::BROADCASTING);
}
consumer.setInstanceName(info.groupname);
diff --git a/include/CCommon.h b/include/CCommon.h
index efcd2aa..eb9ffbc 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -36,6 +36,7 @@ typedef enum _CStatus_{
PRODUCER_SEND_SYNC_FAILED = 11,
PRODUCER_SEND_ONEWAY_FAILED = 12,
PRODUCER_SEND_ORDERLY_FAILED = 13,
+ PRODUCER_SEND_ASYNC_FAILED = 14,
PUSHCONSUMER_ERROR_CODE_START = 20,
PUSHCONSUMER_START_FAILED = 20,
diff --git a/include/CMQException.h b/include/CMQException.h
new file mode 100644
index 0000000..da26edd
--- /dev/null
+++ b/include/CMQException.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __C_MQEXCPTION_H__
+#define __C_MQEXCPTION_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_EXEPTION_CHAR_LENGTH 512
+
+typedef struct _CMQException_{
+ int error;
+ int line;
+ char file[MAX_EXEPTION_CHAR_LENGTH];
+ char msg[MAX_EXEPTION_CHAR_LENGTH];
+ char type[MAX_EXEPTION_CHAR_LENGTH];
+
+} CMQException;
+
+#ifdef __cplusplus
+};
+#endif
+#endif
diff --git a/include/CProducer.h b/include/CProducer.h
index 6edd99e..e75622c 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -18,9 +18,9 @@
#ifndef __C_PRODUCER_H__
#define __C_PRODUCER_H__
-#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"
+#include "CMQException.h"
#ifdef __cplusplus
extern "C" {
@@ -29,6 +29,8 @@ extern "C" {
//typedef struct _CProducer_ _CProducer;
typedef struct CProducer CProducer;
typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg);
+typedef void(*CSendSuccessCallback)(CSendResult result);
+typedef void(*CSendExceptionCallback)(CMQException e);
ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId);
ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer);
@@ -49,6 +51,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer
*producer, int level);
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int
size);
ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg,
CSendResult *result);
+ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg,
CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback
cSendExceptionCallback);
ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg);
ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg,
QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult
*result);
#ifdef __cplusplus
diff --git a/include/MQClientException.h b/include/MQClientException.h
index bf29863..2c3d2ed 100755
--- a/include/MQClientException.h
+++ b/include/MQClientException.h
@@ -21,12 +21,17 @@
#include <ostream>
#include <sstream>
#include <string>
+
+#include <string.h>
#include "RocketMQClient.h"
+#include "CCommon.h"
+
namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQException : public std::exception {
+
public:
MQException(const std::string& msg, int error, const char* file,
int line) throw()
@@ -60,6 +65,10 @@ class ROCKETMQCLIENT_API MQException : public std::exception
{
virtual const char* GetType() const throw() { return m_type.c_str(); }
+ int GetLine() { return m_line;}
+
+ const char* GetFile() { return m_file.c_str(); }
+
protected:
int m_error;
int m_line;
@@ -68,6 +77,7 @@ class ROCKETMQCLIENT_API MQException : public std::exception {
std::string m_type;
};
+
inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
os << "Type: " << e.GetType() << " , " << e.what();
return os;
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 0715942..c238e59 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -16,10 +16,17 @@
*/
#include "DefaultMQProducer.h"
+#include "AsyncCallback.h"
+
#include "CProducer.h"
#include "CCommon.h"
-#include <string.h>
+#include "CSendResult.h"
#include "CMessage.h"
+#include "CMQException.h"
+
+#include <string.h>
+#include <typeinfo>
+
#ifdef __cplusplus
extern "C" {
@@ -45,6 +52,35 @@ private:
QueueSelectorCallback m_pCallback;
};
+class CSendCallback : public AutoDeleteSendCallBack{
+public:
+ CSendCallback(CSendSuccessCallback
cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+ m_cSendSuccessCallback = cSendSuccessCallback;
+ m_cSendExceptionCallback = cSendExceptionCallback;
+ }
+ virtual ~CSendCallback(){}
+ virtual void onSuccess(SendResult& sendResult) {
+ CSendResult result;
+ result.sendStatus = CSendStatus((int)
sendResult.getSendStatus());
+ result.offset = sendResult.getQueueOffset();
+ strncpy(result.msgId, sendResult.getMsgId().c_str(),
MAX_MESSAGE_ID_LENGTH - 1);
+ result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ m_cSendSuccessCallback(result);
+
+ }
+ virtual void onException(MQException& e) {
+ CMQException exception;
+ exception.error = e.GetError();
+ exception.line = e.GetLine();
+ strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1);
+ strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1);
+ m_cSendExceptionCallback( exception );
+ }
+private:
+ CSendSuccessCallback m_cSendSuccessCallback;
+ CSendExceptionCallback m_cSendExceptionCallback;
+};
+
CProducer *CreateProducer(const char *groupId) {
if (groupId == NULL) {
@@ -127,6 +163,30 @@ int SendMessageSync(CProducer *producer, CMessage *msg,
CSendResult *result) {
return OK;
}
+int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback
cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+ if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL ||
cSendExceptionCallback == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
+ MQMessage *message = (MQMessage *) msg;
+ CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback ,
cSendExceptionCallback);
+
+ try {
+ defaultMQProducer->send(*message ,cSendCallback);
+ } catch (exception &e) {
+ if(cSendCallback != NULL){
+ if(typeid(e) == typeid( MQException )){
+ MQException &mqe = (MQException &)e;
+ cSendCallback->onException( mqe );
+ }
+ delete cSendCallback;
+ cSendCallback = NULL;
+ }
+ return PRODUCER_SEND_ASYNC_FAILED;
+ }
+ return OK;
+}
+
int SendMessageOneway(CProducer *producer, CMessage *msg) {
if (producer == NULL || msg == NULL) {
return NULL_POINTER;
diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp
index c7dead0..e310cf4 100644
--- a/test/src/UrlTest.cpp
+++ b/test/src/UrlTest.cpp
@@ -20,6 +20,15 @@
#include "gtest/gtest.h"
#include "gmock/gmock.h"
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "CMQException.h"
+#include <unistd.h>
+
using namespace std;
using ::testing::InitGoogleTest;
using ::testing::InitGoogleMock;
@@ -53,9 +62,13 @@ TEST(Url, Url) {
}
+
+
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(filter) = "Url.Url";
- return RUN_ALL_TESTS();
+ int itestts = RUN_ALL_TESTS();
+ printf("i %d" , itestts);
+ return itestts;
}