vongosling closed pull request #29: Add name server domain support and refactor
pull consumer.
URL: https://github.com/apache/rocketmq-client-cpp/pull/29
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance.
Because this is a foreign pull request (from a fork), the diff is
supplied below, as it would not otherwise be shown:
diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c
index 44e2abdb..8be2207c 100644
--- a/example/PullConsumeMessage.c
+++ b/example/PullConsumeMessage.c
@@ -40,19 +40,57 @@ void thread_sleep(unsigned milliseconds) {
#endif
}
-int main(int argc,char * argv [])
-{
- int i = 0;
+int main(int argc, char *argv[]) {
+ int i = 0, j = 0;
+ int ret = 0;
+ int size = 0;
+ CMessageQueue *mqs = NULL;
printf("PullConsumer Initializing....\n");
- CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
- SetPullConsumerNameServerAddress(consumer,"172.17.0.2:9876");
+ CPullConsumer *consumer = CreatePullConsumer("Group_Consumer_Test");
+ SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
StartPullConsumer(consumer);
printf("Pull Consumer Start...\n");
- for( i=0; i<10; i++)
- {
- printf("Now Running : %d S\n",i*10);
- thread_sleep(10000);
+ for (i = 1; i <= 5; i++) {
+ printf("FetchSubscriptionMessageQueues : %d times\n", i);
+ ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs,
&size);
+ if(ret != OK) {
+ printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
+ }
+ printf("Get MQ Size:%d\n", size);
+ for (j = 0; j < size; j++) {
+ int noNewMsg = 0;
+ long long tmpoffset = 0;
+ printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n",
mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
+ do {
+ int k = 0;
+ CPullResult pullResult = Pull(consumer, &mqs[j], "*",
tmpoffset, 32);
+ if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
+ tmpoffset = pullResult.nextBeginOffset;
+ }
+
printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld",
pullResult.pullStatus,
+ pullResult.maxOffset, pullResult.minOffset,
pullResult.nextBeginOffset);
+ switch (pullResult.pullStatus) {
+ case E_FOUND:
+ printf("Get Message Size:%d\n", pullResult.size);
+ for (k = 0; k < pullResult.size; ++k) {
+ printf("Got Message ID:%s,Body:%s\n",
GetMessageId(pullResult.msgFoundList[k]),GetMessageBody(pullResult.msgFoundList[k]));
+ }
+ break;
+ case E_NO_MATCHED_MSG:
+ noNewMsg = 1;
+ break;
+ default:
+ noNewMsg = 0;
+ }
+ ReleasePullResult(pullResult);
+ thread_sleep(100);
+ } while (noNewMsg == 0);
+ thread_sleep(1000);
+ }
+ thread_sleep(2000);
+ ReleaseSubscriptionMessageQueue(mqs);
}
+ thread_sleep(5000);
ShutdownPullConsumer(consumer);
DestroyPullConsumer(consumer);
printf("PullConsumer Shutdown!\n");
diff --git a/include/CProducer.h b/include/CProducer.h
index 74bdb9d8..6edd99e5 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -36,6 +36,7 @@ ROCKETMQCLIENT_API int StartProducer(CProducer *producer);
ROCKETMQCLIENT_API int ShutdownProducer(CProducer *producer);
ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer *producer, const
char *namesrv);
+ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer *producer, const
char *domain);
ROCKETMQCLIENT_API int SetProducerGroupName(CProducer *producer, const char
*groupName);
ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer *producer, const char
*instanceName);
ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer *producer,
const char *accessKey, const char *secretKey,
diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h
index c84f8d4a..a4c4f5ae 100644
--- a/include/CPullConsumer.h
+++ b/include/CPullConsumer.h
@@ -31,25 +31,26 @@ extern "C" {
typedef struct CPullConsumer CPullConsumer;
-CPullConsumer *CreatePullConsumer(const char *groupId);
-int DestroyPullConsumer(CPullConsumer *consumer);
-int StartPullConsumer(CPullConsumer *consumer);
-int ShutdownPullConsumer(CPullConsumer *consumer);
-int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
-const char *GetPullConsumerGroupID(CPullConsumer *consumer);
-int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char
*namesrv);
-int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char
*accessKey, const char *secretKey,
+ROCKETMQCLIENT_API CPullConsumer *CreatePullConsumer(const char *groupId);
+ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer *consumer, const
char *groupId);
+ROCKETMQCLIENT_API const char *GetPullConsumerGroupID(CPullConsumer *consumer);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer
*consumer, const char *namesrv);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer
*consumer, const char *domain);
+ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer
*consumer, const char *accessKey, const char *secretKey,
const char *channel);
-int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
-int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum,
long fileSize);
-int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
+ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer *consumer, const
char *logPath);
+ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer
*consumer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer *consumer,
CLogLevel level);
-int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic,
CMessageQueue **mqs, int *size);
-int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
+ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer *consumer,
const char *topic, CMessageQueue **mqs, int *size);
+ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs);
-CPullResult
+ROCKETMQCLIENT_API CPullResult
Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char
*subExpression, long long offset, int maxNums);
-int ReleasePullResult(CPullResult pullResult);
+ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult);
#ifdef __cplusplus
};
diff --git a/include/CPullResult.h b/include/CPullResult.h
index eb44fbd7..e22fd9ed 100644
--- a/include/CPullResult.h
+++ b/include/CPullResult.h
@@ -24,8 +24,7 @@
#ifdef __cplusplus
extern "C" {
#endif
-typedef enum E_CPullStatus
-{
+typedef enum E_CPullStatus {
E_FOUND,
E_NO_NEW_MSG,
E_NO_MATCHED_MSG,
@@ -35,11 +34,12 @@ typedef enum E_CPullStatus
typedef struct _CPullResult_ {
CPullStatus pullStatus;
- long long nextBeginOffset;
- long long minOffset;
- long long maxOffset;
- CMessageExt** msgFoundList;
+ long long nextBeginOffset;
+ long long minOffset;
+ long long maxOffset;
+ CMessageExt **msgFoundList;
int size;
+ void *pData;
} CPullResult;
#ifdef __cplusplus
diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index aee34403..49cbf964 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -42,6 +42,7 @@ ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer
*consumer);
ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer *consumer, const
char *groupId);
ROCKETMQCLIENT_API const char *GetPushConsumerGroupID(CPushConsumer *consumer);
ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer
*consumer, const char *namesrv);
+ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer
*consumer, const char *domain);
ROCKETMQCLIENT_API int Subscribe(CPushConsumer *consumer, const char *topic,
const char *expression);
ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer *consumer,
MessageCallBack pCallback);
ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer *consumer,
MessageCallBack pCallback);
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 67a99bd0..2c3c9c91 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -85,7 +85,13 @@ int SetProducerNameServerAddress(CProducer *producer, const
char *namesrv) {
((DefaultMQProducer *) producer)->setNamesrvAddr(namesrv);
return OK;
}
-
+int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultMQProducer *) producer)->setNamesrvDomain(domain);
+ return OK;
+}
int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
//CSendResult sendResult;
if (producer == NULL || msg == NULL || result == NULL) {
diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp
index bd71ffb9..415bb5b4 100644
--- a/src/extern/CPullConsumer.cpp
+++ b/src/extern/CPullConsumer.cpp
@@ -80,6 +80,13 @@ int SetPullConsumerNameServerAddress(CPullConsumer
*consumer, const char *namesr
((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv);
return OK;
}
+int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char
*domain) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultMQPullConsumer *) consumer)->setNamesrvDomain(domain);
+ return OK;
+}
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char
*accessKey, const char *secretKey,
const char *channel) {
if (consumer == NULL) {
@@ -119,24 +126,26 @@ int FetchSubscriptionMessageQueues(CPullConsumer
*consumer, const char *topic, C
return NULL_POINTER;
}
unsigned int index = 0;
+ CMessageQueue *temMQ = NULL;
std::vector<MQMessageQueue> fullMQ;
try {
((DefaultMQPullConsumer *)
consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
*size = fullMQ.size();
//Alloc memory to save the pointer to CPP MessageQueue, and the
MessageQueues may be changed.
//Thus, this memory should be released by users using
@ReleaseSubscribeMessageQueue every time.
- *mqs = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
- if (*mqs == NULL) {
+ temMQ = (CMessageQueue *) malloc(*size * sizeof(CMessageQueue));
+ if (temMQ == NULL) {
*size = 0;
*mqs = NULL;
return MALLOC_FAILED;
}
auto iter = fullMQ.begin();
for (index = 0; iter != fullMQ.end() && index <= fullMQ.size();
++iter, index++) {
- strncpy(mqs[index]->topic, iter->getTopic().c_str(),
MAX_TOPIC_LENGTH - 1);
- strncpy(mqs[index]->brokerName, iter->getBrokerName().c_str(),
MAX_BROKER_NAME_ID_LENGTH - 1);
- mqs[index]->queueId = iter->getQueueId();
+ strncpy(temMQ[index].topic, iter->getTopic().c_str(),
MAX_TOPIC_LENGTH - 1);
+ strncpy(temMQ[index].brokerName, iter->getBrokerName().c_str(),
MAX_BROKER_NAME_ID_LENGTH - 1);
+ temMQ[index].queueId = iter->getQueueId();
}
+ *mqs = temMQ;
} catch (MQException &e) {
*size = 0;
*mqs = NULL;
@@ -160,7 +169,7 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq,
const char *subExpression
PullResult cppPullResult;
try {
cppPullResult = ((DefaultMQPullConsumer *)
consumer)->pull(messageQueue, subExpression, offset, maxNums);
- }catch (exception &e){
+ } catch (exception &e) {
cppPullResult.pullStatus = BROKER_TIMEOUT;
}
@@ -171,11 +180,13 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq,
const char *subExpression
pullResult.minOffset = cppPullResult.minOffset;
pullResult.nextBeginOffset = cppPullResult.nextBeginOffset;
pullResult.size = cppPullResult.msgFoundList.size();
+ PullResult *tmpPullResult = new PullResult(cppPullResult);
+ pullResult.pData = tmpPullResult;
//Alloc memory to save the pointer to CPP MQMessageExt, which will
be release by the CPP SDK core.
//Thus, this memory should be released by users using
@ReleasePullResult
pullResult.msgFoundList = (CMessageExt **) malloc(pullResult.size
* sizeof(CMessageExt *));
for (size_t i = 0; i < cppPullResult.msgFoundList.size(); i++) {
- MQMessageExt *msg = const_cast<MQMessageExt
*>(&cppPullResult.msgFoundList[i]);
+ MQMessageExt *msg = const_cast<MQMessageExt
*>(&tmpPullResult->msgFoundList[i]);
pullResult.msgFoundList[i] = (CMessageExt *) (msg);
}
break;
@@ -204,9 +215,16 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq,
const char *subExpression
return pullResult;
}
int ReleasePullResult(CPullResult pullResult) {
- if (pullResult.size == 0 || pullResult.msgFoundList == NULL) {
+ if (pullResult.size == 0 || pullResult.msgFoundList == NULL ||
pullResult.pData == NULL) {
return NULL_POINTER;
}
+ if (pullResult.pData != NULL) {
+ try {
+ delete ((PullResult *) pullResult.pData);
+ } catch (exception &e) {
+ return NULL_POINTER;
+ }
+ }
free((void *) pullResult.msgFoundList);
pullResult.msgFoundList = NULL;
return OK;
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 9d3ab6be..2c35c749 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -139,6 +139,13 @@ int SetPushConsumerNameServerAddress(CPushConsumer
*consumer, const char *namesr
((DefaultMQPushConsumer *) consumer)->setNamesrvAddr(namesrv);
return OK;
}
+int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char
*domain) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultMQPushConsumer *) consumer)->setNamesrvDomain(domain);
+ return OK;
+}
int Subscribe(CPushConsumer *consumer, const char *topic, const char
*expression) {
if (consumer == NULL) {
return NULL_POINTER;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services