vongosling closed pull request #28: [ISSUE #24] Catch CPP exceptions and convert
them to error codes in the C API
URL: https://github.com/apache/rocketmq-client-cpp/pull/28
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/include/CCommon.h b/include/CCommon.h
index 7fcf680a..efcd2aa6 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -30,7 +30,22 @@ typedef enum _CStatus_{
     OK = 0,
     // Failed, null pointer value
     NULL_POINTER = 1,
+    MALLOC_FAILED = 2,
+    PRODUCER_ERROR_CODE_START = 10,
+    PRODUCER_START_FAILED = 10,
+    PRODUCER_SEND_SYNC_FAILED = 11,
+    PRODUCER_SEND_ONEWAY_FAILED = 12,
+    PRODUCER_SEND_ORDERLY_FAILED = 13,
+
+    PUSHCONSUMER_ERROR_CODE_START = 20,
+    PUSHCONSUMER_START_FAILED = 20,
+
+    PULLCONSUMER_ERROR_CODE_START = 30,
+    PULLCONSUMER_START_FAILED = 30,
+    PULLCONSUMER_FETCH_MQ_FAILED = 31,
+    PULLCONSUMER_FETCH_MESSAGE_FAILED = 32
 } CStatus;
+
 typedef enum _CLogLevel_{
     E_LOG_LEVEL_FATAL = 1,
     E_LOG_LEVEL_ERROR = 2,
diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h
index 8c7f9002..c84f8d4a 100644
--- a/include/CPullConsumer.h
+++ b/include/CPullConsumer.h
@@ -39,16 +39,18 @@ int SetPullConsumerGroupID(CPullConsumer *consumer, const 
char *groupId);
 const char *GetPullConsumerGroupID(CPullConsumer *consumer);
 int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char 
*namesrv);
 int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char 
*accessKey, const char *secretKey,
-                                     const char *channel);
+                                      const char *channel);
 int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
 int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, 
long fileSize);
 int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
 
-int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, 
CMessageQueue **mqs , int* size);
+int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, 
CMessageQueue **mqs, int *size);
 int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
 
-CPullResult Pull(CPullConsumer *consumer,const CMessageQueue *mq, const char 
*subExpression, long long offset, int maxNums);
+CPullResult
+Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char 
*subExpression, long long offset, int maxNums);
 int ReleasePullResult(CPullResult pullResult);
