This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 43bcb9d859 [INLONG-9827][Manager] Fix the problem of failed to check
if the consumption group exists (#9828)
43bcb9d859 is described below
commit 43bcb9d8599dc89c265d4f49b3d816daa321cc8a
Author: fuweng11 <[email protected]>
AuthorDate: Sun Mar 17 20:46:00 2024 +0800
[INLONG-9827][Manager] Fix the problem of failed to check if the
consumption group exists (#9828)
---
.../resource/queue/tubemq/TubeMQOperator.java | 55 +++++++++++-----------
1 file changed, 28 insertions(+), 27 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index d4f852f893..e13b4cbe2c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -65,7 +65,6 @@ public class TubeMQOperator {
* TubeMQ const for HTTP URL format
*/
private static final String TOPIC_NAME = "&topicName=";
- private static final String CONSUME_GROUP = "&consumeGroup=";
private static final String GROUP_NAME = "&groupName=";
private static final String BROKER_ID = "&brokerId=";
private static final String CREATE_USER = "&createUser=";
@@ -89,18 +88,18 @@ public class TubeMQOperator {
*/
public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String
topicName, String operator) {
String masterUrl = tubeCluster.getMasterWebUrl();
- LOGGER.info("begin to create tubemq topic {} in master {}", topicName,
masterUrl);
+ LOGGER.info("begin to create TubeMQ topic {} in master {}", topicName,
masterUrl);
if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
- throw new BusinessException("tubemq master url or tubemq topic
cannot be null");
+ throw new BusinessException("TubeMQ master url or TubeMQ topic
cannot be null");
}
if (this.isTopicExist(masterUrl, topicName)) {
- LOGGER.warn("tubemq topic {} already exists in {}, skip to
create", topicName, masterUrl);
+ LOGGER.warn("TubeMQ topic {} already exists in {}, skip to
create", topicName, masterUrl);
return;
}
this.createTopicOpt(masterUrl, topicName, tubeCluster.getToken(),
operator);
- LOGGER.info("success to create tubemq topic {} in {}", topicName,
masterUrl);
+ LOGGER.info("success to create TubeMQ topic {} in {}", topicName,
masterUrl);
}
/**
@@ -110,39 +109,39 @@ public class TubeMQOperator {
String masterUrl = tubeCluster.getMasterWebUrl();
LOGGER.info("begin to create consumer group {} for topic {} in master
{}", consumerGroup, topic, masterUrl);
if (StringUtils.isEmpty(masterUrl) ||
StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
- throw new BusinessException("tubemq master url, consumer group, or
tubemq topic cannot be null");
+ throw new BusinessException("TubeMQ master url, consumer group, or
TubeMQ topic cannot be null");
}
if (!this.isTopicExist(masterUrl, topic)) {
- LOGGER.warn("cannot create tubemq consumer group {}, as the topic
{} not exists in master {}",
+ LOGGER.warn("cannot create TubeMQ consumer group {}, as the topic
{} not exists in master {}",
consumerGroup, topic, masterUrl);
return;
}
if (this.isConsumerGroupExist(masterUrl, topic, consumerGroup)) {
- LOGGER.warn("tubemq consumer group {} already exists for topic {}
in master {}, skip to create",
+ LOGGER.warn("TubeMQ consumer group {} already exists for topic {}
in master {}, skip to create",
consumerGroup, topic, masterUrl);
return;
}
this.createConsumerGroupOpt(masterUrl, topic, consumerGroup,
tubeCluster.getToken(), operator);
- LOGGER.info("success to create tubemq consumer group {} for topic {}
in {}", consumerGroup, topic, masterUrl);
+ LOGGER.info("success to create TubeMQ consumer group {} for topic {}
in {}", consumerGroup, topic, masterUrl);
}
/**
* Check if the topic is exists in the TubeMQ.
*/
public boolean isTopicExist(String masterUrl, String topicName) {
- LOGGER.info("begin to check if the tubemq topic {} exists", topicName);
+ LOGGER.info("begin to check if the TubeMQ topic {} exists", topicName);
String url = masterUrl + QUERY_TOPIC_PATH + TOPIC_NAME + topicName;
try {
TopicResponse topicView = HttpUtils.request(restTemplate, url,
HttpMethod.GET,
null, new HttpHeaders(), TopicResponse.class);
if (CollectionUtils.isEmpty(topicView.getData())) {
- LOGGER.warn("tubemq topic {} not exists in {}", topicName,
url);
+ LOGGER.warn("TubeMQ topic {} not exists in {}", topicName,
url);
return false;
}
- LOGGER.info("tubemq topic {} exists in {}", topicName, url);
+ LOGGER.info("TubeMQ topic {} exists in {}", topicName, url);
return true;
} catch (Exception e) {
String msg = String.format("failed to check if the topic %s exist
in ", topicName);
@@ -156,15 +155,17 @@ public class TubeMQOperator {
*/
public boolean isConsumerGroupExist(String masterUrl, String topicName,
String consumerGroup) {
LOGGER.info("begin to check if the consumer group {} exists on topic
{}", consumerGroup, topicName);
- String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName
+ CONSUME_GROUP + consumerGroup;
+ String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName
+ GROUP_NAME + consumerGroup;
try {
ConsumerGroupResponse response = HttpUtils.request(restTemplate,
url, HttpMethod.GET,
null, new HttpHeaders(), ConsumerGroupResponse.class);
if (CollectionUtils.isEmpty(response.getData())) {
- LOGGER.warn("tubemq consumer group {} not exists for topic {}
in {}", consumerGroup, topicName, url);
+ LOGGER.warn("TubeMQ consumer group {} not exists for topic {}
in {}, response={}", consumerGroup,
+ topicName, url, response);
return false;
}
- LOGGER.info("tubemq consumer group {} exists for topic {} in {}",
consumerGroup, topicName, url);
+ LOGGER.info("TubeMQ consumer group {} exists for topic {} in {},
response={}", consumerGroup, topicName,
+ url, response);
return true;
} catch (Exception e) {
String msg = String.format("failed to check if the consumer group
%s for topic %s exist in ",
@@ -183,7 +184,7 @@ public class TubeMQOperator {
TubeBrokerInfo brokerInfo = HttpUtils.request(restTemplate, url,
HttpMethod.GET,
null, new HttpHeaders(), TubeBrokerInfo.class);
if (brokerInfo.getErrCode() != SUCCESS_CODE) {
- String msg = "failed to query tubemq broker from %s, error:
%s";
+ String msg = "failed to query TubeMQ broker from %s, error:
%s";
LOGGER.error(String.format(msg, url, brokerInfo.getErrMsg()));
throw new BusinessException(String.format(msg, masterUrl,
brokerInfo.getErrMsg()));
}
@@ -191,11 +192,11 @@ public class TubeMQOperator {
// is success, divide the broker by status
brokerInfo.divideBrokerListByStatus();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("success to query tubemq broker from {}, result
{}", url, brokerInfo.getData());
+ LOGGER.debug("success to query TubeMQ broker from {}, result
{}", url, brokerInfo.getData());
}
return brokerInfo;
} catch (Exception e) {
- String msg = "failed to query tubemq broker from %s";
+ String msg = "failed to query TubeMQ broker from %s";
LOGGER.error(String.format(msg, url), e);
throw new BusinessException(String.format(msg, masterUrl) + ",
error: " + e.getMessage());
}
@@ -205,7 +206,7 @@ public class TubeMQOperator {
* Create topic operation.
*/
private void createTopicOpt(String masterUrl, String topicName, String
token, String operator) {
- LOGGER.info(String.format("begin to create tubemq topic %s in master
%s", topicName, masterUrl));
+ LOGGER.info(String.format("begin to create TubeMQ topic %s in master
%s", topicName, masterUrl));
TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
List<Integer> allBrokers = brokerView.getAllBrokerIdList();
if (CollectionUtils.isEmpty(allBrokers)) {
@@ -222,15 +223,15 @@ public class TubeMQOperator {
TubeHttpResponse response = HttpUtils.request(restTemplate, url,
HttpMethod.GET,
null, new HttpHeaders(), TubeHttpResponse.class);
if (response.getErrCode() != SUCCESS_CODE) {
- String msg = String.format("failed to create tubemq topic %s,
error: %s",
+ String msg = String.format("failed to create TubeMQ topic %s,
error: %s",
topicName, response.getErrMsg());
LOGGER.error(msg + " in {} for brokers {}", masterUrl,
allBrokers);
throw new BusinessException(msg);
}
- LOGGER.info("success to create tubemq topic {} in {}", topicName,
url);
+ LOGGER.info("success to create TubeMQ topic {} in {}", topicName,
url);
} catch (Exception e) {
- String msg = String.format("failed to create tubemq topic %s in
%s", topicName, masterUrl);
+ String msg = String.format("failed to create TubeMQ topic %s in
%s", topicName, masterUrl);
LOGGER.error(msg, e);
throw new BusinessException(msg + ", error: " + e.getMessage());
}
@@ -251,14 +252,14 @@ public class TubeMQOperator {
TubeHttpResponse response = HttpUtils.request(restTemplate, url,
HttpMethod.GET,
null, new HttpHeaders(), TubeHttpResponse.class);
if (response.getErrCode() != SUCCESS_CODE) {
- String msg = String.format("failed to create tubemq consumer
group %s for topic %s, error: %s",
+ String msg = String.format("failed to create TubeMQ consumer
group %s for topic %s, error: %s",
consumerGroup, topicName, response.getErrMsg());
LOGGER.error(msg + ", url {}", url);
throw new BusinessException(msg);
}
- LOGGER.info("success to create tubemq topic {} in {}", topicName,
url);
+ LOGGER.info("success to create TubeMQ topic {} in {}", topicName,
url);
} catch (Exception e) {
- String msg = String.format("failed to create tubemq topic %s in
%s", topicName, masterUrl);
+ String msg = String.format("failed to create TubeMQ topic %s in
%s", topicName, masterUrl);
LOGGER.error(msg, e);
throw new BusinessException(msg + ", error: " + e.getMessage());
}
@@ -277,11 +278,11 @@ public class TubeMQOperator {
List<BriefMQMessage> messageList = new ArrayList<>();
try {
if (StringUtils.isEmpty(brokerUrl) ||
StringUtils.isEmpty(topicName)) {
- throw new BusinessException("tubemq master url or tubemq topic
cannot be null");
+ throw new BusinessException("TubeMQ master url or TubeMQ topic
cannot be null");
}
if (!this.isTopicExist(masterUrl, topicName)) {
- LOGGER.error("tubemq topic {} not exists in {}, skip to
query", topicName, masterUrl);
+ LOGGER.error("TubeMQ topic {} not exists in {}, skip to
query", topicName, masterUrl);
throw new BusinessException("TubeMQ master url or TubeMQ topic
cannot be null");
}