This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 144930e  [ISSUE #494] [Part-A] Add register and unregister MQAdmin in 
MQClientFactory (#495)
144930e is described below

commit 144930eafd764fa85a588b7ba00d6a83643b5717
Author: takagi <[email protected]>
AuthorDate: Tue Jun 24 11:01:30 2025 +0800

    [ISSUE #494] [Part-A] Add register and unregister MQAdmin in 
MQClientFactory (#495)
---
 src/MQClientFactory.cpp       | 45 ++++++++++++++++++++++++
 src/MQClientFactory.h         | 13 +++++++
 src/client/DefaultMQAdmin.cpp | 80 +++++++++++++++++++++++++++++++++++++++++++
 src/client/DefaultMQAdmin.h   | 32 +++++++++++++++++
 src/common/UtilAll.h          |  1 +
 src/include/MQAdmin.h         | 28 +++++++++++++++
 6 files changed, 199 insertions(+)

diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 69d2543..c4673d8 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -393,6 +393,35 @@ void MQClientFactory::unregisterConsumer(MQConsumer* 
pConsumer) {
   eraseConsumerFromTable(groupName);
 }
 
+// Registers an admin client with this factory under its group name and sets up
+// the name server address.
+//
+// Returns false when pAdmin is null, when its group name is empty, or when
+// addAdminToTable() rejects the group name (already present); true otherwise.
+//
+// If the admin carries no name server address, the factory's name server
+// domain is replaced by the admin's domain (when that is non-empty) and the
+// address returned by fetchNameServerAddr() is stored on the admin. Otherwise
+// m_bFetchNSService is cleared and the admin's address is handed to
+// updateNameServerAddr().
+bool MQClientFactory::registerMQAdmin(MQAdmin* pAdmin) {
+  // Reject a null admin instead of dereferencing it below.
+  if (!pAdmin) {
+    return false;
+  }
+  string groupName = pAdmin->getGroupName();
+  string namesrvaddr = pAdmin->getNamesrvAddr();
+  if (groupName.empty()) {
+    return false;
+  }
+  if (!addAdminToTable(groupName, pAdmin)) {
+    return false;
+  }
+  LOG_DEBUG("registerAdmin success:%s", groupName.c_str());
+  //<!set nameserver;
+  if (namesrvaddr.empty()) {
+    string nameSrvDomain(pAdmin->getNamesrvDomain());
+    if (!nameSrvDomain.empty())
+      m_nameSrvDomain = nameSrvDomain;
+    pAdmin->setNamesrvAddr(m_pClientAPIImpl->fetchNameServerAddr(m_nameSrvDomain));
+  } else {
+    m_bFetchNSService = false;
+    m_pClientAPIImpl->updateNameServerAddr(namesrvaddr);
+    LOG_INFO("user specfied name server address: %s", namesrvaddr.c_str());
+  }
+  return true;
+}
+
+// Removes the entry stored under pAdmin's group name from the admin table.
+void MQClientFactory::unregisterMQAdmin(MQAdmin* pAdmin) {
+  eraseAdminFromTable(pAdmin->getGroupName());
+}
+
 MQProducer* MQClientFactory::selectProducer(const string& producerName) {
   boost::lock_guard<boost::mutex> lock(m_producerTableMutex);
   if (m_producerTable.find(producerName) != m_producerTable.end()) {
@@ -493,6 +522,22 @@ void MQClientFactory::eraseConsumerFromTable(const string& 
consumerName) {
     LOG_WARN("could not find consumer:%s from table", consumerName.c_str());
 }
 
+// Adds pMQAdmin to the admin table under adminName while holding
+// m_adminTableMutex. Returns false and leaves the table untouched when the
+// name is already present; returns true when the entry was added.
+bool MQClientFactory::addAdminToTable(const string& adminName, MQAdmin* pMQAdmin) {
+  boost::lock_guard<boost::recursive_mutex> guard(m_adminTableMutex);
+  // map::insert never overwrites an existing key; .second tells whether the
+  // insertion actually happened.
+  return m_adminTable.insert(MQAMAP::value_type(adminName, pMQAdmin)).second;
+}
+
+// Removes the entry for adminName from the admin table while holding
+// m_adminTableMutex; logs a warning when no such entry exists.
+void MQClientFactory::eraseAdminFromTable(const string& adminName) {
+  boost::lock_guard<boost::recursive_mutex> guard(m_adminTableMutex);
+  // Erase-by-key returns how many entries were removed; zero means not found.
+  const bool removed = m_adminTable.erase(adminName) > 0;
+  if (!removed) {
+    LOG_WARN("could not find admin:%s from table", adminName.c_str());
+  }
+}
+
 int MQClientFactory::getConsumerTableSize() {
   boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   return m_consumerTable.size();
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e071db2..6e79b28 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -23,6 +23,7 @@
 #include <boost/thread/recursive_mutex.hpp>
 #include <boost/thread/thread.hpp>
 #include "FindBrokerResult.h"
+#include "MQAdmin.h"
 #include "MQClientAPIImpl.h"
 #include "MQClientException.h"
 #include "MQConsumer.h"
@@ -57,6 +58,8 @@ class MQClientFactory {
   virtual void unregisterProducer(MQProducer* pProducer);
   virtual bool registerConsumer(MQConsumer* pConsumer);
   virtual void unregisterConsumer(MQConsumer* pConsumer);
+  virtual bool registerMQAdmin(MQAdmin* pAdmin);
+  virtual void unregisterMQAdmin(MQAdmin* pAdmin);
 
   void createTopic(const string& key,
                    const string& newTopic,
@@ -157,6 +160,9 @@ class MQClientFactory {
   int getProducerTableSize();
   void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
 
+  // admin related operation
+  void eraseAdminFromTable(const string& adminName);
+
   // topicPublishInfo related operation
   void addTopicInfoToTable(const string& topic, 
boost::shared_ptr<TopicPublishInfo> pTopicPublishInfo);
   void eraseTopicInfoFromTable(const string& topic);
@@ -173,6 +179,7 @@ class MQClientFactory {
 
   bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
   bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
+  bool addAdminToTable(const string& adminName, MQAdmin* pMQAdmin);
 
  private:
   string m_nameSrvDomain;  // per clientId
@@ -190,6 +197,12 @@ class MQClientFactory {
   boost::recursive_mutex m_consumerTableMutex;
   MQCMAP m_consumerTable;
 
+  //<! group --> MQAdmin;
+  typedef map<string, MQAdmin*> MQAMAP;
+  // Changed to a recursive mutex to avoid a deadlock issue:
+  boost::recursive_mutex m_adminTableMutex;
+  MQAMAP m_adminTable;
+
   //<! Topic---> TopicRouteData
   typedef map<string, TopicRouteData*> TRDMAP;
   boost::mutex m_topicRouteTableMutex;
diff --git a/src/client/DefaultMQAdmin.cpp b/src/client/DefaultMQAdmin.cpp
new file mode 100644
index 0000000..e58022c
--- /dev/null
+++ b/src/client/DefaultMQAdmin.cpp
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQAdmin.h"
+
+namespace rocketmq {
+// Sets the group name of this admin client; an empty groupName falls back to
+// DEFAULT_ADMIN_GROUP.
+DefaultMQAdmin::DefaultMQAdmin(const std::string& groupName) {
+  string effectiveGroup(groupName);
+  if (effectiveGroup.empty()) {
+    effectiveGroup = DEFAULT_ADMIN_GROUP;
+  }
+  setGroupName(effectiveGroup);
+}
+
+// Empty body: this class performs no cleanup of its own.
+DefaultMQAdmin::~DefaultMQAdmin() {}
+
+// Starts the admin client.
+//
+// On non-Windows builds SIGPIPE is set to be ignored, and the client version
+// is logged, on every call. The start-up sequence itself only runs when the
+// service state is CREATE_JUST; in every other state the call returns without
+// further action. If the factory refuses the registration, the state is reset
+// to CREATE_JUST and an MQClientException is raised through THROW_MQEXCEPTION.
+void DefaultMQAdmin::start() {
+#ifndef WIN32
+  // Ignore SIGPIPE.
+  struct sigaction ignorePipe;
+  memset(&ignorePipe, 0, sizeof(struct sigaction));
+  ignorePipe.sa_handler = SIG_IGN;
+  ignorePipe.sa_flags = 0;
+  sigaction(SIGPIPE, &ignorePipe, 0);
+#endif
+  LOG_WARN("###Current Admin@%s", getClientVersionString().c_str());
+  if (m_serviceState != CREATE_JUST) {
+    // RUNNING, START_FAILED, SHUTDOWN_ALREADY or anything else: nothing to do.
+    return;
+  }
+
+  // The state stays START_FAILED until every step below has completed.
+  m_serviceState = START_FAILED;
+  DefaultMQClient::start();
+  LOG_INFO("DefaultMQAdmin:%s start", m_GroupName.c_str());
+  if (!getFactory()->registerMQAdmin(this)) {
+    // Registration refused: go back to CREATE_JUST before raising the error.
+    m_serviceState = CREATE_JUST;
+    THROW_MQEXCEPTION(
+        MQClientException,
+        "The admin group[" + getGroupName() + "] has been created before, specify another name please.", -1);
+  }
+  getFactory()->start();
+  m_serviceState = RUNNING;
+}
+
+// Shuts the admin client down. Only acts when the service state is RUNNING:
+// unregisters this admin from the factory, shuts the factory down and moves
+// the state to SHUTDOWN_ALREADY. In any other state the call does nothing.
+void DefaultMQAdmin::shutdown() {
+  if (m_serviceState != RUNNING) {
+    return;
+  }
+  LOG_INFO("DefaultMQAdmin:%s shutdown", m_GroupName.c_str());
+  getFactory()->unregisterMQAdmin(this);
+  getFactory()->shutdown();
+  m_serviceState = SHUTDOWN_ALREADY;
+}
+}  // namespace rocketmq
\ No newline at end of file
diff --git a/src/client/DefaultMQAdmin.h b/src/client/DefaultMQAdmin.h
new file mode 100644
index 0000000..baa7e0f
--- /dev/null
+++ b/src/client/DefaultMQAdmin.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DEFAULTMQADMIN_H
+#define DEFAULTMQADMIN_H
+#include "MQAdmin.h"
+#include "MQClientFactory.h"
+
+namespace rocketmq {
+// Admin client built on MQAdmin.
+//
+// The constructor takes the admin group name. start() registers the instance
+// with its client factory and shutdown() unregisters it again; both are
+// implemented in DefaultMQAdmin.cpp.
+class DefaultMQAdmin : public MQAdmin {
+ public:
+  DefaultMQAdmin(const std::string& groupname);
+  ~DefaultMQAdmin();
+  void start();
+  void shutdown();
+};
+}  // namespace rocketmq
+#endif  // DEFAULTMQADMIN_H
\ No newline at end of file
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 58e62ab..b2bc1a3 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -54,6 +54,7 @@ const string DEFAULT_TOPIC = "TBW102";
 const string BENCHMARK_TOPIC = "BenchmarkTest";
 const string DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
 const string DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
+const string DEFAULT_ADMIN_GROUP = "DEFAULT_ADMIN";
 const string TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
 const string CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
 const string SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
diff --git a/src/include/MQAdmin.h b/src/include/MQAdmin.h
new file mode 100644
index 0000000..545dacb
--- /dev/null
+++ b/src/include/MQAdmin.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Include guard named after this header (MQAdmin.h). It avoids a double
+// underscore, which C++ reserves for the implementation.
+#ifndef MQADMIN_H
+#define MQADMIN_H
+#include "DefaultMQClient.h"
+
+namespace rocketmq {
+// Base type for admin clients, as accepted by
+// MQClientFactory::registerMQAdmin() and unregisterMQAdmin(). It adds nothing
+// to DefaultMQClient except a virtual destructor.
+class MQAdmin : public DefaultMQClient {
+ public:
+  virtual ~MQAdmin() {}
+};
+}  // namespace rocketmq
+#endif  // MQADMIN_H
\ No newline at end of file

Reply via email to