vongosling closed pull request #29: Add name server domain support and refactor 
pull consumer.
URL: https://github.com/apache/rocketmq-client-cpp/pull/29
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c
index 44e2abdb..8be2207c 100644
--- a/example/PullConsumeMessage.c
+++ b/example/PullConsumeMessage.c
@@ -40,19 +40,57 @@ void thread_sleep(unsigned milliseconds) {
 #endif
 }
 
-int main(int argc,char * argv [])
-{
-    int i = 0;
+int main(int argc, char *argv[]) {
+    int i = 0, j = 0;
+    int ret = 0;
+    int size = 0;
+    CMessageQueue *mqs = NULL;
     printf("PullConsumer Initializing....\n");
-    CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
-    SetPullConsumerNameServerAddress(consumer,"172.17.0.2:9876");
+    CPullConsumer *consumer = CreatePullConsumer("Group_Consumer_Test");
+    SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
     StartPullConsumer(consumer);
     printf("Pull Consumer Start...\n");
-    for( i=0; i<10; i++)
-    {
-        printf("Now Running : %d S\n",i*10);
-        thread_sleep(10000);
+    for (i = 1; i <= 5; i++) {
+        printf("FetchSubscriptionMessageQueues : %d times\n", i);
+        ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, 
&size);
+        if(ret != OK) {
+            printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
+        }
+        printf("Get MQ Size:%d\n", size);
+        for (j = 0; j < size; j++) {
+            int noNewMsg = 0;
+            long long tmpoffset = 0;
+            printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", 
mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
+            do {
+                int k = 0;
+                CPullResult pullResult = Pull(consumer, &mqs[j], "*", 
tmpoffset, 32);
+                if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
+                    tmpoffset = pullResult.nextBeginOffset;
+                }
+                
printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", 
pullResult.pullStatus,
+                       pullResult.maxOffset, pullResult.minOffset, 
pullResult.nextBeginOffset);
+                switch (pullResult.pullStatus) {
+                    case E_FOUND:
+                        printf("Get Message Size:%d\n", pullResult.size);
+                        for (k = 0; k < pullResult.size; ++k) {
+                            printf("Got Message ID:%s,Body:%s\n", 
GetMessageId(pullResult.msgFoundList[k]),GetMessageBody(pullResult.msgFoundList[k]));
+                        }
+                        break;
+                    case E_NO_MATCHED_MSG:
+                        noNewMsg = 1;
+                        break;
+                    default:
+                        noNewMsg = 0;
+                }
+                ReleasePullResult(pullResult);
+                thread_sleep(100);
+            } while (noNewMsg == 0);
+            thread_sleep(1000);
+        }
+        thread_sleep(2000);
+        ReleaseSubscriptionMessageQueue(mqs);
     }
+    thread_sleep(5000);
     ShutdownPullConsumer(consumer);
     DestroyPullConsumer(consumer);
     printf("PullConsumer Shutdown!\n");
diff --git a/include/CProducer.h b/include/CProducer.h
index 74bdb9d8..6edd99e5 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -36,6 +36,7 @@ ROCKETMQCLIENT_API int StartProducer(CProducer *producer);
 ROCKETMQCLIENT_API int ShutdownProducer(CProducer *producer);
 
 ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer *producer, const 
char *namesrv);
+ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer *producer, const 
char *domain);
 ROCKETMQCLIENT_API int SetProducerGroupName(CProducer *producer, const char 
*groupName);
 ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer *producer, const char 
*instanceName);
 ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer *producer, 
const char *accessKey, const char *secretKey,
diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h
index c84f8d4a..a4c4f5ae 100644
--- a/include/CPullConsumer.h
+++ b/include/CPullConsumer.h
@@ -31,25 +31,26 @@ extern "C" {
 typedef struct CPullConsumer CPullConsumer;
 
 
-CPullConsumer *CreatePullConsumer(const char *groupId);
-int DestroyPullConsumer(CPullConsumer *consumer);
-int StartPullConsumer(CPullConsumer *consumer);
-int ShutdownPullConsumer(CPullConsumer *consumer);
-int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
-const char *GetPullConsumerGroupID(CPullConsumer *consumer);
-int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char 
*namesrv);
-int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char 
*accessKey, const char *secretKey,
+ROCKETMQCLIENT_API CPullConsumer *CreatePullConsumer(const char *groupId);
+ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer *consumer, const 
char *groupId);
+ROCKETMQCLIENT_API const char *GetPullConsumerGroupID(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer 
*consumer, const char *namesrv);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer 
*consumer, const char *domain);
+ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer 
*consumer, const char *accessKey, const char *secretKey,
                                       const char *channel);