+
 #ifdef __cplusplus
 };
 #endif
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index a97f1f16..67a99bd0 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -25,21 +25,23 @@
 extern "C" {
 #endif
 using namespace rocketmq;
+using namespace std;
 
 class SelectMessageQueue : public MessageQueueSelector {
- public:
-    SelectMessageQueue(QueueSelectorCallback callback){
+public:
+    SelectMessageQueue(QueueSelectorCallback callback) {
         m_pCallback = callback;
     }
 
     MQMessageQueue select(const std::vector<MQMessageQueue> &mqs,
-                        const MQMessage &msg, void *arg) {
-        CMessage * message = (CMessage *) &msg;
+                          const MQMessage &msg, void *arg) {
+        CMessage *message = (CMessage *) &msg;
         //Get the index of sending MQMessageQueue through callback function.
-        int index = m_pCallback(mqs.size(),message,arg);
+        int index = m_pCallback(mqs.size(), message, arg);
         return mqs[index];
     }
-  private:
+
+private:
     QueueSelectorCallback m_pCallback;
 };
 
@@ -62,7 +64,11 @@ int StartProducer(CProducer *producer) {
     if (producer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQProducer *) producer)->start();
+    try {
+        ((DefaultMQProducer *) producer)->start();
+    } catch (exception &e) {
+        return PRODUCER_START_FAILED;
+    }
     return OK;
 }
 int ShutdownProducer(CProducer *producer) {
@@ -85,57 +91,70 @@ int SendMessageSync(CProducer *producer, CMessage *msg, 
CSendResult *result) {
     if (producer == NULL || msg == NULL || result == NULL) {
         return NULL_POINTER;
     }
-    DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
-    MQMessage *message = (MQMessage *) msg;
-    SendResult sendResult = defaultMQProducer->send(*message);
-    switch (sendResult.getSendStatus()) {
-        case SEND_OK:
-            result->sendStatus = E_SEND_OK;
-            break;
-        case SEND_FLUSH_DISK_TIMEOUT:
-            result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
-            break;
-        case SEND_FLUSH_SLAVE_TIMEOUT:
-            result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
-            break;
-        case SEND_SLAVE_NOT_AVAILABLE:
-            result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
-            break;
-        default:
-            result->sendStatus = E_SEND_OK;
-            break;
-    }
-    result->offset = sendResult.getQueueOffset();
-    //strcpy(result->msgId, sendResult.getMsgId().c_str());
-    strncpy(result->msgId, sendResult.getMsgId().c_str(), 
MAX_MESSAGE_ID_LENGTH - 1);
-    result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
-    return OK;
-}
-
-int SendMessageOneway(CProducer *producer,CMessage *msg) {
+    try {
+        DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
+        MQMessage *message = (MQMessage *) msg;
+        SendResult sendResult = defaultMQProducer->send(*message);
+        switch (sendResult.getSendStatus()) {
+            case SEND_OK:
+                result->sendStatus = E_SEND_OK;
+                break;
+            case SEND_FLUSH_DISK_TIMEOUT:
+                result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
+                break;
+            case SEND_FLUSH_SLAVE_TIMEOUT:
+                result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
+                break;
+            case SEND_SLAVE_NOT_AVAILABLE:
+                result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
+                break;
+            default:
+                result->sendStatus = E_SEND_OK;
+                break;
+        }
+        result->offset = sendResult.getQueueOffset();
+        strncpy(result->msgId, sendResult.getMsgId().c_str(), 
MAX_MESSAGE_ID_LENGTH - 1);
+        result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+    } catch (exception &e) {
+        return PRODUCER_SEND_SYNC_FAILED;
+    }
+    return OK;
+}
+
+int SendMessageOneway(CProducer *producer, CMessage *msg) {
     if (producer == NULL || msg == NULL) {
         return NULL_POINTER;
     }
     DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
     MQMessage *message = (MQMessage *) msg;
-    defaultMQProducer->sendOneway(*message);
+    try {
+        defaultMQProducer->sendOneway(*message);
+    } catch (exception &e) {
+        return PRODUCER_SEND_ONEWAY_FAILED;
+    }
     return OK;
 }
 
-int SendMessageOrderly(CProducer *producer, CMessage *msg, 
QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult 
*result) {
-    if(producer == NULL || msg == NULL || callback == NULL || arg == NULL || 
result == NULL){
+int
+SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback 
callback, void *arg, int autoRetryTimes,
+                   CSendResult *result) {
+    if (producer == NULL || msg == NULL || callback == NULL || arg == NULL || 
result == NULL) {
         return NULL_POINTER;
     }
     DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
     MQMessage *message = (MQMessage *) msg;
-    //Constructing SelectMessageQueue objects through function pointer callback
-    SelectMessageQueue selectMessageQueue(callback);
-    SendResult sendResult = 
defaultMQProducer->send(*message,&selectMessageQueue,arg,autoRetryTimes);
-    //Convert SendStatus to CSendStatus
-    result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
-    result->offset = sendResult.getQueueOffset();
-    strncpy(result->msgId, sendResult.getMsgId().c_str(), 
MAX_MESSAGE_ID_LENGTH - 1);
-    result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+    try {
+        //Constructing SelectMessageQueue objects through function pointer 
callback
+        SelectMessageQueue selectMessageQueue(callback);
+        SendResult sendResult = defaultMQProducer->send(*message, 
&selectMessageQueue, arg, autoRetryTimes);
+        //Convert SendStatus to CSendStatus
+        result->sendStatus = CSendStatus((int) sendResult.getSendStatus());
+        result->offset = sendResult.getQueueOffset();
+        strncpy(result->msgId, sendResult.getMsgId().c_str(), 
MAX_MESSAGE_ID_LENGTH - 1);
+        result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+    } catch (exception &e) {
+        return PRODUCER_SEND_ORDERLY_FAILED;
+    }
     return OK;
 }
 
diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp
index 65daae38..bd71ffb9 100644
--- a/src/extern/CPullConsumer.cpp
+++ b/src/extern/CPullConsumer.cpp
@@ -46,7 +46,11 @@ int StartPullConsumer(CPullConsumer *consumer) {
     if (consumer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQPullConsumer *) consumer)->start();
+    try {
+        ((DefaultMQPullConsumer *) consumer)->start();
+    } catch (exception &e) {
+        return PULLCONSUMER_START_FAILED;
+    }
     return OK;
 }
 int ShutdownPullConsumer(CPullConsumer *consumer) {
@@ -125,7 +129,7 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, 
const char *topic, C
         if (*mqs == NULL) {
             *size = 0;
             *mqs = NULL;
-            return NULL_POINTER;
+            return MALLOC_FAILED;
         }
         auto iter = fullMQ.begin();
         for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); 
++iter, index++) {
@@ -136,6 +140,7 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, 
const char *topic, C
     } catch (MQException &e) {
         *size = 0;
         *mqs = NULL;
+        return PULLCONSUMER_FETCH_MQ_FAILED;
     }
     return OK;
 }
@@ -147,12 +152,17 @@ int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs) {
     mqs = NULL;
     return OK;
 }
