This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 43bcb9d859 [INLONG-9827][Manager] Fix the problem of failed to check if the consumption group exists (#9828)
43bcb9d859 is described below

commit 43bcb9d8599dc89c265d4f49b3d816daa321cc8a
Author: fuweng11 <[email protected]>
AuthorDate: Sun Mar 17 20:46:00 2024 +0800

    [INLONG-9827][Manager] Fix the problem of failed to check if the consumption group exists (#9828)
---
 .../resource/queue/tubemq/TubeMQOperator.java      | 55 +++++++++++-----------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index d4f852f893..e13b4cbe2c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -65,7 +65,6 @@ public class TubeMQOperator {
      * TubeMQ const for HTTP URL format
      */
     private static final String TOPIC_NAME = "&topicName=";
-    private static final String CONSUME_GROUP = "&consumeGroup=";
     private static final String GROUP_NAME = "&groupName=";
     private static final String BROKER_ID = "&brokerId=";
     private static final String CREATE_USER = "&createUser=";
@@ -89,18 +88,18 @@ public class TubeMQOperator {
      */
     public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String 
topicName, String operator) {
         String masterUrl = tubeCluster.getMasterWebUrl();
-        LOGGER.info("begin to create tubemq topic {} in master {}", topicName, 
masterUrl);
+        LOGGER.info("begin to create TubeMQ topic {} in master {}", topicName, 
masterUrl);
         if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
-            throw new BusinessException("tubemq master url or tubemq topic 
cannot be null");
+            throw new BusinessException("TubeMQ master url or TubeMQ topic 
cannot be null");
         }
 
         if (this.isTopicExist(masterUrl, topicName)) {
-            LOGGER.warn("tubemq topic {} already exists in {}, skip to 
create", topicName, masterUrl);
+            LOGGER.warn("TubeMQ topic {} already exists in {}, skip to 
create", topicName, masterUrl);
             return;
         }
 
         this.createTopicOpt(masterUrl, topicName, tubeCluster.getToken(), 
operator);
-        LOGGER.info("success to create tubemq topic {} in {}", topicName, 
masterUrl);
+        LOGGER.info("success to create TubeMQ topic {} in {}", topicName, 
masterUrl);
     }
 
     /**
@@ -110,39 +109,39 @@ public class TubeMQOperator {
         String masterUrl = tubeCluster.getMasterWebUrl();
         LOGGER.info("begin to create consumer group {} for topic {} in master 
{}", consumerGroup, topic, masterUrl);
         if (StringUtils.isEmpty(masterUrl) || 
StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
-            throw new BusinessException("tubemq master url, consumer group, or 
tubemq topic cannot be null");
+            throw new BusinessException("TubeMQ master url, consumer group, or 
TubeMQ topic cannot be null");
         }
 
         if (!this.isTopicExist(masterUrl, topic)) {
-            LOGGER.warn("cannot create tubemq consumer group {}, as the topic 
{} not exists in master {}",
+            LOGGER.warn("cannot create TubeMQ consumer group {}, as the topic 
{} not exists in master {}",
                     consumerGroup, topic, masterUrl);
             return;
         }
 
         if (this.isConsumerGroupExist(masterUrl, topic, consumerGroup)) {
-            LOGGER.warn("tubemq consumer group {} already exists for topic {} 
in master {}, skip to create",
+            LOGGER.warn("TubeMQ consumer group {} already exists for topic {} 
in master {}, skip to create",
                     consumerGroup, topic, masterUrl);
             return;
         }
 
         this.createConsumerGroupOpt(masterUrl, topic, consumerGroup, 
tubeCluster.getToken(), operator);
-        LOGGER.info("success to create tubemq consumer group {} for topic {} 
in {}", consumerGroup, topic, masterUrl);
+        LOGGER.info("success to create TubeMQ consumer group {} for topic {} 
in {}", consumerGroup, topic, masterUrl);
     }
 
     /**
      * Check if the topic is exists in the TubeMQ.
      */
     public boolean isTopicExist(String masterUrl, String topicName) {
-        LOGGER.info("begin to check if the tubemq topic {} exists", topicName);
+        LOGGER.info("begin to check if the TubeMQ topic {} exists", topicName);
         String url = masterUrl + QUERY_TOPIC_PATH + TOPIC_NAME + topicName;
         try {
             TopicResponse topicView = HttpUtils.request(restTemplate, url, 
HttpMethod.GET,
                     null, new HttpHeaders(), TopicResponse.class);
             if (CollectionUtils.isEmpty(topicView.getData())) {
-                LOGGER.warn("tubemq topic {} not exists in {}", topicName, 
url);
+                LOGGER.warn("TubeMQ topic {} not exists in {}", topicName, 
url);
                 return false;
             }
-            LOGGER.info("tubemq topic {} exists in {}", topicName, url);
+            LOGGER.info("TubeMQ topic {} exists in {}", topicName, url);
             return true;
         } catch (Exception e) {
             String msg = String.format("failed to check if the topic %s exist 
in ", topicName);
@@ -156,15 +155,17 @@ public class TubeMQOperator {
      */
     public boolean isConsumerGroupExist(String masterUrl, String topicName, 
String consumerGroup) {
         LOGGER.info("begin to check if the consumer group {} exists on topic 
{}", consumerGroup, topicName);
-        String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName 
+ CONSUME_GROUP + consumerGroup;
+        String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName 
+ GROUP_NAME + consumerGroup;
         try {
             ConsumerGroupResponse response = HttpUtils.request(restTemplate, 
url, HttpMethod.GET,
                     null, new HttpHeaders(), ConsumerGroupResponse.class);
             if (CollectionUtils.isEmpty(response.getData())) {
-                LOGGER.warn("tubemq consumer group {} not exists for topic {} 
in {}", consumerGroup, topicName, url);
+                LOGGER.warn("TubeMQ consumer group {} not exists for topic {} 
in {}, response={}", consumerGroup,
+                        topicName, url, response);
                 return false;
             }
-            LOGGER.info("tubemq consumer group {} exists for topic {} in {}", 
consumerGroup, topicName, url);
+            LOGGER.info("TubeMQ consumer group {} exists for topic {} in {}, 
response={}", consumerGroup, topicName,
+                    url, response);
             return true;
         } catch (Exception e) {
             String msg = String.format("failed to check if the consumer group 
%s for topic %s exist in ",
@@ -183,7 +184,7 @@ public class TubeMQOperator {
             TubeBrokerInfo brokerInfo = HttpUtils.request(restTemplate, url, 
HttpMethod.GET,
                     null, new HttpHeaders(), TubeBrokerInfo.class);
             if (brokerInfo.getErrCode() != SUCCESS_CODE) {
-                String msg = "failed to query tubemq broker from %s, error: 
%s";
+                String msg = "failed to query TubeMQ broker from %s, error: 
%s";
                 LOGGER.error(String.format(msg, url, brokerInfo.getErrMsg()));
                 throw new BusinessException(String.format(msg, masterUrl, 
brokerInfo.getErrMsg()));
             }
@@ -191,11 +192,11 @@ public class TubeMQOperator {
             // is success, divide the broker by status
             brokerInfo.divideBrokerListByStatus();
             if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("success to query tubemq broker from {}, result 
{}", url, brokerInfo.getData());
+                LOGGER.debug("success to query TubeMQ broker from {}, result 
{}", url, brokerInfo.getData());
             }
             return brokerInfo;
         } catch (Exception e) {
-            String msg = "failed to query tubemq broker from %s";
+            String msg = "failed to query TubeMQ broker from %s";
             LOGGER.error(String.format(msg, url), e);
             throw new BusinessException(String.format(msg, masterUrl) + ", 
error: " + e.getMessage());
         }
@@ -205,7 +206,7 @@ public class TubeMQOperator {
      * Create topic operation.
      */
     private void createTopicOpt(String masterUrl, String topicName, String 
token, String operator) {
-        LOGGER.info(String.format("begin to create tubemq topic %s in master 
%s", topicName, masterUrl));
+        LOGGER.info(String.format("begin to create TubeMQ topic %s in master 
%s", topicName, masterUrl));
         TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
         List<Integer> allBrokers = brokerView.getAllBrokerIdList();
         if (CollectionUtils.isEmpty(allBrokers)) {
@@ -222,15 +223,15 @@ public class TubeMQOperator {
             TubeHttpResponse response = HttpUtils.request(restTemplate, url, 
HttpMethod.GET,
                     null, new HttpHeaders(), TubeHttpResponse.class);
             if (response.getErrCode() != SUCCESS_CODE) {
-                String msg = String.format("failed to create tubemq topic %s, 
error: %s",
+                String msg = String.format("failed to create TubeMQ topic %s, 
error: %s",
                         topicName, response.getErrMsg());
                 LOGGER.error(msg + " in {} for brokers {}", masterUrl, 
allBrokers);
                 throw new BusinessException(msg);
             }
 
-            LOGGER.info("success to create tubemq topic {} in {}", topicName, 
url);
+            LOGGER.info("success to create TubeMQ topic {} in {}", topicName, 
url);
         } catch (Exception e) {
-            String msg = String.format("failed to create tubemq topic %s in 
%s", topicName, masterUrl);
+            String msg = String.format("failed to create TubeMQ topic %s in 
%s", topicName, masterUrl);
             LOGGER.error(msg, e);
             throw new BusinessException(msg + ", error: " + e.getMessage());
         }
@@ -251,14 +252,14 @@ public class TubeMQOperator {
             TubeHttpResponse response = HttpUtils.request(restTemplate, url, 
HttpMethod.GET,
                     null, new HttpHeaders(), TubeHttpResponse.class);
             if (response.getErrCode() != SUCCESS_CODE) {
-                String msg = String.format("failed to create tubemq consumer 
group %s for topic %s, error: %s",
+                String msg = String.format("failed to create TubeMQ consumer 
group %s for topic %s, error: %s",
                         consumerGroup, topicName, response.getErrMsg());
                 LOGGER.error(msg + ", url {}", url);
                 throw new BusinessException(msg);
             }
-            LOGGER.info("success to create tubemq topic {} in {}", topicName, 
url);
+            LOGGER.info("success to create TubeMQ topic {} in {}", topicName, 
url);
         } catch (Exception e) {
-            String msg = String.format("failed to create tubemq topic %s in 
%s", topicName, masterUrl);
+            String msg = String.format("failed to create TubeMQ topic %s in 
%s", topicName, masterUrl);
             LOGGER.error(msg, e);
             throw new BusinessException(msg + ", error: " + e.getMessage());
         }
@@ -277,11 +278,11 @@ public class TubeMQOperator {
         List<BriefMQMessage> messageList = new ArrayList<>();
         try {
             if (StringUtils.isEmpty(brokerUrl) || 
StringUtils.isEmpty(topicName)) {
-                throw new BusinessException("tubemq master url or tubemq topic 
cannot be null");
+                throw new BusinessException("TubeMQ master url or TubeMQ topic 
cannot be null");
             }
 
             if (!this.isTopicExist(masterUrl, topicName)) {
-                LOGGER.error("tubemq topic {} not exists in {}, skip to 
query", topicName, masterUrl);
+                LOGGER.error("TubeMQ topic {} not exists in {}, skip to 
query", topicName, masterUrl);
                 throw new BusinessException("TubeMQ master url or TubeMQ topic 
cannot be null");
             }
 

Reply via email to