diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c
new file mode 100644
index 00000000..56496b5a
--- /dev/null
+++ b/example/CAsyncProducer.c
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+
+#ifdef _WIN32
+#include <windows.h>
+#else
+
+#include <unistd.h>
+#include <memory.h>
+
+#endif
+
+/*
+ * Suspend the calling thread for roughly `milliseconds` milliseconds.
+ *
+ * Windows: forwards to Sleep(), which takes milliseconds.
+ * Elsewhere: POSIX permits usleep() to fail with EINVAL when asked for
+ * 1,000,000 microseconds or more, so whole seconds go through sleep() and
+ * only the sub-second remainder goes through usleep(). Splitting the value
+ * also avoids the unsigned wrap-around of `milliseconds * 1000` for large
+ * inputs.
+ */
+void thread_sleep(unsigned milliseconds) {
+#ifdef _WIN32
+    Sleep(milliseconds);
+#else
+    unsigned seconds = milliseconds / 1000;
+    unsigned remainder_us = (milliseconds % 1000) * 1000;  /* always < 1000000 */
+    if (seconds > 0) {
+        sleep(seconds);
+    }
+    if (remainder_us > 0) {
+        usleep(remainder_us);
+    }
+#endif
+}
+
+/*
+ * Success callback handed to SendMessageAsync in this example: prints the
+ * message id carried by the send result.
+ */
+void sendSuccessCallback(CSendResult result) {
+    const char *sentId = result.msgId;
+    printf("Msg Send ID:%s\n", sentId);
+}
+
+/*
+ * Exception callback handed to SendMessageAsync in this example: prints the
+ * error code, message, source file and source line of the reported exception,
+ * one per output line.
+ */
+void sendExceptionCallback(CMQException e) {
+    printf("asyn send exception error : %d\n"
+           "asyn send exception msg : %s\n"
+           "asyn send exception file : %s\n"
+           "asyn send exception line : %d\n",
+           e.error, e.msg, e.file, e.line);
+}
+
+/*
+ * Send ten messages asynchronously through `producer`, pausing about one
+ * second after each send.
+ *
+ * A single CMessage (topic "T_TestTopic", tag "Test_Tag", key "Test_Keys") is
+ * reused; only its body is rewritten before each send. The value returned by
+ * SendMessageAsync is printed immediately; the outcome of each send is
+ * reported through sendSuccessCallback / sendExceptionCallback.
+ */
+void startSendMessage(CProducer *producer) {
+    int i = 0;
+    char DestMsg[256];
+    CMessage *msg = CreateMessage("T_TestTopic");
+    SetMessageTags(msg, "Test_Tag");
+    SetMessageKeys(msg, "Test_Keys");
+    for (i = 0; i < 10; i++) {
+        printf("send one message : %d\n", i);
+        /* snprintf always NUL-terminates within sizeof(DestMsg), so the buffer
+         * does not need to be cleared first. */
+        snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
+        SetMessageBody(msg, DestMsg);
+        int code = SendMessageAsync(producer, msg, sendSuccessCallback, sendExceptionCallback);
+        printf("Async send return code: %d\n", code);
+        thread_sleep(1000);
+    }
+    /* NOTE(review): msg is never released in this function. CMessage.h
+     * presumably offers a destroy counterpart to CreateMessage - confirm and
+     * call it here once the last send has been handed over. */
+}
+
+/*
+ * Run one complete producer session: create "Group_producer", point it at the
+ * name server on 127.0.0.1:9876, start it, send the demo messages, then shut
+ * it down and destroy it.
+ *
+ * i: when equal to 1, SetProducerSendMsgTimeout(producer, 3) is applied before
+ *    the producer is started; any other value leaves the timeout untouched.
+ */
+void CreateProducerAndStartSendMessage(int i) {
+    CProducer *demoProducer;
+
+    printf("Producer Initializing.....\n");
+    demoProducer = CreateProducer("Group_producer");
+    SetProducerNameServerAddress(demoProducer, "127.0.0.1:9876");
+    if (i == 1) {
+        /* NOTE(review): 3 is presumably a deliberately tiny timeout meant to
+         * make sends fail so the exception callback fires (main labels this
+         * run "exceptionCallback") - the unit is not visible here; confirm. */
+        SetProducerSendMsgTimeout(demoProducer, 3);
+    }
+    StartProducer(demoProducer);
+    printf("Producer start.....\n");
+
+    startSendMessage(demoProducer);
+
+    ShutdownProducer(demoProducer);
+    DestroyProducer(demoProducer);
+    printf("Producer Shutdown!\n");
+}
+
+/*
+ * Entry point of the async producer example. Runs the producer session twice:
+ * first with mode 0 (announced as the success-callback run), then with mode 1
+ * (announced as the exception-callback run). Always returns 0.
+ */
+int main(int argc, char *argv[]) {
+    static const char *const banners[] = {
+        "Send Async successCallback.....\n",
+        "Send Async exceptionCallback.....\n",
+    };
+    int mode;
+
+    for (mode = 0; mode < 2; mode++) {
+        printf("%s", banners[mode]);
+        CreateProducerAndStartSendMessage(mode);
+    }
+
+    return 0;
+}
+
diff --git a/example/Producer.c b/example/Producer.c
index cef8383c..5feedd68 100644
--- a/example/Producer.c
+++ b/example/Producer.c
@@ -62,11 +62,10 @@ int main(int argc, char *argv[]) {
     printf("Producer Initializing.....\n");
 
     CProducer *producer = CreateProducer("Group_producer");
-    SetProducerNameServerAddress(producer, "172.17.0.2:9876");
+    SetProducerNameServerAddress(producer, "127.0.0.1:9876");
     StartProducer(producer);
     printf("Producer start.....\n");
     startSendMessage(producer);
-
     ShutdownProducer(producer);
     DestroyProducer(producer);
     printf("Producer Shutdown!\n");
diff --git a/example/PushConsumer.cpp b/example/PushConsumer.cpp
index d5ce0217..119b7f26 100755
--- a/example/PushConsumer.cpp
+++ b/example/PushConsumer.cpp
@@ -81,7 +81,7 @@ int main(int argc, char *argv[]) {
 
   if (info.syncpush) consumer.setAsyncPull(false);  // set sync pull
   if (info.broadcasting) {
-    consumer.setMessageModel(BROADCASTING);
+    consumer.setMessageModel(rocketmq::BROADCASTING);
   }
 
   consumer.setInstanceName(info.groupname);
diff --git a/include/CCommon.h b/include/CCommon.h
index efcd2aa6..eb9ffbce 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -36,6 +36,7 @@ typedef enum _CStatus_{
     PRODUCER_SEND_SYNC_FAILED = 11,
     PRODUCER_SEND_ONEWAY_FAILED = 12,
     PRODUCER_SEND_ORDERLY_FAILED = 13,
+       PRODUCER_SEND_ASYNC_FAILED = 14,
 
     PUSHCONSUMER_ERROR_CODE_START = 20,
     PUSHCONSUMER_START_FAILED = 20,
diff --git a/include/CMQException.h b/include/CMQException.h
new file mode 100644
index 00000000..da26edd9
--- /dev/null
+++ b/include/CMQException.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __C_MQEXCPTION_H__
+#define __C_MQEXCPTION_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Capacity in bytes of each text buffer inside CMQException. */
+#define  MAX_EXEPTION_CHAR_LENGTH 512
+
+/*
+ * Plain-C description of a send failure; this is the value type received by a
+ * CSendExceptionCallback. The field notes below describe how the async send
+ * path in CProducer.cpp fills the struct.
+ */
+typedef struct _CMQException_{
+       /* Error code, taken from MQException::GetError(). */
+       int error;
+       /* Line number, taken from MQException::GetLine(). */
+       int line;
+       /* File name, copied from MQException::GetFile(). */
+       char file[MAX_EXEPTION_CHAR_LENGTH];
+       /* Human-readable text, copied from MQException::what(). */
+       char msg[MAX_EXEPTION_CHAR_LENGTH];
+       /* NOTE(review): not written by the async send path visible in this
+        * change - presumably meant to carry MQException::GetType(); confirm. */
+       char type[MAX_EXEPTION_CHAR_LENGTH];
+
+} CMQException;
+
+#ifdef __cplusplus
+};
+#endif
+#endif
diff --git a/include/CProducer.h b/include/CProducer.h
index 6edd99e5..e75622c4 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -18,9 +18,9 @@
 #ifndef __C_PRODUCER_H__
 #define __C_PRODUCER_H__
 
-#include "CCommon.h"
 #include "CMessage.h"
 #include "CSendResult.h"
+#include "CMQException.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -29,6 +29,8 @@ extern "C" {
 //typedef struct _CProducer_ _CProducer;
 typedef struct CProducer CProducer;
 typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg);
+// Callback invoked with the send result when an asynchronous send succeeds.
+typedef void(*CSendSuccessCallback)(CSendResult result);
+// Callback invoked with the exception details when an asynchronous send fails.
+typedef void(*CSendExceptionCallback)(CMQException e);
 
 ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId);
 ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer);