-CPullResult Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char 
*subExpression, long long offset, int maxNums) {
+CPullResult
+Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char 
*subExpression, long long offset, int maxNums) {
     CPullResult pullResult;
     memset(&pullResult, 0, sizeof(CPullResult));
     MQMessageQueue messageQueue(mq->topic, mq->brokerName, mq->queueId);
     PullResult cppPullResult;
-    cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull(messageQueue, 
subExpression, offset, maxNums);
+    try {
+        cppPullResult = ((DefaultMQPullConsumer *) 
consumer)->pull(messageQueue, subExpression, offset, maxNums);
+    }catch (exception &e){
+        cppPullResult.pullStatus = BROKER_TIMEOUT;
+    }
 
     switch (cppPullResult.pullStatus) {
         case FOUND: {
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 39e516aa..9d3ab6be 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -54,12 +54,13 @@ class MessageListenerInner : public 
MessageListenerConcurrently {
     CPushConsumer *m_pconsumer;
 };
 
-class MessageListenerOrderlyInner:public MessageListenerOrderly {
+class MessageListenerOrderlyInner : public MessageListenerOrderly {
 public:
-    MessageListenerOrderlyInner(CPushConsumer *consumer, MessageCallBack 
pCallback){
+    MessageListenerOrderlyInner(CPushConsumer *consumer, MessageCallBack 
pCallback) {
         m_pconsumer = consumer;
         m_pMsgReceiveCallback = pCallback;
     }
+
     ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
         if (m_pMsgReceiveCallback == NULL) {
             return RECONSUME_LATER;
@@ -72,6 +73,7 @@ class MessageListenerOrderlyInner:public 
MessageListenerOrderly {
         }
         return CONSUME_SUCCESS;
     }
+
 private:
     MessageCallBack m_pMsgReceiveCallback;
     CPushConsumer *m_pconsumer;
@@ -103,7 +105,11 @@ int StartPushConsumer(CPushConsumer *consumer) {
     if (consumer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQPushConsumer *) consumer)->start();
+    try {
+        ((DefaultMQPushConsumer *) consumer)->start();
+    } catch (exception &e) {
+        return PUSHCONSUMER_START_FAILED;
+    }
     return OK;
 }
 int ShutdownPushConsumer(CPushConsumer *consumer) {
@@ -152,10 +158,10 @@ int RegisterMessageCallback(CPushConsumer *consumer, 
MessageCallBack pCallback)
 }
 
 int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack 
pCallback) {
-    if(consumer == NULL || pCallback == NULL){
+    if (consumer == NULL || pCallback == NULL) {
         return NULL_POINTER;
     }
-    MessageListenerOrderlyInner *messageListenerOrderlyInner = new 
MessageListenerOrderlyInner(consumer,pCallback);
+    MessageListenerOrderlyInner *messageListenerOrderlyInner = new 
MessageListenerOrderlyInner(consumer, pCallback);
     ((DefaultMQPushConsumer *) 
consumer)->registerMessageListener(messageListenerOrderlyInner);
     g_OrderListenerMap[consumer] = messageListenerOrderlyInner;
     return OK;
@@ -164,13 +170,13 @@ int RegisterMessageCallbackOrderly(CPushConsumer 
*consumer, MessageCallBack pCal
 
 int UnregisterMessageCallbackOrderly(CPushConsumer *consumer) {
     if (consumer == NULL) {
-            return NULL_POINTER;
+        return NULL_POINTER;
     }
-    map<CPushConsumer *,MessageListenerOrderlyInner *>::iterator iter;
+    map<CPushConsumer *, MessageListenerOrderlyInner *>::iterator iter;
     iter = g_OrderListenerMap.find(consumer);
-    if(iter != g_OrderListenerMap.end()){
+    if (iter != g_OrderListenerMap.end()) {
         MessageListenerOrderlyInner *listenerInner = iter->second;
-        if(listenerInner != NULL){
+        if (listenerInner != NULL) {
             delete listenerInner;
         }
         g_OrderListenerMap.erase(iter);
@@ -195,11 +201,11 @@ int UnregisterMessageCallback(CPushConsumer *consumer) {
     return OK;
 }
 
-int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel 
messageModel){
-    if(consumer == NULL){
+int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel 
messageModel) {
+    if (consumer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQPushConsumer *) 
consumer)->setMessageModel(MessageModel((int)messageModel));
+    ((DefaultMQPushConsumer *) consumer)->setMessageModel(MessageModel((int) 
messageModel));
     return OK;
 }
 int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount) {
@@ -226,7 +232,7 @@ int SetPushConsumerInstanceName(CPushConsumer *consumer, 
const char *instanceNam
 }
 
 int SetPushConsumerSessionCredentials(CPushConsumer *consumer, const char 
*accessKey, const char *secretKey,
-                                     const char *channel) {
+                                      const char *channel) {
     if (consumer == NULL) {
         return NULL_POINTER;
     }
@@ -247,7 +253,7 @@ int SetPushConsumerLogFileNumAndSize(CPushConsumer 
*consumer, int fileNum, long
     if (consumer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQPushConsumer *) 
consumer)->setLogFileSizeAndNum(fileNum,fileSize);
+    ((DefaultMQPushConsumer *) consumer)->setLogFileSizeAndNum(fileNum, 
fileSize);
     return OK;
 }
 
@@ -255,7 +261,7 @@ int SetPushConsumerLogLevel(CPushConsumer *consumer, 
CLogLevel level) {
     if (consumer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQPushConsumer *) consumer)->setLogLevel((elogLevel)level);
+    ((DefaultMQPushConsumer *) consumer)->setLogLevel((elogLevel) level);
     return OK;
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to