This is an automated email from the ASF dual-hosted git repository.

lrhkobe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new e13ac79c5 [ISSUE #4321] Add Examples And Configs For C SDK (#4326)
e13ac79c5 is described below

commit e13ac79c596035f4cd33f8e51a15904a6d1a4123
Author: zhurq <[email protected]>
AuthorDate: Wed Nov 1 14:29:24 2023 +0800

    [ISSUE #4321] Add Examples And Configs For C SDK (#4326)
    
    * c-sdk dirs
    
    * c-sdk example and config
    
    * add config and example
    
    * add license
---
 eventmesh-sdks/eventmesh-sdk-c/configs/rmb.conf |   61 +
 eventmesh-sdks/eventmesh-sdk-c/examples/demo.c  | 1715 +++++++++++++++++++++++
 2 files changed, 1776 insertions(+)

diff --git a/eventmesh-sdks/eventmesh-sdk-c/configs/rmb.conf 
b/eventmesh-sdks/eventmesh-sdk-c/configs/rmb.conf
new file mode 100644
index 000000000..b30682f8f
--- /dev/null
+++ b/eventmesh-sdks/eventmesh-sdk-c/configs/rmb.conf
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+consumerSysId 1000
+consumerSysVersion 1.0.0
+consumerSvrId 10.199.199.199
+consumerDcn A00
+orgId 99996
+localIdc A
+wemq_user wemq
+wemq_passwd wemq@123
+
+#log configure
+logFile sub
+logLevel 5 
+logFileNums 10
+logFileSize 100000000
+logSwiftType 1
+
+#receive message
+reqFifoPath ./tmp_req.fifo
+reqShmKey 0x27151203
+reqShmSize 20000000
+#receive rr-async reply message
+ayncRspFifoPath ./tmp_aync_rsp.fifo
+ayncRspShmKey 0x77151204
+ayncRspShmSize 20000000
+#receive broadcast message
+broadcastFifoPath ./tmp_broadcast.fifo
+broadcastShmKey 0x77151205
+broadcastShmSize 20000000
+
+#message log control
+logserverSwitch 0
+#req gsl
+ReqGslSwitch 0
+
+#wemq cc configure
+wemqUseHttpCfg 1
+configCenterIp 10.255.34.57
+configCenterPort 8090
+configCenterAddrMulti 172.22.1.82:8090
+#connect directly to access when: wemqUseHttpCfg 0
+wemqProxyIp 172.22.0.51
+wemqProxyPort 10000
+# tlsOnoff:控制与eventmesh的tcp连接是否开启tls
+#   0: 不开启(默认)
+#   1: 开启
+tlsOnoff 0
diff --git a/eventmesh-sdks/eventmesh-sdk-c/examples/demo.c 
b/eventmesh-sdks/eventmesh-sdk-c/examples/demo.c
new file mode 100644
index 000000000..f6b7f4f55
--- /dev/null
+++ b/eventmesh-sdks/eventmesh-sdk-c/examples/demo.c
@@ -0,0 +1,1715 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <signal.h>
+#include <sys/wait.h>
+#include <pthread.h>
+
+#include "rmb_sub.h"
+#include "rmb_pub.h"
+#include "rmb_msg.h"
+#include "rmb_udp.h"
+#include "rmb_cfg.h"
+#include "rmb_log.h"
+#include "rmb_errno.h"
+
+#define LOG_PRINT(loglevel,fmt, args...) {log_print(loglevel, "[%s:%d(%s)]:    
 "fmt"", __FILE__, __LINE__, __FUNCTION__, ## args);}
+
+#define UNIX_DOMAIN "unix.domain"
+
+typedef struct tMessage
+{
+  int type;                     //0:pub 1:sub
+  pid_t pid;
+  unsigned int process_num;
+  unsigned int process_status;
+  unsigned int has_send;
+  unsigned int send_msg_num;
+  unsigned int recv_msg_num;
+} tMessage;
+
+#define MAX_MSG_LEN (1024*1024*3+1)
+
+static const char *version_test = "RMB_C_API_V2.2.0";
+
+static const char *send_msg = "pubSendMsg";
+
+pthread_mutex_t test_mutex;
+
+StRmbSub *pRmbSub;
+StRmbPub *pRmbPub;
+
+static char StatLogLevel[6][10] =
+  { "FATAL", "ERROR", "WARN ", "INFO", "DEBUG", "ALL" };
+
+int log_print (int logLevel, const char *format, ...)
+{
+  va_list ap;
+  struct timeval stLogTv;
+
+  va_start (ap, format);
+  gettimeofday (&stLogTv, NULL);
+  fprintf (stdout, "[%s][%s %03d][%d]", StatLogLevel[logLevel],
+           RmbGetDateTimeStr ((const time_t *) &(stLogTv.tv_sec)),
+           (int) ((stLogTv.tv_usec) / 1000), getpid ());
+  //fprintf(stdout, "[%s][%d]", StatLogLevel[logLevel],getpid());
+  vfprintf (stdout, format, ap);
+  va_end (ap);
+  fprintf (stdout, "\n");
+
+  return 0;
+}
+
+void printfUsage (const char *name)
+{
+  printf ("%s: version:%s\n\n", name, version_test);
+
+  printf ("%s: sub server端(非广播)\n", name);
+  printf
+    (" %s sub sleep_time log_control process_nums topic1 topic2 ...\n",
+     name);
+  printf
+    (" sleep_time:睡眠时间,通常为0, log_control:测试内容大小时使用,1:打印,其他值为不打印  
process_nums:进程数量,最小为1\n");
+  printf ("\n");
+
+  printf ("%s: sub server端(广播)\n", name);
+  printf
+    (" %s sub_broadcast process_nums log_control topic1 topic2 ...\n", name);
+  printf ("\n");
+
+  printf
+    ("%s: pub client端(RR同步消息/单播/多播/广播消息发送):\n",
+     name);
+  printf
+    (" %s pub pthread_nums log_control msg_nums msg_len ttl_time sleep_time 
topic1 topic2 ...\n",
+     name);
+  printf
+    (" pthread_nums:线程数量,最小为1, log_control:测试内容大小时使用,1:打印,其他值为不打印 
msg_nums:每个线程发送消息数量 msg:消息内容 msg_len:消息内容大小\n");
+  printf ("\n");
+
+  printf ("%s: pub端(RR异步):\n", name);
+  printf
+    (" %s rr_async_pub process_nums log_control msg_nums msg_len sleep_time 
topic\n",
+     name);
+  printf
+    (" process_nums:进程数量,最小为1, msg_nums:每个进程发送消息数量  msg:消息内容\n");
+  printf ("\n");
+
+  printf ("////////////////////////////////////////////////////////\n");
+
+  exit (0);
+}
+
+typedef struct StDemoArgv
+{
+  //建议字段
+  StRmbPub *pRmbPub;
+  StRmbSub *pRmbSub;
+  StRmbMsg *pReceiveMsg;
+  StRmbMsg *pReplyMsg;
+  //其他字段
+  unsigned int uiLog;
+  unsigned long ulMsgTotal;
+  unsigned int uiSleepTime;
+  unsigned int uiLogServer;
+  unsigned int uiFlag;
+} StDemoArgv;
+
+typedef struct tThreadArgs
+{
+  StRmbPub *pRmbPub;
+  enum EVENT_OR_SERVICE_CALL event_or_service;
+  unsigned int times;
+  int iContent;
+  char cDcn[100];
+  char cServiceId[100];
+  char cSenaId[100];
+  ///////////////
+  unsigned long ulMsgSucc;
+  ///////////////
+  int iLogNum;
+  unsigned long ttl;
+  const char *pMsg;
+} tThreadArgs;
+
+/**
+ * return value:
+ *             0: success
+ *             1: topic check failed
+ *             <0: send error
+ */
+int pubOwnMessage (StRmbPub * pRmbPub, int iEventOrService,
+                   unsigned long ttl_time, const char *cDcn,
+                   const char *cServiceId, const char *cSenaId,
+                   unsigned int log_control, const char *strMsg)
+{
+  int iRet = 0;
+  StRmbMsg *pSendMsg = rmb_msg_malloc ();
+  rmb_msg_clear (pSendMsg);
+  static unsigned int uiSeq = 1;
+  struct timeval tv;
+  gettimeofday (&tv, NULL);
+  unsigned long ulNowTime = tv.tv_sec * 1000 + tv.tv_usec / 1000;
+  char cSeqNo[33];
+  snprintf (cSeqNo, sizeof (cSeqNo), "%013lu%019u", ulNowTime, uiSeq++);
+//      rmb_msg_set_bizSeqNo(pSendMsg, "12345678901234567890123456789012");
+  rmb_msg_set_bizSeqNo (pSendMsg, cSeqNo);
+//      rmb_msg_set_consumerSeqNo(pSendMsg, 
"22222678901234567890123456799999");
+  rmb_msg_set_consumerSeqNo (pSendMsg, cSeqNo);
+  rmb_msg_set_orgSysId (pSendMsg, "9999");
+  //set ttl, millisecond(hao s)
+//      printf("ttl = %lu\n", ttl_time);
+  if (ttl_time > 0)
+  {
+    rmb_msg_set_live_time (pSendMsg, ttl_time);
+  }
+  if (iEventOrService == 0)
+  {
+    iRet =
+      rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, RMB_EVENT_CALL,
+                        cServiceId, cSenaId);
+  }
+  else
+  {
+    iRet =
+      rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, RMB_SERVICE_CALL,
+                        cServiceId, cSenaId);
+  }
+/*     if (iRet < 0)
+               return 1;*/
+
+  rmb_msg_set_content (pSendMsg, strMsg, strlen (strMsg));
+  char appHeader[100] = "{}";
+  rmb_msg_set_app_header (pSendMsg, appHeader, strlen (appHeader));
+
+  if (iEventOrService == 0)
+  {
+    iRet = rmb_pub_send_msg (pRmbPub, pSendMsg);
+    if (log_control & 2)
+    {
+      if (iRet == 0)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "send msg=%s OK", pSendMsg->cContent);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_msg error!iRet = %d", iRet);
+      }
+    }
+  }
+  else
+  {
+    StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+    iRet = rmb_pub_send_and_receive (pRmbPub, pSendMsg, pReceiveMsg, 50000);
+    if (iRet == 0)
+    {
+      char receiveBuf[MAX_MSG_LEN];
+      unsigned int receiveLen = sizeof (receiveBuf);
+      rmb_msg_get_content (pReceiveMsg, receiveBuf, &receiveLen);
+
+      if (log_control == 1)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "receive reply:len=%u, %s", receiveLen,
+                   receiveBuf);
+      }
+    }
+    else
+    {
+      LOG_PRINT (RMB_LOG_INFO,
+                 "rmb_pub_send_and_receive error!iRet = %d,msg=%s", iRet,
+                 strMsg);
+    }
+
+    //sleep(1);
+    rmb_msg_free (pReceiveMsg);
+  }
+
+  rmb_msg_free (pSendMsg);
+  return iRet;
+}
+
+//////////////////////////////////////////////////
+
+int pub_message (StRmbPub * pRmbPub, int iEventOrService, int first_msg_nums,
+                 int second_msg_nums, const char *cDcn,
+                 const char *cServiceId, const char *cSenaId)
+{
+  int iRet = 0;
+  StRmbMsg *pSendMsg = rmb_msg_malloc ();
+
+  rmb_msg_set_bizSeqNo (pSendMsg, "12345678901234567890123456789012");
+  rmb_msg_set_consumerSeqNo (pSendMsg, "22222678901234567890123456799999");
+  rmb_msg_set_orgSysId (pSendMsg, "9999");
+  if (iEventOrService == 0)
+  {
+    rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, RMB_EVENT_CALL,
+                      cServiceId, cSenaId);
+  }
+  else
+  {
+    rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, RMB_SERVICE_CALL,
+                      cServiceId, cSenaId);
+  }
+  rmb_msg_set_content (pSendMsg, send_msg, strlen (send_msg));
+  char appHeader[100] = "{}";
+  rmb_msg_set_app_header (pSendMsg, appHeader, strlen (appHeader));
+
+  unsigned long ulNowTime = 0;
+  unsigned long ulSleepTime = 0;
+  struct timeval tv;
+
+  if (iEventOrService == 0)
+  {
+    gettimeofday (&tv, NULL);
+    ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
+    int i;
+    for (i = 0; i < first_msg_nums; i++)
+    {
+      iRet = rmb_pub_send_msg (pRmbPub, pSendMsg);
+#ifdef DEBUG
+      if (iRet == 0)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "send msg=%s OK", pSendMsg->cContent);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_msg error!iRet = %d", iRet);
+      }
+#endif
+    }
+
+    gettimeofday (&tv, NULL);
+    ulSleepTime = tv.tv_sec * 1000000 + tv.tv_usec - ulNowTime;
+    printf ("ulSleepTime = %lu\n", ulSleepTime);
+    if (ulSleepTime < 1000000)
+      usleep (ulSleepTime);
+
+    for (i = 0; i < second_msg_nums; i++)
+    {
+      iRet = rmb_pub_send_msg (pRmbPub, pSendMsg);
+#ifdef DEBUG
+      if (iRet == 0)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "send msg=%s OK", pSendMsg->cContent);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_msg error!iRet = %d", iRet);
+      }
+#endif
+    }
+  }
+  else
+  {
+    StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+    gettimeofday (&tv, NULL);
+    ulNowTime = tv.tv_sec * 1000000 + tv.tv_usec;
+    int i;
+    for (i = 0; i < first_msg_nums; i++)
+    {
+      iRet = rmb_pub_send_and_receive (pRmbPub, pSendMsg, pReceiveMsg, 5000);
+      if (iRet == 0)
+      {
+        char receiveBuf[1024];
+        unsigned int receiveLen = sizeof (receiveBuf);
+        rmb_msg_get_content (pReceiveMsg, receiveBuf, &receiveLen);
+#ifdef DEBUG
+        LOG_PRINT (RMB_LOG_INFO, "receive reply pkg=%s", receiveBuf);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_ERROR,
+                   "rmb_pub_send_and_receive error!iRet = %d,msg=%s", iRet,
+                   strMsg);
+      }
+#else
+      }
+#endif
+    }
+    gettimeofday (&tv, NULL);
+    ulSleepTime = tv.tv_sec * 1000000 + tv.tv_usec - ulNowTime;
+    printf ("ulSleepTime = %lu\n", ulSleepTime);
+    if (ulSleepTime < 1000000)
+      usleep (ulSleepTime);
+
+    for (i = 0; i < second_msg_nums; i++)
+    {
+      iRet = rmb_pub_send_and_receive (pRmbPub, pSendMsg, pReceiveMsg, 5000);
+      if (iRet == 0)
+      {
+        char receiveBuf[1024];
+        unsigned int receiveLen = sizeof (receiveBuf);
+        rmb_msg_get_content (pReceiveMsg, receiveBuf, &receiveLen);
+#ifdef DEBUG
+        LOG_PRINT (RMB_LOG_INFO, "receive reply pkg=%s", receiveBuf);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_ERROR,
+                   "rmb_pub_send_and_receive error!iRet = %d,msg=%s", iRet,
+                   strMsg);
+      }
+#else
+      }
+#endif
+    }
+
+    rmb_msg_free (pReceiveMsg);
+  }
+  printf ("%s: send msg:%d\n", __func__, first_msg_nums + second_msg_nums);
+
+  rmb_msg_free (pSendMsg);
+  return iRet;
+}
+
+static int pub_board_message (StRmbPub * pRmbPub, int iEventOrService,
+                              unsigned long ttl_time, const char *cDcn,
+                              const char *cServiceId, const char *cSenaId,
+                              const char *cOrgId, unsigned int log_control,
+                              const char *strMsg)
+{
+  int iRet = 0;
+  StRmbMsg *pSendMsg = rmb_msg_malloc ();
+  rmb_msg_clear (pSendMsg);
+  rmb_msg_set_bizSeqNo (pSendMsg, "12345678901234567890123456789012");
+  rmb_msg_set_consumerSeqNo (pSendMsg, "22222678901234567890123456799999");
+  rmb_msg_set_orgSysId (pSendMsg, "9999");
+  //set ttl, millisecond(hao s)
+  //printf("ttl = %lu\n", ttl_time);
+  if (ttl_time > 0)
+    rmb_msg_set_live_time (pSendMsg, ttl_time);
+
+  if (iEventOrService != RMB_EVENT_CALL)
+  {
+    printf ("type:%d is not event call\n", iEventOrService);
+    return -1;
+  }
+
+  iRet = rmb_msg_set_dest_v2_1 (pRmbPub, cDcn, cServiceId, cSenaId, cOrgId);
+  if (iRet < 0)
+  {
+    printf ("rmb_msg_set_dest_v2_1 return: %d\n", iRet);
+    return iRet;
+  }
+
+  rmb_msg_set_content (pSendMsg, strMsg, strlen (strMsg));
+  char appHeader[100] = "{}";
+  rmb_msg_set_app_header (pSendMsg, appHeader, strlen (appHeader));
+  if (iEventOrService == 0)
+  {
+    iRet = rmb_pub_send_msg (pRmbPub, pSendMsg);
+    if (log_control & 2)
+    {
+      if (iRet == 0)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "send msg=%s OK", pSendMsg->cContent);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_msg error!iRet = %d", iRet);
+      }
+    }
+  }
+
+  rmb_msg_free (pSendMsg);
+  return iRet;
+}
+
+int mutil_thread_pub_message (StRmbPub * pRmbPub, int iEventOrService,
+                              unsigned long ttl, int iContentBase,
+                              int iOffset, const char *cDcn,
+                              const char *cServiceId, const char *cSenaId,
+                              const char *strMsg)
+{
+  int iRet = 0;
+  StRmbMsg *pSendMsg = rmb_msg_malloc ();
+
+  rmb_msg_set_bizSeqNo (pSendMsg, "12345678901234567890123456789012");
+  rmb_msg_set_consumerSeqNo (pSendMsg, "22222678901234567890123456799999");
+  rmb_msg_set_orgSysId (pSendMsg, "9999");
+//      printf("ttl = %lu\n", ttl);
+  if (ttl > 0)
+  {
+    rmb_msg_set_live_time (pSendMsg, ttl);
+  }
+  if (iEventOrService == 0)
+  {
+    rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, RMB_EVENT_CALL,
+                      cServiceId, cSenaId);
+  }
+  else
+  {
+    rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, RMB_SERVICE_CALL,
+                      cServiceId, cSenaId);
+  }
+
+//      char cContent[100];
+//      snprintf(cContent, sizeof(cContent), "thread%d--%dth:%s", 
iContentBase, iOffset, strMsg);
+//      rmb_msg_set_content(pSendMsg, cContent, strlen(cContent));
+  rmb_msg_set_content (pSendMsg, strMsg, strlen (strMsg));
+  char appHeader[100] = "{}";
+  rmb_msg_set_app_header (pSendMsg, appHeader, strlen (appHeader));
+
+  if (iEventOrService == 0)
+  {
+    iRet = rmb_pub_send_msg (pRmbPub, pSendMsg);
+#ifdef DEBUG
+    if (iRet == 0)
+    {
+      LOG_PRINT (RMB_LOG_INFO, "send msg=%s OK", pSendMsg->cContent);
+    }
+    else
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_msg error!iRet = %d", iRet);
+    }
+#endif
+  }
+  else
+  {
+    StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+    iRet = rmb_pub_send_and_receive (pRmbPub, pSendMsg, pReceiveMsg, 5000);
+    if (iRet == 0)
+    {
+//                      char receiveBuf[1024];
+      char receiveBuf[MAX_MSG_LEN];
+      unsigned int receiveLen = sizeof (receiveBuf);
+      rmb_msg_get_content (pReceiveMsg, receiveBuf, &receiveLen);
+#ifdef DEBUG
+      LOG_PRINT (RMB_LOG_INFO, "receive reply pkg=%s", receiveBuf);
+    }
+    else
+    {
+      LOG_PRINT (RMB_LOG_ERROR,
+                 "rmb_pub_send_and_receive error!iRet = %d,msg=%s", iRet,
+                 strMsg);
+    }
+#else
+    }
+#endif
+    rmb_msg_free (pReceiveMsg);
+  }
+
+  rmb_msg_free (pSendMsg);
+  return iRet;
+}
+
+void mutil_thread_for_pub (void *pArgs)
+{
+  tThreadArgs *pTmp = (tThreadArgs *) pArgs;
+
+  int iRet = 0;
+  unsigned int message_times = 0;
+  int i;
+  for (i = 0; i < pTmp->times; ++i)
+  {
+    iRet =
+      mutil_thread_pub_message (pTmp->pRmbPub, pTmp->event_or_service,
+                                pTmp->ttl, pTmp->iContent, i, pTmp->cDcn,
+                                pTmp->cServiceId, pTmp->cSenaId, pTmp->pMsg);
+    if (iRet == 0)
+    {
+      message_times++;
+      pTmp->ulMsgSucc++;
+    }
+  }
+  if (pTmp->event_or_service == RMB_SERVICE_CALL)
+  {
+    LOG_PRINT (RMB_LOG_INFO, "threadid:%d -- pub send %u and recv msg:%u\n",
+               (int) pthread_self (), pTmp->times, message_times);
+  }
+  else
+  {
+    LOG_PRINT (RMB_LOG_ERROR, "threadid:%d -- pub send %u and succ msg:%u\n",
+               (int) pthread_self (), pTmp->times, message_times);
+  }
+}
+
+//sub回调函数
+void func_with_req (const char *buf, const int len, void *pAgv)
+//void func_with_req(void *pRevMsg, void *pAgv)
+{
+  if (pAgv == NULL)
+  {
+    printf ("%s:%d-%s pAgv or pRevMsg is null\n", __FILE__, __LINE__,
+            __func__);
+    return;
+  }
+  StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+  if (pReceiveMsg == NULL)
+  {
+    printf ("rmb_msg_malloc failed");
+    return;
+  }
+
+  StDemoArgv *p = (StDemoArgv *) pAgv;
+
+  shift_buf_2_msg (pReceiveMsg, buf, len);
+
+  if (p->uiSleepTime > 0)
+    sleep (p->uiSleepTime);
+
+//      //test for get dest
+//      const char *pDest = NULL;
+//      if ((pDest = rmb_msg_get_dest_ptr(pReceiveMsg)) != NULL) {
+//              printf("get dest name is:%s\n", pDest);
+//      }
+//
+//      //test for get ConsumerSysId
+//      const char *pConsumerSysId = NULL;
+//      if ((pConsumerSysId = rmb_msg_get_consumerSysId_ptr(pReceiveMsg)) != 
NULL) {
+//              printf("get consumer sys id is:%s\n", pConsumerSysId);
+//      }
+//
+//      //test for get cConsumerSvrId
+//      const char *pConsumerSvrId = NULL;
+//      if ((pConsumerSvrId = rmb_msg_get_consumerSvrId_ptr(pReceiveMsg)) != 
NULL) {
+//              printf("get consumer svr id is:%s\n", pConsumerSvrId);
+//      }
+//
+//      printf("get consumerSysVersion version:%s\n", 
pReceiveMsg->sysHeader.cConsumerSysVersion);
+
+//      printf("ulTranTimeStamp=%lu\n", 
pReceiveMsg->sysHeader.ulTranTimeStamp);
+
+  p->uiFlag = 1;
+  p->ulMsgTotal++;
+  const char *pContent = NULL;
+  unsigned int uiContentLen = 0;
+  if ((pContent =
+       rmb_msg_get_content_ptr (pReceiveMsg, &uiContentLen)) != NULL)
+  {
+    if (p->uiLog == 1)
+    {
+      printf ("***get message len=%d,%s\n", uiContentLen, pContent);
+    }
+//              if (pReceiveMsg->iEventOrService == RMB_EVENT_CALL) {
+    if (strlen (pReceiveMsg->replyTo.cDestName) > 0)
+    {
+      char replyContent[MAX_MSG_LEN] = { 0 };
+      unsigned int uiReplyLen = 0;
+      memcpy (replyContent, pContent, uiContentLen);
+      uiReplyLen += uiContentLen;
+      memcpy (&replyContent[uiReplyLen], "_reply", strlen ("_reply"));
+      uiReplyLen += strlen ("_reply");
+      replyContent[uiReplyLen] = '\0';
+
+      rmb_msg_set_content (p->pReplyMsg, replyContent, uiReplyLen);
+
+      char appHeader[10] = "{}";
+      rmb_msg_set_app_header (p->pReplyMsg, appHeader, strlen (appHeader));
+      rmb_sub_reply_msg (p->pRmbSub, pReceiveMsg, p->pReplyMsg);
+    }
+    rmb_sub_ack_msg (p->pRmbSub, pReceiveMsg);
+  }
+
+  rmb_msg_free (pReceiveMsg);
+
+  return;
+}
+
+//sub回调函数
+void func_with_req_pub_reply (const char *buf, const int len, void *pAgv)
+{
+  if (pAgv == NULL)
+  {
+    printf ("%s:%d-%s pAgv or pRevMsg is null\n", __FILE__, __LINE__,
+            __func__);
+    return;
+  }
+  StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+  if (pReceiveMsg == NULL)
+  {
+    printf ("rmb_msg_malloc failed");
+    return;
+  }
+
+  StDemoArgv *p = (StDemoArgv *) pAgv;
+
+  shift_buf_2_msg (pReceiveMsg, buf, len);
+
+  //test for get dest
+  const char *pDest = NULL;
+  if ((pDest = rmb_msg_get_dest_ptr (pReceiveMsg)) != NULL)
+  {
+    printf ("get dest name is:%s\n", pDest);
+  }
+
+  //test for get ConsumerSysId
+  const char *pConsumerSysId = NULL;
+  if ((pConsumerSysId = rmb_msg_get_consumerSysId_ptr (pReceiveMsg)) != NULL)
+  {
+    printf ("get consumer sys id is:%s\n", pConsumerSysId);
+  }
+
+  //test for get cConsumerSvrId
+  const char *pConsumerSvrId = NULL;
+  if ((pConsumerSvrId = rmb_msg_get_consumerSvrId_ptr (pReceiveMsg)) != NULL)
+  {
+    printf ("get consumer svr id is:%s\n", pConsumerSvrId);
+  }
+
+  printf ("get consumerSysVersion version:%s\n",
+          pReceiveMsg->sysHeader.cConsumerSysVersion);
+
+  p->ulMsgTotal++;
+  const char *pContent = NULL;
+  unsigned int uiContentLen = 0;
+  if ((pContent =
+       rmb_msg_get_content_ptr (pReceiveMsg, &uiContentLen)) != NULL)
+  {
+    if (p->uiLog == 1)
+    {
+      printf ("***get message len=%d,%s\n", uiContentLen, pContent);
+    }
+    //if (p->uiLogServer == 1) {
+    //      rmb_log_for_common(p->pRmbPub->pContext, LOG_MSG_LEVEL_INFO, 
"test", "%s", pReceiveMsg->sysHeader.cBizSeqNo, 
pReceiveMsg->sysHeader.cConsumerSeqNo, pReceiveMsg->sysHeader.cOrgSysId, 
pContent);
+    //}
+//              if (pReceiveMsg->iEventOrService == RMB_EVENT_CALL) {
+    if (strlen (pReceiveMsg->replyTo.cDestName) > 0)
+    {
+      char replyContent[MAX_MSG_LEN] = { 0 };
+      unsigned int uiReplyLen = 0;
+      memcpy (replyContent, pContent, uiContentLen);
+      uiReplyLen += uiContentLen;
+      memcpy (&replyContent[uiReplyLen], "_reply", strlen ("_reply"));
+      uiReplyLen += strlen ("_reply");
+      replyContent[uiReplyLen] = '\0';
+
+      rmb_msg_set_content (p->pReplyMsg, replyContent, uiReplyLen);
+
+      char appHeader[10] = "{}";
+      rmb_msg_set_app_header (p->pReplyMsg, appHeader, strlen (appHeader));
+      //rmb_sub_reply_msg(p->pRmbSub, pReceiveMsg, p->pReplyMsg);
+      rmb_pub_reply_msg (p->pRmbPub, pReceiveMsg, p->pReplyMsg);
+    }
+    rmb_sub_ack_msg (p->pRmbSub, pReceiveMsg);
+  }
+
+  rmb_msg_free (pReceiveMsg);
+
+  return;
+}
+
+/**
+ * broad cast sub callback
+ */
+void func_with_broadcast (const char *buf, const int len, void *pAgv)
+//void func_with_broadcast(void *pRecvMsg, void* pAgv)
+{
+  if (pAgv == NULL)
+  {
+    printf ("%s:%d--%s pAgv is null\n", __FILE__, __LINE__, __func__);
+    return;
+  }
+
+  StDemoArgv *p = (StDemoArgv *) pAgv;
+  StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+  if (pReceiveMsg == NULL)
+  {
+    printf ("rmb_msg_malloc failed\n");
+    return;
+  }
+
+  shift_buf_2_msg (pReceiveMsg, buf, len);
+
+  p->ulMsgTotal++;
+
+  const char *pContent = NULL;
+  unsigned int uiContentLen = 0;
+  if ((pContent =
+       rmb_msg_get_content_ptr (pReceiveMsg, &uiContentLen)) != NULL)
+  {
+    if (p->uiLog == 1)
+    {
+      LOG_PRINT (RMB_LOG_INFO, "receive broadcase content len=%u, %s",
+                 uiContentLen, pContent);
+    }
+  }
+
+  rmb_msg_free (pReceiveMsg);
+}
+
+//RR异步时,pub回调函数
+void func_with_rr_rsp (const char *buf, const int len, void *pAgv)
+{
+  if (pAgv == NULL)
+  {
+    printf ("pAgv or pRevMsg is null");
+    return;
+  }
+
+  StDemoArgv *p = (StDemoArgv *) pAgv;
+
+  shift_buf_2_msg (p->pReceiveMsg, buf, len);
+
+  p->ulMsgTotal++;
+
+  const char *pUniqueId = rmb_msg_get_uniqueId_ptr (p->pReceiveMsg);
+  if (pUniqueId == NULL)
+  {
+    printf ("%s:%d-%s ,get uniqueId error!\n", __FILE__, __LINE__, __func__);
+  }
+  printf ("receive Msg:uniqueId = %s\n", pUniqueId);
+
+  const char *pContent = NULL;
+  unsigned int uiContentLen = 0;
+
+  if ((pContent =
+       rmb_msg_get_content_ptr (p->pReceiveMsg, &uiContentLen)) != NULL)
+  {
+
+    LOG_PRINT (RMB_LOG_INFO, "recevie reply msg:len=%d,%s\n", uiContentLen,
+               pContent);
+  }
+
+  return;
+}
+
+//key is topic, like: dcn-s-serviceid--scense_id  AB0-s-11000000-26-0
+int get_dcn_service_scense (const char *key, char *dcn, char *event,
+                            char *consumerSysId, char *service_id,
+                            char *scense_id)
+{
+  if (key == NULL)
+  {
+    printf ("key is null");
+    return -1;
+  }
+
+  char *p = (char *) key;
+  char *p1 = NULL;
+
+  //判断分隔符数量是否正确
+  int i = 0;
+
+  do
+  {
+    p1 = strchr (p, '-');
+    if (p1 == NULL)
+    {
+      break;
+    }
+    i++;
+    p = p1 + 1;
+  }
+  while (p != NULL);
+
+  if (i != 4)
+  {
+    LOG_PRINT (RMB_LOG_ERROR, "key:%s is error", key);
+    exit (1);
+  }
+
+  char temp[100];
+  memset (temp, 0x00, sizeof (temp));
+  strncpy (temp, key, sizeof (temp) - 1);
+
+  p = strrchr (temp, '-');
+  *p = '\0';
+  //copy scense id
+  p = strrchr (temp, '-');
+  if (p == NULL || scense_id == NULL)
+  {
+    return -1;
+  }
+  strcpy (scense_id, p + 1);
+  *p = '\0';
+
+  //copy service id
+  p = strrchr (temp, '-');
+  if (p == NULL || service_id == NULL)
+  {
+    return -1;
+  }
+  strcpy (service_id, p + 1);
+  *p = '\0';
+
+  //copy consumerSysId
+  /*
+     p = strrchr(temp, '/');
+     if (p == NULL) {
+     return -1;
+     }
+     if (consumerSysId != NULL) {
+     strcpy(consumerSysId, p+1);
+     }
+     *p = '\0';
+   */
+  //copy event
+  p = strrchr (temp, '-');
+  if (p == NULL)
+  {
+    return -1;
+  }
+  if (event != NULL)
+  {
+    strcpy (event, p + 1);
+  }
+  *p = '\0';
+
+  if (dcn == NULL)
+  {
+    return -1;
+  }
+  strcpy (dcn, temp);
+
+  return 0;
+}
+
+int pub_binary_message (StRmbPub * pRmbPub, const char *cDcn,
+                        const char *cServiceId, const char *cSenaId)
+{
+  int iRet = 0;
+  StRmbMsg *pSendMsg = rmb_msg_malloc ();
+  rmb_msg_clear (pSendMsg);
+  rmb_msg_set_bizSeqNo (pSendMsg, "12345678901234567890123456789012");
+  rmb_msg_set_consumerSeqNo (pSendMsg, "22222678901234567890123456799999");
+  rmb_msg_set_orgSysId (pSendMsg, "9999");
+
+  int iEventOrService = *(cServiceId + 3) - '0';
+  int flag = iEventOrService;
+  printf ("flag: %d\n", flag);
+  rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, cDcn, iEventOrService,
+                    cServiceId, cSenaId);
+
+  char *p = "This is test binary message";
+  char content[1024];
+  unsigned int uiConLen = 0;
+  memset (content, 0x00, sizeof (content));
+  content[0] = 0x02;
+  content[1] = 0;
+  memcpy (&content[2], p, strlen (p));
+  uiConLen = 2 + strlen (p);
+
+  rmb_msg_set_content (pSendMsg, content, uiConLen);
+  char appHeader[100] = "{}";
+  rmb_msg_set_app_header (pSendMsg, appHeader, strlen (appHeader));
+
+  if (flag == 1)
+  {
+    iRet = rmb_pub_send_msg (pRmbPub, pSendMsg);
+    if (iRet == 0)
+    {
+      LOG_PRINT (RMB_LOG_INFO, "send msg=%s OK", pSendMsg->cContent);
+    }
+    else
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_msg error!iRet = %d", iRet);
+    }
+    rmb_msg_free (pSendMsg);
+    return 0;
+  }
+  else
+  {
+    StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+    iRet = rmb_pub_send_and_receive (pRmbPub, pSendMsg, pReceiveMsg, 5000);
+    if (iRet == 0)
+    {
+      char receiveBuf[1024];
+      unsigned int receiveLen = sizeof (receiveBuf);
+      rmb_msg_get_content (pReceiveMsg, receiveBuf, &receiveLen);
+      LOG_PRINT (RMB_LOG_INFO, "**********receive reply pkg len=%d",
+                 receiveLen);
+      int i = 0;
+      for (i = 0; i < receiveLen; i++)
+      {
+        printf ("%c", receiveBuf[i]);
+      }
+      printf ("\n***********");
+//                      printf("%s:%d-%s ,receive reply pkg=%d%s\n", __FILE__, 
__LINE__, __func__, receiveBuf);
+    }
+    else
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_send_and_receive error!iRet = %d",
+                 iRet);
+    }
+    rmb_msg_free (pSendMsg);
+    rmb_msg_free (pReceiveMsg);
+    return 0;
+  }
+  return 0;
+}
+
+void destroy ()
+{
+  if (pRmbSub != NULL)
+  {
+    rmb_sub_close_v2 (pRmbSub);
+    //exit(0);
+  }
+  if (pRmbPub != NULL)
+  {
+    rmb_pub_close_v2 (pRmbPub);
+    exit (0);
+  }
+}
+
+void sigusr2_handle (int iSigVal)
+{
+
+  LOG_PRINT (RMB_LOG_INFO, "sub accept signal USR2");
+  //先stop_receive
+  if (rmb_sub_stop_receive (pRmbSub) != 0)
+  {
+    //没有成功停止接收,有异常
+    destroy ();
+  }
+  else
+  {
+    while (rmb_sub_check_req_mq_is_null (pRmbSub) != 0) //本地的共享内存还有未处理完的消息
+    {
+      rmb_sub_do_receive (pRmbSub, 1);
+    }
+    //处理完消息之后,退出
+    destroy ();
+  }
+}
+
+int main (int argc, char *argv[])
+{
+  if (argc <= 1)
+  {
+    printfUsage (argv[0]);
+  }
+
+  int ret = rmb_load_config ("./rmb.conf");
+  if (ret != 0)
+  {
+    printf ("load rmb config file failed \n");
+    return -1;
+  }
+  pRmbSub = (StRmbSub *) calloc (1, sizeof (StRmbSub));
+  if (pRmbSub == NULL)
+  {
+    printf ("%s:%d -- calloc for pRmbSub failed:%s\n", __func__, __LINE__,
+            strerror (errno));
+    return 0;
+  }
+  pRmbPub = (StRmbPub *) calloc (1, sizeof (StRmbPub));
+  if (pRmbPub == NULL)
+  {
+    printf ("%s:%d -- calloc for pRmbPub failed:%s\n", __func__, __LINE__,
+            strerror (errno));
+    return 0;
+  }
+
+  char event[10];
+  char dcn[10];
+  char service_id[10];
+  char scenario_id[10];
+
+  enum EVENT_OR_SERVICE_CALL event_or_serv = RMB_SERVICE_CALL;
+
+  int iRet = 0;
+
+  if (strcmp (argv[1], "sub") == 0)
+  {
+    //./process_name sub sleep_time log_control process_nums queue1 queue2 ...
+    // example: ./rmb_demo sub 1 1 1 FT0-s-98200001-01-1
+    if (argc < 6)
+      printfUsage (argv[0]);
+    signal (SIGUSR2, sigusr2_handle);
+    unsigned int uiShmKeyForReq = pRmbStConfig->uiShmKeyForReq;
+    unsigned int uiShmSizeForReq = pRmbStConfig->uiShmSizeForReq;
+    char strFifoPathForReq[128];
+    memset (strFifoPathForReq, 0x00, sizeof (strFifoPathForReq));
+    strcpy (strFifoPathForReq, pRmbStConfig->strFifoPathForReq);
+
+    LOG_PRINT (RMB_LOG_INFO, "sub pid:%d   process nums:%d", getpid (),
+               atoi (argv[4]));
+    //多进程
+    int i;
+    for (i = 1; i < atoi (argv[4]); i++)
+    {
+      if (fork () == 0)
+      {
+        uiShmKeyForReq += 4096 * i;
+        memset (strFifoPathForReq, 0x00, sizeof (strFifoPathForReq));
+        snprintf (strFifoPathForReq, sizeof (strFifoPathForReq),
+                  "./tmp_req_%d.fifo", getpid ());
+        break;
+      }
+    }
+
+    if ((iRet = rmb_sub_init (pRmbSub)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_sub_init error=%d", iRet);
+      return -1;
+    }
+
+    StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+    StRmbMsg *pReplyMsg = rmb_msg_malloc ();
+    StDemoArgv *pTmp = (StDemoArgv *) calloc (1, sizeof (StDemoArgv));
+    if (pReceiveMsg == NULL || pReplyMsg == NULL || pTmp == NULL)
+    {
+      LOG_PRINT (RMB_LOG_ERROR,
+                 "pReceiveMsg is NULL or pReplyMsg is NULL or pTmp is NULL");
+      return -1;
+    }
+    pTmp->pRmbPub = pRmbPub;
+    pTmp->pRmbSub = pRmbSub;
+    pTmp->pReplyMsg = pReplyMsg;
+    pTmp->uiLog = (unsigned int) atoi (argv[3]);
+    ///////////
+    pTmp->ulMsgTotal = 0;
+    pTmp->uiSleepTime = (unsigned int) atoi (argv[2]);
+    pTmp->uiFlag = 0;
+
+    //注册回调
+    if ((iRet =
+         rmb_sub_add_reveive_req_by_mq_v2 (pRmbSub, strFifoPathForReq,
+                                           uiShmKeyForReq, uiShmSizeForReq,
+                                           func_with_req, pTmp)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_sub_add_reveive_req_by_mq error=%d",
+                 iRet);
+      return -1;
+    }
+
+    st_rmb_queue_info *pQueueInfo;
+    pQueueInfo =
+      (st_rmb_queue_info *) malloc (sizeof (st_rmb_queue_info) * 100);
+    if (pQueueInfo == NULL)
+    {
+      printf ("malloc for pQueueInfo failed\n");
+      return -1;
+    }
+    memset (pQueueInfo, 0x00, (sizeof (st_rmb_queue_info) * 100));
+    st_rmb_queue_info *p = pQueueInfo;
+    unsigned int uiQueneNums = 0;
+
+    for (i = 0; (i < argc - 5) && (i < 100); i++)
+    {
+      memset (event, 0x00, sizeof (event));
+      memset (dcn, 0x00, sizeof (dcn));
+      memset (service_id, 0x00, sizeof (service_id));
+      memset (scenario_id, 0x00, sizeof (scenario_id));
+      if ((iRet =
+           get_dcn_service_scense (argv[i + 5], dcn, event, NULL, service_id,
+                                   scenario_id)) < 0)
+      {
+        printf ("parse queue:%s failed,iRet=%d\n", argv[i + 5], iRet);
+        continue;
+      }
+      if (strcmp (event, "e") == 0)
+      {
+        event_or_serv = RMB_EVENT_CALL;
+      }
+      else
+      {
+        event_or_serv = RMB_SERVICE_CALL;
+      }
+
+      LOG_PRINT (RMB_LOG_INFO,
+                 "sub:dcn:%s event:%d service_id:%s scenario_id:%s", dcn,
+                 (int) event_or_serv, service_id, scenario_id);
+
+      strncpy (p->cDcn, dcn, strlen (dcn));
+      strncpy (p->cServiceId, service_id, strlen (service_id));
+      strncpy (p->cScenarioId, scenario_id, strlen (scenario_id));
+      p += 1;
+      uiQueneNums += 1;
+    }
+
+    if ((iRet = rmb_sub_add_listen (pRmbSub, pQueueInfo, uiQueneNums)) != 0)
+    {
+      printf ("rmb_sub_add_listen failed, iRet = %d\n", iRet);
+      return -2;
+    }
+
+    unsigned long last_msg_total = 0;
+    unsigned long print_msg_ctrl = 0;
+    for (;;)
+    {
+      //to do recevive
+      rmb_sub_do_receive (pRmbSub, 1);
+
+      print_msg_ctrl++;
+
+      if (print_msg_ctrl > 5000)
+      {
+        if (last_msg_total != pTmp->ulMsgTotal)
+        {
+          printf ("%s:%d -- sub: pid:%d receive message total:%lu\n",
+                  __FILE__, __LINE__, getpid (), pTmp->ulMsgTotal);
+          print_msg_ctrl = 0;
+          last_msg_total = pTmp->ulMsgTotal;
+        }
+      }
+    }
+  }
+
+  else if (strcmp (argv[1], "sub_broadcast") == 0)
+  {
+    //./process_name sub_broadcast process_nums log_control topic(queue)1 
topic(queue)2 ...
+    unsigned int uiShmKeyForBroadcastReq = pRmbStConfig->uiShmKeyForBroadcast;
+    unsigned int uiShmSizeRorBroadcastReq =
+      pRmbStConfig->uiShmSizeForBroadcast;
+    char strFifoPathForBroadcastReq[128];
+
+    snprintf (strFifoPathForBroadcastReq, sizeof (strFifoPathForBroadcastReq),
+              "%s", pRmbStConfig->strFifoPathForReq);
+
+    printf ("sub_broadcast process nums:%d", atoi (argv[2]));
+    //mutil-process
+    int i;
+    for (i = 1; i < atoi (argv[2]); i++)
+    {
+      if (fork () == 0)
+      {
+        uiShmKeyForBroadcastReq += 4096 * i;
+        memset (uiShmSizeRorBroadcastReq, 0x00,
+                sizeof (uiShmSizeRorBroadcastReq));
+        snprintf (strFifoPathForBroadcastReq,
+                  sizeof (strFifoPathForBroadcastReq), "./tmp_req_%d.fifo",
+                  getpid ());
+        break;
+      }
+    }
+
+    if ((iRet = rmb_sub_init (pRmbSub)) != 0)
+    {
+      printf ("rmb_sub_init failed\n");
+      return -1;
+    }
+
+    //StRmbMsg *pReceiveMsg = rmb_msg_malloc();
+    StDemoArgv *pTmp = (StDemoArgv *) calloc (1, sizeof (StDemoArgv));
+    pTmp->pRmbPub = pRmbPub;
+    pTmp->pRmbSub = pRmbSub;
+    pTmp->pReplyMsg = NULL;
+    pTmp->ulMsgTotal = 0;
+    pTmp->uiLog = (unsigned int) atoi (argv[3]);
+
+    //rmb_callback_func func, void *func_msg, void* func_argv
+    if ((iRet =
+         rmb_sub_add_reveive_broadcast_by_mq_v2 (pRmbSub,
+                                                 strFifoPathForBroadcastReq,
+                                                 uiShmKeyForBroadcastReq,
+                                                 uiShmSizeRorBroadcastReq,
+                                                 func_with_broadcast,
+                                                 pTmp)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR,
+                 "rmb_sub_add_reveive_broadcast_by_mq_v2 failed!iRet=%d",
+                 iRet);
+      return -1;
+    }
+
+    st_rmb_queue_info *pQueueInfo;
+    pQueueInfo =
+      (st_rmb_queue_info *) malloc (sizeof (st_rmb_queue_info) * 100);
+    if (pQueueInfo == NULL)
+    {
+      printf ("malloc for pQueueInfo failed\n");
+      return -1;
+    }
+    memset (pQueueInfo, 0x00, (sizeof (st_rmb_queue_info) * 100));
+    st_rmb_queue_info *p = pQueueInfo;
+    unsigned int uiQueneNums = 0;
+
+    for (i = 0; (i < argc - 4) && (i < 100); i++)
+    {
+      memset (dcn, 0x00, sizeof (dcn));
+      memset (service_id, 0x00, sizeof (service_id));
+      memset (scenario_id, 0x00, sizeof (scenario_id));
+
+      if (get_dcn_service_scense
+          (argv[i + 4], dcn, NULL, NULL, service_id, scenario_id) < 0)
+      {
+        printf ("parse:%s failed\n", argv[i + 4]);
+        continue;
+      }
+      LOG_PRINT (RMB_LOG_INFO, "board_sub: dcn:%s service_id:%s scense_id:%s",
+                 dcn, service_id, scenario_id);
+
+      event_or_serv = RMB_EVENT_CALL;
+
+      strncpy (p->cDcn, dcn, strlen (dcn));
+      strncpy (p->cServiceId, service_id, strlen (service_id));
+      strncpy (p->cScenarioId, scenario_id, strlen (scenario_id));
+      p += 1;
+      uiQueneNums += 1;
+    }
+
+    if ((iRet = rmb_sub_add_listen (pRmbSub, pQueueInfo, uiQueneNums)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_sub_add_listen failed, iRet = %d", iRet);
+      return -2;
+    }
+
+    unsigned long print_msg_ctrl = 0;
+    for (;;)
+    {
+      rmb_sub_do_receive (pRmbSub, 1);
+      print_msg_ctrl++;
+
+      if (print_msg_ctrl > 3000)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "sub: pid:%d receive message total:%lu",
+                   getpid (), pTmp->ulMsgTotal);
+        print_msg_ctrl = 0;
+      }
+    }
+
+  }
+  else if (strcmp (argv[1], "pub") == 0)
+  {
+    //./process_name pub pthread_nums log_control msg_nums msg_len ttl_time 
sleep_time queue1 queue2 ...
+    /*
+     * pub:指"rr_sync_pub" or "unicast_pub"
+     * pthread_nums: 发送消息的线程数量
+     * log_control: 是否打印收发消息的长度,这个是针对包大小测试加的
+     * msg_nums: 每个线程发送消息的数量或单线程往每个queue上发送的消息数量
+     * msg_len: 发送的消息体的总的长度,这个主要是针对测试包大小而加入的
+     * queue1、queue2...: 发送消息使用的queue
+     * 
+     * example: ./rmb_demo pub 1 1 1 10 3000 1 FT0-s-98200001-01-1
+     */
+    if (argc < 9)
+      printfUsage (argv[0]);
+
+    if ((iRet = rmb_pub_init (pRmbPub)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_init error=%d", iRet);
+      return -1;
+    }
+
+    int sleep_time = 0;
+    if (atoi (argv[7]) > 0)
+      sleep_time = atoi (argv[7]);
+    char *msg = (char *) calloc (sizeof (char), atoi (argv[5]) + 1);
+    int msg_len = strlen (send_msg);
+    int copy_nums = atoi (argv[5]) / msg_len;
+
+    char *p = msg;
+    int i;
+    for (i = 0; i < copy_nums; i++)
+    {
+      memcpy (p, send_msg, msg_len);
+      p += msg_len;
+    }
+
+    if (atoi (argv[5]) % msg_len != 0)
+    {
+      memcpy (p, send_msg, atoi (argv[5]) % msg_len);
+    }
+
+    if (strcmp (argv[3], "1") == 0)
+    {
+      LOG_PRINT (RMB_LOG_INFO, "pub send msg len:%d, content len:%lu",
+                 atoi (argv[5]), strlen (msg));
+    }
+
+    tMessage msg_info;
+    msg_info.type = 0;
+    msg_info.pid = getpid ();
+    msg_info.process_num = 1;
+    msg_info.send_msg_num = 0;
+    msg_info.recv_msg_num = 0;
+    enum EVENT_OR_SERVICE_CALL event_or_serv = RMB_SERVICE_CALL;
+    //queue numbers
+    int iQueueNumbers = argc - 8;
+    if (atoi (argv[2]) == 1)
+    {                           //single thread
+      for (i = 0; i < iQueueNumbers; i++)
+      {
+        char event[100];
+        char dcn[100];
+        char service_id[100];
+        char scenario_id[100];
+
+        memset (event, 0x00, sizeof (event));
+        memset (dcn, 0x00, sizeof (dcn));
+        memset (service_id, 0x00, sizeof (service_id));
+        memset (scenario_id, 0x00, sizeof (scenario_id));
+
+        if (get_dcn_service_scense
+            (argv[i + 8], dcn, event, NULL, service_id, scenario_id) < 0)
+        {
+          continue;
+        }
+
+        if (strcmp (event, "e") == 0)
+        {
+          event_or_serv = RMB_EVENT_CALL;
+        }
+
+        LOG_PRINT (RMB_LOG_INFO,
+                   "dcn:%s event:%d service_id:%s scenario_id:%s", dcn,
+                   (int) event_or_serv, service_id, scenario_id);
+
+        unsigned long pub_send_msg = 0;
+        int l;
+        for (l = 0; l < atoi (argv[4]); l++)
+        {
+          iRet =
+            pubOwnMessage (pRmbPub, event_or_serv, atol (argv[6]), dcn,
+                           service_id, scenario_id,
+                           (unsigned int) atoi (argv[3]), msg);
+          if (iRet == 0)
+          {
+            pub_send_msg++;
+          }
+          if (sleep_time > 0)
+            sleep (sleep_time);
+        }
+
+        msg_info.send_msg_num += (unsigned int) atoi (argv[4]);
+        msg_info.recv_msg_num += pub_send_msg;
+
+        if (event_or_serv == RMB_EVENT_CALL)
+        {
+          LOG_PRINT (RMB_LOG_INFO,
+                     "pid:%ld queue:%s pub send:%d and success:%lu\n",
+                     (long) getpid (), argv[i + 8], atoi (argv[4]),
+                     pub_send_msg);
+        }
+        else
+        {
+          LOG_PRINT (RMB_LOG_INFO,
+                     "pid:%ld queue:%s pub send:%d and recv:%lu\n",
+                     (long) getpid (), argv[i + 8], atoi (argv[4]),
+                     pub_send_msg);
+        }
+      }
+      rmb_pub_close (pRmbPub);
+    }
+    else if (atoi (argv[2]) > 1)
+    {                           //mutil thread
+      //./process_name pub pthread_nums log_control msg_nums msg_len msg 
ttl_time queue1 queue2 ...
+      pthread_t thVec[200];
+      tThreadArgs tArgs[200];
+      unsigned int thread_nums =
+        (atoi (argv[2]) < 200 ? atoi (argv[2]) : 200);
+      for (i = 0; i < iQueueNumbers && i < 200; ++i)
+      {
+        tArgs[i].pRmbPub = pRmbPub;
+        tArgs[i].times = atoi (argv[4]);
+        tArgs[i].iContent = i;
+        tArgs[i].ulMsgSucc = 0;
+        tArgs[i].ttl = atol (argv[6]);
+        tArgs[i].pMsg = msg;
+
+        char event[100];
+
+        memset (event, 0x00, sizeof (event));
+        memset (tArgs[i].cDcn, 0x00, sizeof (tArgs[i].cDcn));
+        memset (tArgs[i].cServiceId, 0x00, sizeof (tArgs[i].cServiceId));
+        memset (tArgs[i].cSenaId, 0x00, sizeof (tArgs[i].cSenaId));
+
+        get_dcn_service_scense (argv[i + 8], tArgs[i].cDcn, event, NULL,
+                                tArgs[i].cServiceId, tArgs[i].cSenaId);
+
+        if (strcmp (event, "e") == 0)
+        {
+          event_or_serv = RMB_EVENT_CALL;
+        }
+        tArgs[i].event_or_service = event_or_serv;
+      }
+      if (thread_nums > iQueueNumbers)
+      {
+        for (i = iQueueNumbers; i < thread_nums; i++)
+        {
+          tArgs[i].pRmbPub = pRmbPub;
+          tArgs[i].times = atoi (argv[4]);
+          tArgs[i].event_or_service =
+            tArgs[i % iQueueNumbers].event_or_service;
+          tArgs[i].iContent = i;
+          tArgs[i].pMsg = msg;
+
+          memset (tArgs[i].cDcn, 0x00, sizeof (tArgs[i].cDcn));
+          strcpy (tArgs[i].cDcn, tArgs[i % iQueueNumbers].cDcn);
+
+          memset (tArgs[i].cServiceId, 0x00, sizeof (tArgs[i].cServiceId));
+          strcpy (tArgs[i].cServiceId, tArgs[i % iQueueNumbers].cServiceId);
+
+          memset (tArgs[i].cSenaId, 0x00, sizeof (tArgs[i].cSenaId));
+          strcpy (tArgs[i].cSenaId, tArgs[i % iQueueNumbers].cSenaId);
+        }
+      }
+      for (i = 0; i < thread_nums; ++i)
+      {
+        if ((iRet =
+             pthread_create (&thVec[i], NULL, (void *) &mutil_thread_for_pub,
+                             (void *) &tArgs[i])) != 0)
+        {
+          printf ("%d thread error!iRet=%d\n", i, iRet);
+        }
+      }
+
+      unsigned long total_msg = 0;
+      for (i = 0; i < thread_nums; ++i)
+      {
+        if ((iRet = pthread_join (thVec[i], NULL)) != 0)
+        {
+          printf ("%d thread error!iRet=%d\n", i, iRet);
+        }
+        else
+        {
+          total_msg += tArgs[i].ulMsgSucc;
+        }
+      }
+      if (event_or_serv == RMB_SERVICE_CALL)
+      {
+        LOG_PRINT (RMB_LOG_INFO, "*****pub send msg:%d recv:%lu",
+                   thread_nums * atoi (argv[4]), total_msg);
+      }
+      else
+      {
+        LOG_PRINT (RMB_LOG_INFO, "*****pub send msg:%d success:%lu",
+                   thread_nums * atoi (argv[4]), total_msg);
+      }
+
+      msg_info.send_msg_num = thread_nums * atoi (argv[4]);
+      msg_info.recv_msg_num = total_msg;
+    }
+
+    free (msg);
+  }
+  else if (strcmp (argv[1], "rr_async_pub") == 0)
+  {
+    //./test_rmb_capi rr_async_pub process_nums log_control msg_nums msg_len 
sleep_time timeout queue
+    if (argc < 9)
+      printfUsage (argv[0]);
+
+    unsigned int uiShmKeyForRsq = pRmbStConfig->uiShmKeyForRRrsp;
+    unsigned int uiShmSizeForRsq = pRmbStConfig->uiShmSizeForRRrsp;
+    char strFifoPathForRsq[128];
+    memset (strFifoPathForRsq, 0x00, sizeof (strFifoPathForRsq));
+    strcpy (strFifoPathForRsq, pRmbStConfig->strFifoPathForRRrsp);
+
+    LOG_PRINT (RMB_LOG_INFO,
+               "***RR async, process total:%d, per process send message 
numbers:%d",
+               atoi (argv[2]), atoi (argv[4]));
+
+    int sleep_time = 0;
+    if (atoi (argv[6]) > 0)
+      sleep_time = atoi (argv[6]);
+    int i;
+    for (i = 1; i < atoi (argv[2]); i++)
+    {
+      if (fork () == 0)
+      {
+        uiShmKeyForRsq += 4096 * i;
+        memset (strFifoPathForRsq, 0x00, sizeof (strFifoPathForRsq));
+        snprintf (strFifoPathForRsq, sizeof (strFifoPathForRsq),
+                  "./tmp_rsq_%d.fifo", getpid ());
+        break;
+      }
+    }
+
+    if ((iRet = rmb_pub_init (pRmbPub)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_init error=%d", iRet);
+      return -1;
+    }
+
+    if ((iRet = rmb_sub_init (pRmbSub)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_sub_init error=%d", iRet);
+      return -1;
+    }
+    StRmbMsg *pReceiveMsg = rmb_msg_malloc ();
+    StRmbMsg *pReplyMsg = rmb_msg_malloc ();
+    StDemoArgv *pTmp = (StDemoArgv *) calloc (1, sizeof (StDemoArgv));
+    pTmp->pRmbPub = pRmbPub;
+    pTmp->pRmbSub = pRmbSub;
+    pTmp->pReceiveMsg = pReceiveMsg;
+    pTmp->pReplyMsg = pReplyMsg;
+    pTmp->uiLog = (unsigned int) atoi (argv[3]);
+    pTmp->ulMsgTotal = 0;
+
+    if ((iRet =
+         rmb_sub_add_reveive_rsp_by_mq_v2 (pRmbSub, strFifoPathForRsq,
+                                           uiShmKeyForRsq, uiShmSizeForRsq,
+                                           func_with_rr_rsp, pTmp)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_sub_add_reveive_req_by_mq error=%d",
+                 iRet);
+      return -1;
+    }
+
+    //./test_rmb_capi rr_async_pub process_nums log_control msg_nums msg_len 
msg queue
+    char *msg = (char *) calloc (sizeof (char), atoi (argv[5]) + 1);
+    int msg_len = strlen (send_msg);
+    int copy_nums = atoi (argv[5]) / msg_len;
+
+    char *p = msg;
+    for (i = 0; i < copy_nums; i++)
+    {
+      memcpy (p, send_msg, msg_len);
+      p += msg_len;
+    }
+
+    if (atoi (argv[5]) % msg_len != 0)
+    {
+      memcpy (p, send_msg, atoi (argv[5]) % msg_len);
+    }
+
+    LOG_PRINT (RMB_LOG_INFO, "rr async pub send msg len:%d, content len:%lu",
+               atoi (argv[5]), strlen (msg));
+
+    StRmbMsg *pSendMsg = rmb_msg_malloc ();
+
+    static unsigned int uiSeq = 1;
+    struct timeval tv;
+    gettimeofday (&tv, NULL);
+    unsigned long ulNowTime = tv.tv_sec * 1000 + tv.tv_usec / 1000;
+    char cSeqNo[33];
+    snprintf (cSeqNo, sizeof (cSeqNo), "%013lu%019u", ulNowTime, uiSeq++);
+    rmb_msg_set_bizSeqNo (pSendMsg, cSeqNo);
+    rmb_msg_set_consumerSeqNo (pSendMsg, cSeqNo);
+    rmb_msg_set_orgSysId (pSendMsg, "9999");
+    rmb_msg_set_content (pSendMsg, msg, strlen (msg));
+    char appHeader[100] = "{}";
+    rmb_msg_set_app_header (pSendMsg, appHeader, strlen (appHeader));
+
+    //增加多个队列支持
+    unsigned int send_msg = 0;
+    for (i = 0; i < argc - 7; i++)
+    {
+      char dcn[100];
+      char service_id[100];
+      char scense_id[100];
+      memset (dcn, 0x00, sizeof (dcn));
+      memset (service_id, 0x00, sizeof (service_id));
+      memset (scense_id, 0x00, sizeof (scense_id));
+      if (get_dcn_service_scense
+          (argv[i + 8], dcn, NULL, NULL, service_id, scense_id) < 0)
+      {
+        printf ("RR async pub queue error:%s\n", argv[i + 6]);
+        continue;
+      }
+      printf ("dcn:%s service_id:%s scense_id:%s\n", dcn, service_id,
+              scense_id);
+
+      rmb_msg_set_dest (pSendMsg, RMB_DEST_TOPIC, dcn, RMB_SERVICE_CALL,
+                        service_id, scense_id);
+      int j;
+      for (j = 0; j < atoi (argv[4]); j++)
+      {
+        iRet = rmb_pub_send_rr_msg_async (pRmbPub, pSendMsg, atoi (argv[7]));
+        if (iRet != 0)
+        {
+          printf ("%s:%d-%s ,rmb_pub_send_and_receive error!iRet = %d\n",
+                  __FILE__, __LINE__, __func__, iRet);
+        }
+        else
+        {
+          send_msg++;
+          if (sleep_time > 0)
+            sleep (sleep_time);
+        }
+      }
+    }
+
+    LOG_PRINT (RMB_LOG_INFO, "pid:%d rmb_pub_async_message success total: %u",
+               (int) getpid (), send_msg);
+
+    tMessage msg_info;
+    msg_info.type = 0;
+    msg_info.pid = getpid ();
+    msg_info.process_num = atoi (argv[4]);
+    msg_info.send_msg_num = atoi (argv[4]) * (argc - 7);
+    msg_info.recv_msg_num = 0;
+
+    unsigned long last_msg_total = 0;
+    unsigned long print_msg_ctrl = 0;
+    for (;;)
+    {
+      //to do recevive
+      rmb_sub_do_receive (pRmbSub, 1);
+      print_msg_ctrl++;
+
+      if (print_msg_ctrl > 10000)
+      {
+        print_msg_ctrl = 0;
+        if (pTmp->ulMsgTotal != last_msg_total)
+        {
+          LOG_PRINT (RMB_LOG_INFO, "pid:%d get_reply_msg_total:%lu",
+                     (int) getpid (), pTmp->ulMsgTotal);
+          last_msg_total = pTmp->ulMsgTotal;
+        }
+      }
+    }
+  }
+  else if (strcmp (argv[1], "pub_board") == 0)
+  {
+    //./process_name pub_board message_nums message orgId queue
+    //send board message
+    if ((iRet = rmb_pub_init (pRmbPub)) != 0)
+    {
+      LOG_PRINT (RMB_LOG_ERROR, "rmb_pub_init error=%d", iRet);
+      return -1;
+    }
+
+    enum EVENT_OR_SERVICE_CALL event_or_serv = RMB_SERVICE_CALL;
+
+    char event[100];
+    char dcn[100];
+    char service_id[100];
+    char scenario_id[100];
+
+    memset (event, 0x00, sizeof (event));
+    memset (dcn, 0x00, sizeof (dcn));
+    memset (service_id, 0x00, sizeof (service_id));
+    memset (scenario_id, 0x00, sizeof (scenario_id));
+
+    iRet =
+      get_dcn_service_scense (argv[5], dcn, event, NULL, service_id,
+                              scenario_id);
+
+    if (iRet < 0)
+    {
+      printf ("get_dcn_service_scense return:%d\n", iRet);
+      return iRet;
+    }
+
+    if (strcmp (event, "e") == 0)
+    {
+      event_or_serv = RMB_EVENT_CALL;
+    }
+    LOG_PRINT (RMB_LOG_INFO, "dcn:%s event:%d service_id:%s scenario_id:%s",
+               dcn, (int) event_or_serv, service_id, scenario_id);
+    unsigned long pub_send_msg = 0;
+    int i;
+    for (i = 0; i < atoi (argv[2]); i++)
+    {
+      iRet =
+        pub_board_message (pRmbPub, event_or_serv, 0, dcn, service_id,
+                           scenario_id, argv[4], 1, argv[3]);
+      if (iRet == 0)
+      {
+        pub_send_msg++;
+      }
+    }
+  }
+
+  else
+  {
+    printf ("unknown argv[1]:%s\n", argv[1]);
+    printfUsage (argv[0]);
+  }
+
+  return 0;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to