-int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
-int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, 
long fileSize);
-int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
+ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer *consumer, const 
char *logPath);
+ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer 
*consumer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer *consumer, 
CLogLevel level);
 
-int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, 
CMessageQueue **mqs, int *size);
-int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
+ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer *consumer, 
const char *topic, CMessageQueue **mqs, int *size);
+ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
 
-CPullResult
+ROCKETMQCLIENT_API CPullResult
 Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char 
*subExpression, long long offset, int maxNums);
-int ReleasePullResult(CPullResult pullResult);
+ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult);
 
 #ifdef __cplusplus
 };
diff --git a/include/CPullResult.h b/include/CPullResult.h
index eb44fbd7..e22fd9ed 100644
--- a/include/CPullResult.h
+++ b/include/CPullResult.h
@@ -24,8 +24,7 @@
 #ifdef __cplusplus
 extern "C" {
 #endif
-typedef enum E_CPullStatus
-{
+typedef enum E_CPullStatus {
     E_FOUND,
     E_NO_NEW_MSG,
     E_NO_MATCHED_MSG,
@@ -35,11 +34,12 @@ typedef enum E_CPullStatus
 
 typedef struct _CPullResult_ {
     CPullStatus pullStatus;
-    long long  nextBeginOffset;
-    long long  minOffset;
-    long long  maxOffset;
-    CMessageExt** msgFoundList;
+    long long nextBeginOffset;
+    long long minOffset;
+    long long maxOffset;
+    CMessageExt **msgFoundList;
     int size;
+    void *pData;
 } CPullResult;
 
 #ifdef __cplusplus
diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index aee34403..49cbf964 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -42,6 +42,7 @@ ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer 
*consumer);
 ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer *consumer, const 
char *groupId);
 ROCKETMQCLIENT_API const char *GetPushConsumerGroupID(CPushConsumer *consumer);
 ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer 
*consumer, const char *namesrv);
+ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer 
*consumer, const char *domain);
 ROCKETMQCLIENT_API int Subscribe(CPushConsumer *consumer, const char *topic, 
const char *expression);
 ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer *consumer, 
MessageCallBack pCallback);
 ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer *consumer, 
MessageCallBack pCallback);
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 67a99bd0..2c3c9c91 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -85,7 +85,13 @@ int SetProducerNameServerAddress(CProducer *producer, const 
char *namesrv) {
     ((DefaultMQProducer *) producer)->setNamesrvAddr(namesrv);
     return OK;
 }
-
+int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
+    if (producer == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultMQProducer *) producer)->setNamesrvDomain(domain);
+    return OK;
+}
 int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
     //CSendResult sendResult;
     if (producer == NULL || msg == NULL || result == NULL) {
diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp
index bd71ffb9..415bb5b4 100644
--- a/src/extern/CPullConsumer.cpp
+++ b/src/extern/CPullConsumer.cpp
@@ -80,6 +80,13 @@ int SetPullConsumerNameServerAddress(CPullConsumer 
*consumer, const char *namesr
     ((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv);
     return OK;
 }
+int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char 
*domain) {
+    if (consumer == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultMQPullConsumer *) consumer)->setNamesrvDomain(domain);
+    return OK;
+}
 int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char 