@@ -49,6 +51,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer 
*producer, int level);
 ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int 
size);
 
 ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, 
CSendResult *result);
+ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, 
CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback 
cSendExceptionCallback);
 ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg);
 ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, 
QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult 
*result);
 #ifdef __cplusplus
diff --git a/include/MQClientException.h b/include/MQClientException.h
index bf29863a..2c3d2eda 100755
--- a/include/MQClientException.h
+++ b/include/MQClientException.h
@@ -21,12 +21,17 @@
 #include <ostream>
 #include <sstream>
 #include <string>
+
+#include <string.h>
 #include "RocketMQClient.h"
+#include "CCommon.h"
+
 
 
 namespace rocketmq {
 //<!***************************************************************************
 class ROCKETMQCLIENT_API MQException : public std::exception {
+
  public:
   MQException(const std::string& msg, int error, const char* file,
               int line) throw()
@@ -60,6 +65,10 @@ class ROCKETMQCLIENT_API MQException : public std::exception 
{
 
   virtual const char* GetType() const throw() { return m_type.c_str(); }
 
+  // Returns m_line (presumably the `line` given to the constructor).
+  // const/throw() like GetType(), so it is callable on a const MQException&.
+  int GetLine() const throw() { return m_line; }
+
+  // Returns m_file as a C string (presumably the `file` given to the
+  // constructor). The pointer stays valid only while this exception object
+  // is alive and unmodified.
+  const char* GetFile() const throw() { return m_file.c_str(); }
+
  protected:
   int m_error;
   int m_line;
@@ -68,6 +77,7 @@ class ROCKETMQCLIENT_API MQException : public std::exception {
   std::string m_type;
 };
 
+
 inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
   os << "Type: " << e.GetType() << " , " << e.what();
   return os;
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 0715942a..c238e592 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -16,10 +16,17 @@
  */
 
 #include "DefaultMQProducer.h"
+#include "AsyncCallback.h"
+
 #include "CProducer.h"
 #include "CCommon.h"
-#include <string.h>
+#include "CSendResult.h"
 #include "CMessage.h"
+#include "CMQException.h"
+
+#include <string.h>
+#include <typeinfo>
+
 
 #ifdef __cplusplus
 extern "C" {
@@ -45,6 +52,35 @@ class SelectMessageQueue : public MessageQueueSelector {
     QueueSelectorCallback m_pCallback;
 };
 
+// Adapts the pair of C function pointers accepted by SendMessageAsync to the
+// C++ send-callback interface: each C++ notification is converted into the
+// matching plain-C struct and forwarded to the corresponding C callback.
+class CSendCallback : public AutoDeleteSendCallBack {
+public:
+    // Both pointers are stored as given; SendMessageAsync rejects NULL ones
+    // before constructing this object.
+    CSendCallback(CSendSuccessCallback cSendSuccessCallback,
+                  CSendExceptionCallback cSendExceptionCallback)
+        : m_cSendSuccessCallback(cSendSuccessCallback),
+          m_cSendExceptionCallback(cSendExceptionCallback) {}
+
+    virtual ~CSendCallback() {}
+
+    // Copies send status, queue offset and message id into a CSendResult and
+    // passes it by value to the C success callback.
+    virtual void onSuccess(SendResult &sendResult) {
+        CSendResult result;
+        result.sendStatus = CSendStatus((int) sendResult.getSendStatus());
+        result.offset = sendResult.getQueueOffset();
+        strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
+        result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+        m_cSendSuccessCallback(result);
+    }
+
+    // Copies error code, line, message, file and type into a CMQException and
+    // passes it by value to the C exception callback.
+    virtual void onException(MQException &e) {
+        CMQException exception;
+        // Zero the whole struct first: strncpy copies at most LENGTH - 1 bytes
+        // below, so the last byte of every buffer stays 0 and each string is
+        // terminated even when the source text is truncated.
+        memset(&exception, 0, sizeof(exception));
+        exception.error = e.GetError();
+        exception.line = e.GetLine();
+        strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1);
+        strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1);
+        strncpy(exception.type, e.GetType(), MAX_EXEPTION_CHAR_LENGTH - 1);
+        m_cSendExceptionCallback(exception);
+    }
+
+private:
+    CSendSuccessCallback m_cSendSuccessCallback;
+    CSendExceptionCallback m_cSendExceptionCallback;
+};
+
 
 CProducer *CreateProducer(const char *groupId) {
     if (groupId == NULL) {
@@ -127,6 +163,30 @@ int SendMessageSync(CProducer *producer, CMessage *msg, 
CSendResult *result) {
     return OK;
 }
 
+// Hands `msg` to `producer` for asynchronous sending.
+//
+// cSendSuccessCallback / cSendExceptionCallback are wrapped in a CSendCallback
+// that is passed to DefaultMQProducer::send(). If send() itself throws, the
+// exception callback is invoked synchronously here (when the exception is an
+// MQException or derives from it) and the wrapper is deleted.
+//
+// Returns NULL_POINTER when any argument is NULL, PRODUCER_SEND_ASYNC_FAILED
+// when send() throws, and OK otherwise.
+int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,
+                     CSendExceptionCallback cSendExceptionCallback) {
+    if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL ||
+        cSendExceptionCallback == NULL) {
+        return NULL_POINTER;
+    }
+    DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
+    MQMessage *message = (MQMessage *) msg;
+    // NOTE(review): on the success path the wrapper is presumably released by
+    // the client library after a callback has fired (AutoDeleteSendCallBack)
+    // - confirm.
+    CSendCallback *cSendCallback = new CSendCallback(cSendSuccessCallback, cSendExceptionCallback);
+
+    try {
+        defaultMQProducer->send(*message, cSendCallback);
+    } catch (exception &e) {
+        // dynamic_cast also matches classes derived from MQException; an exact
+        // typeid comparison would silently skip the callback for those.
+        MQException *mqe = dynamic_cast<MQException *>(&e);
+        if (mqe != NULL) {
+            cSendCallback->onException(*mqe);
+        }
+        // NOTE(review): assumes send() has not already taken ownership of the
+        // callback when it throws - confirm against DefaultMQProducer.
+        delete cSendCallback;
+        return PRODUCER_SEND_ASYNC_FAILED;
+    }
+    return OK;
+}
+
 int SendMessageOneway(CProducer *producer, CMessage *msg) {
     if (producer == NULL || msg == NULL) {
         return NULL_POINTER;
diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp
index c7dead00..e310cf44 100644
--- a/test/src/UrlTest.cpp
+++ b/test/src/UrlTest.cpp
@@ -20,6 +20,15 @@
 #include "gtest/gtest.h"
 #include "gmock/gmock.h"
 
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "CMQException.h"
+#include <unistd.h>
+
 using namespace std;
 using ::testing::InitGoogleTest;
 using ::testing::InitGoogleMock;
@@ -53,9 +62,13 @@ TEST(Url, Url) {
 
 }
 
+
+
 int main(int argc, char* argv[]) {
        InitGoogleMock(&argc, argv);
 
        testing::GTEST_FLAG(filter) = "Url.Url";
-       return RUN_ALL_TESTS();
+       // Keep the RUN_ALL_TESTS() result so it can be printed before it is
+       // returned as the process exit status.
+       int itestts = RUN_ALL_TESTS();
+       printf("i %d" , itestts);
+       return itestts;
 }


With regards,
Apache Git Services

Reply via email to