*accessKey, const char *secretKey,
                                       const char *channel) {
     if (consumer == NULL) {
@@ -119,24 +126,26 @@ int FetchSubscriptionMessageQueues(CPullConsumer 
*consumer, const char *topic, C
         return NULL_POINTER;
     }
     unsigned int index = 0;
+    CMessageQueue *temMQ = NULL;
     std::vector<MQMessageQueue> fullMQ;
     try {
         ((DefaultMQPullConsumer *) 
consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
         *size = fullMQ.size();
         //Alloc memory to save the pointer to CPP MessageQueue, and the 
MessageQueues may be changed.
         //Thus, this memory should be released by users using 
@ReleaseSubscribeMessageQueue every time.
-        *mqs = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
-        if (*mqs == NULL) {
+        temMQ = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
+        if (temMQ == NULL) {
             *size = 0;
             *mqs = NULL;
             return MALLOC_FAILED;
         }
         auto iter = fullMQ.begin();
         for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); 
++iter, index++) {
-            strncpy(mqs[index]->topic, iter->getTopic().c_str(), 
MAX_TOPIC_LENGTH - 1);
-            strncpy(mqs[index]->brokerName, iter->getBrokerName().c_str(), 
MAX_BROKER_NAME_ID_LENGTH - 1);
-            mqs[index]->queueId = iter->getQueueId();
+            strncpy(temMQ[index].topic, iter->getTopic().c_str(), 
MAX_TOPIC_LENGTH - 1);
+            strncpy(temMQ[index].brokerName, iter->getBrokerName().c_str(), 
MAX_BROKER_NAME_ID_LENGTH - 1);
+            temMQ[index].queueId = iter->getQueueId();
         }
+        *mqs = temMQ;
     } catch (MQException &e) {
         *size = 0;
         *mqs = NULL;
@@ -160,7 +169,7 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, 
const char *subExpression
     PullResult cppPullResult;
     try {
         cppPullResult = ((DefaultMQPullConsumer *) 
consumer)->pull(messageQueue, subExpression, offset, maxNums);
-    }catch (exception &e){
+    } catch (exception &e) {
         cppPullResult.pullStatus = BROKER_TIMEOUT;
     }
 
@@ -171,11 +180,13 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, 
const char *subExpression
             pullResult.minOffset = cppPullResult.minOffset;
             pullResult.nextBeginOffset = cppPullResult.nextBeginOffset;
             pullResult.size = cppPullResult.msgFoundList.size();
+            PullResult *tmpPullResult = new PullResult(cppPullResult);
+            pullResult.pData = tmpPullResult;
             //Alloc memory to save the pointer to CPP MQMessageExt, which will 
be release by the CPP SDK core.
             //Thus, this memory should be released by users using 
@ReleasePullResult
             pullResult.msgFoundList = (CMessageExt **) malloc(pullResult.size 
* sizeof(CMessageExt *));
             for (size_t i = 0; i < cppPullResult.msgFoundList.size(); i++) {
-                MQMessageExt *msg = const_cast<MQMessageExt 
*>(&cppPullResult.msgFoundList[i]);
+                MQMessageExt *msg = const_cast<MQMessageExt 
*>(&tmpPullResult->msgFoundList[i]);
                 pullResult.msgFoundList[i] = (CMessageExt *) (msg);
             }
             break;
@@ -204,9 +215,16 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, 
const char *subExpression
     return pullResult;
 }
 int ReleasePullResult(CPullResult pullResult) {
-    if (pullResult.size == 0 || pullResult.msgFoundList == NULL) {
+    if (pullResult.size == 0 || pullResult.msgFoundList == NULL || 
pullResult.pData == NULL) {
         return NULL_POINTER;
     }
+    if (pullResult.pData != NULL) {
+        try {
+            delete ((PullResult *) pullResult.pData);
+        } catch (exception &e) {
+            return NULL_POINTER;
+        }
+    }
     free((void *) pullResult.msgFoundList);
     pullResult.msgFoundList = NULL;
     return OK;
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 9d3ab6be..2c35c749 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -139,6 +139,13 @@ int SetPushConsumerNameServerAddress(CPushConsumer 
*consumer, const char *namesr
     ((DefaultMQPushConsumer *) consumer)->setNamesrvAddr(namesrv);
     return OK;
 }
+int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char 
*domain) {
+    if (consumer == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultMQPushConsumer *) consumer)->setNamesrvDomain(domain);
+    return OK;
+}
 int Subscribe(CPushConsumer *consumer, const char *topic, const char 
*expression) {
     if (consumer == NULL) {
         return NULL_POINTER;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to