This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 71683f0601 [INLONG-9470][Manager] Fix the problem of failed to verify
if the namespace exists (#9471)
71683f0601 is described below
commit 71683f0601704486ba1beaa8c2820d7000e3ae62
Author: fuweng11 <[email protected]>
AuthorDate: Thu Dec 14 17:18:37 2023 +0800
[INLONG-9470][Manager] Fix the problem of failed to verify if the namespace
exists (#9471)
---
.../inlong/manager/common/util/HttpUtils.java | 64 +++++++-
.../pojo/cluster/pulsar/PulsarClusterInfo.java | 15 ++
.../resource/queue/pulsar/PulsarOperator.java | 2 +-
.../service/resource/queue/pulsar/PulsarUtils.java | 171 ++++++++++++---------
4 files changed, 177 insertions(+), 75 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 1e7efe68ae..5f5d492da6 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.common.util;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -95,6 +97,37 @@ public class HttpUtils {
}
}
+ /**
+ * Send an HTTP request by the given rest template.
+ */
+ public static <T> T request(RestTemplate restTemplate, String[] urls,
HttpMethod method,
+ String param, HttpHeaders header, Class<T> cls) throws Exception {
+ ResponseEntity<String> exchange;
+ Preconditions.expectNotNull(urls, ErrorCodeEnum.INVALID_PARAMETER,
"url is blank");
+ for (int i = 0; i < urls.length; i++) {
+ try {
+ HttpEntity<String> request = new HttpEntity<>(param, header);
+ log.debug("send request to {}, param {}", urls[i], param);
+ exchange = restTemplate.exchange(urls[i], method, request,
String.class);
+ String body = exchange.getBody();
+ HttpStatus statusCode = exchange.getStatusCode();
+ if (!statusCode.is2xxSuccessful()) {
+ log.error("request error for {}, status code {}, body {}",
urls[i], statusCode, body);
+ }
+
+ log.debug("response from {}, status code {}", urls[i],
statusCode);
+ return GSON.fromJson(exchange.getBody(), cls);
+ } catch (RestClientException e) {
+ log.error("request for {}, error, begin retry", urls[i], e);
+ if (i >= (urls.length - 1)) {
+ log.error("after retry, request for {} exception {} ",
urls[i], e.getMessage());
+ throw e;
+ }
+ }
+ }
+ throw new Exception(String.format("send request to %s, params %s
error", urls, param));
+ }
+
/**
* Send an HTTP request
*/
@@ -108,7 +141,7 @@ public class HttpUtils {
ResponseEntity<T> response = restTemplate.exchange(url, httpMethod,
requestEntity, typeReference);
log.debug("success request to {}, status code {}", url,
response.getStatusCode());
- Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(),
"Request failed");
+ Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(),
"Request failed: " + response.getBody());
return response.getBody();
}
@@ -122,7 +155,34 @@ public class HttpUtils {
ResponseEntity<String> response = restTemplate.exchange(url,
httpMethod, requestEntity, String.class);
log.debug("success request to {}, status code {}", url,
response.getStatusCode());
- Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(),
"Request failed");
+ Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(),
"Request failed: " + response.getBody());
+ }
+
+ /**
+ * Send an void HTTP request
+ */
+ public static void request(RestTemplate restTemplate, String[] urls,
HttpMethod httpMethod, Object requestBody,
+ HttpHeaders header) {
+ Preconditions.expectNotNull(urls, ErrorCodeEnum.INVALID_PARAMETER,
"url is blank");
+ for (int i = 0; i < urls.length; i++) {
+ try {
+ log.debug("begin request to {} by request body {}", urls[i],
GSON.toJson(requestBody));
+ HttpEntity<Object> requestEntity = new
HttpEntity<>(requestBody, header);
+ ResponseEntity<String> response =
restTemplate.exchange(urls[i], httpMethod, requestEntity,
+ String.class);
+
+ log.debug("success request to {}, status code {}", urls[i],
response.getStatusCode());
+
Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(),
+ "Request failed: " + response.getBody());
+ return;
+ } catch (Exception e) {
+ log.error("request for {}, error, begin retry", urls[i], e);
+ if (i >= (urls.length - 1)) {
+ log.error("after retry, request for {} exception {} ",
urls[i], e.getMessage());
+ throw e;
+ }
+ }
+ }
}
/**
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
index e67ed426b9..96d33049e5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
@@ -17,9 +17,12 @@
package org.apache.inlong.manager.pojo.cluster.pulsar;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import io.swagger.annotations.ApiModel;
@@ -40,6 +43,8 @@ import lombok.experimental.SuperBuilder;
@ApiModel("Inlong cluster info for Pulsar")
public class PulsarClusterInfo extends ClusterInfo {
+ public static final String HTTP_PREFIX = "http://";
+
@ApiModelProperty(value = "Pulsar admin URL, such as:
http://127.0.0.1:8080", notes = "Pulsar service URL is the 'url' field of the
cluster")
private String adminUrl;
@@ -55,4 +60,14 @@ public class PulsarClusterInfo extends ClusterInfo {
return CommonBeanUtils.copyProperties(this, PulsarClusterRequest::new);
}
+ public String[] getAdminUrls(String urlSuffix) {
+ String adminUrl = this.getAdminUrl();
+ Preconditions.expectNotBlank(adminUrl,
ErrorCodeEnum.INVALID_PARAMETER, "admin url is blank");
+ String[] adminUrls = adminUrl.replace(HTTP_PREFIX,
InlongConstants.EMPTY).split(InlongConstants.COMMA);
+ for (int i = 0; i < adminUrls.length; i++) {
+ adminUrls[i] = HTTP_PREFIX + adminUrls[i] + urlSuffix;
+ }
+ return adminUrls;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 5ff2c9cecd..bdf7b237d8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -112,7 +112,7 @@ public class PulsarOperator {
LOGGER.info("begin to create namespace={}", tenantNamespaceName);
try {
// Check whether the namespace exists, and create it if it does
not exist
- boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant,
namespace);
+ boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant,
tenantNamespaceName);
if (isExists) {
LOGGER.warn("namespace={} already exists, skip to create",
tenantNamespaceName);
return;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index 9fd81aac03..64802e7a76 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -57,27 +57,27 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo.HTTP_PREFIX;
+
/**
* Pulsar connection utils
*/
@Slf4j
public class PulsarUtils {
- private PulsarUtils() {
- }
-
- private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
- ZoneId.systemDefault());
-
public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
-
+ private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+ ZoneId.systemDefault());
private static final Gson GSON = new GsonBuilder().create(); // thread safe
+ private PulsarUtils() {
+ }
+
/**
* Get http headers by token.
*
@@ -102,8 +102,8 @@ public class PulsarUtils {
*/
public static List<String> getClusters(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo)
throws Exception {
- final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_CLUSTERS_PATH), HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
ArrayList.class);
}
@@ -120,10 +120,9 @@ public class PulsarUtils {
List<String> clusters = getClusters(restTemplate, clusterInfo);
List<String> brokers = new ArrayList<>();
for (String brokerName : clusters) {
- String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/"
+ brokerName;
- brokers.addAll(
- HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
- ArrayList.class));
+ brokers.addAll(HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_BROKERS_PATH + "/" +
brokerName), HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()), ArrayList.class));
}
return brokers;
}
@@ -138,8 +137,8 @@ public class PulsarUtils {
*/
public static List<String> getTenants(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo)
throws Exception {
- final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_TENANTS_PATH), HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
ArrayList.class);
}
@@ -154,8 +153,9 @@ public class PulsarUtils {
*/
public static List<String> getNamespaces(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String tenant) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" +
tenant;
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + "/" + tenant),
+ HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
ArrayList.class);
}
@@ -165,18 +165,18 @@ public class PulsarUtils {
* @param restTemplate spring framework RestTemplate
* @param clusterInfo pulsar cluster info
* @param tenant pulsar tenant name
- * @param tenantInfo pulsar tenant info
+ * @param tenantInfo pulsar tenant info
* @throws Exception any exception if occurred
*/
public static void createTenant(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String tenant,
PulsarTenantInfo tenantInfo) throws Exception {
- final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH +
"/" + tenant;
HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
String param = GSON.toJson(tenantInfo);
- HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+ HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_TENANTS_PATH + "/" + tenant), HttpMethod.PUT,
+ param, headers);
}
/**
@@ -191,8 +191,6 @@ public class PulsarUtils {
*/
public static void createNamespace(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String tenant,
String namespaceName, PulsarNamespacePolicies policies) throws
Exception {
- final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH +
InlongConstants.SLASH + tenant
- + InlongConstants.SLASH + namespaceName;
HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
headers.setContentType(type);
@@ -200,7 +198,8 @@ public class PulsarUtils {
String param = GSON.toJson(policies);
param = param.replaceAll("messageTtlInSeconds",
"message_ttl_in_seconds")
.replaceAll("retentionPolicies", "retention_policies");
- HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+ HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant
+ + InlongConstants.SLASH + namespaceName), HttpMethod.PUT,
param, headers);
}
/**
@@ -215,8 +214,9 @@ public class PulsarUtils {
*/
public static List<String> getTopics(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String tenant,
String namespace) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
tenant + "/" + namespace;
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + tenant
+ "/" + namespace), HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
ArrayList.class);
}
@@ -232,9 +232,11 @@ public class PulsarUtils {
*/
public static List<String> getPartitionedTopics(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String tenant, String namespace) throws Exception {
- String url =
- clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
tenant + "/" + namespace + "/partitioned";
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + tenant
+ "/" + namespace
+ + "/partitioned"),
+ HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
ArrayList.class);
}
@@ -248,8 +250,8 @@ public class PulsarUtils {
*/
public static void createNonPartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath;
- HttpUtils.request(restTemplate, url, HttpMethod.PUT, null,
getHttpHeaders(clusterInfo.getToken()));
+ HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
+ HttpMethod.PUT, null, getHttpHeaders(clusterInfo.getToken()));
}
/**
@@ -262,8 +264,9 @@ public class PulsarUtils {
*/
public static void createPartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String topicPath, Integer numPartitions) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions";
- HttpUtils.request(restTemplate, url, HttpMethod.PUT,
numPartitions.toString(),
+ HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions"), HttpMethod.PUT,
+ numPartitions.toString(),
getHttpHeaders(clusterInfo.getToken()));
}
@@ -278,8 +281,9 @@ public class PulsarUtils {
*/
public static JsonObject getInternalStatsPartitionedTopics(RestTemplate
restTemplate,
PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitioned-internalStats";
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath
+ + "/partitioned-internalStats"), HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
JsonObject.class);
}
@@ -294,8 +298,9 @@ public class PulsarUtils {
*/
public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate
restTemplate,
PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions";
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions"), HttpMethod.GET, null,
+ getHttpHeaders(clusterInfo.getToken()),
PulsarTopicMetadata.class);
}
@@ -309,8 +314,8 @@ public class PulsarUtils {
*/
public static void deleteNonPartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath;
- HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null,
getHttpHeaders(clusterInfo.getToken()));
+ HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
+ HttpMethod.DELETE, null,
getHttpHeaders(clusterInfo.getToken()));
}
/**
@@ -323,10 +328,11 @@ public class PulsarUtils {
*/
public static void forceDeleteNonPartitionedTopic(RestTemplate
restTemplate, PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath;
Map<String, Boolean> uriVariables = new HashMap<>();
uriVariables.put("force", true);
- HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables,
getHttpHeaders(clusterInfo.getToken()));
+ HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
+ HttpMethod.DELETE, uriVariables,
+ getHttpHeaders(clusterInfo.getToken()));
}
/**
@@ -339,8 +345,9 @@ public class PulsarUtils {
*/
public static void deletePartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions";
- HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null,
getHttpHeaders(clusterInfo.getToken()));
+ HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions"), HttpMethod.DELETE,
+ null, getHttpHeaders(clusterInfo.getToken()));
}
/**
@@ -353,10 +360,12 @@ public class PulsarUtils {
*/
public static void forceDeletePartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions";
Map<String, Boolean> uriVariables = new HashMap<>();
uriVariables.put("force", true);
- HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables,
getHttpHeaders(clusterInfo.getToken()));
+ HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions"), HttpMethod.DELETE,
+ uriVariables,
+ getHttpHeaders(clusterInfo.getToken()));
}
/**
@@ -407,8 +416,8 @@ public class PulsarUtils {
*/
public static String lookupTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String topicPath)
throws Exception {
- String url = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH +
"/persistent/" + topicPath;
- PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, url,
HttpMethod.GET, null,
+ PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(LOOKUP_TOPIC_PATH + "/persistent/" +
topicPath), HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
PulsarLookupTopicInfo.class);
return topicInfo.getBrokerUrl();
}
@@ -428,8 +437,9 @@ public class PulsarUtils {
Map<String, String> map = new LinkedHashMap<>();
for (int i = 0; i < metadata.getPartitions(); i++) {
String partitionTopicName = topicPath + "-partition-" + i;
- String partitionUrl = clusterInfo.getAdminUrl() +
LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopicName;
- PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate,
partitionUrl, HttpMethod.GET, null,
+ PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(LOOKUP_TOPIC_PATH +
"/persistent/" + partitionTopicName), HttpMethod.GET,
+ null,
getHttpHeaders(clusterInfo.getToken()),
PulsarLookupTopicInfo.class);
map.put(partitionTopicName, topicInfo.getBrokerUrl());
}
@@ -447,15 +457,17 @@ public class PulsarUtils {
*/
public static List<String> getSubscriptions(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
- String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/subscriptions";
- return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ return HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" +
topicPath + "/subscriptions"), HttpMethod.GET,
+ null,
+ getHttpHeaders(clusterInfo.getToken()),
ArrayList.class);
}
/**
* Create a subscription on the topic.
*
- * @param restTemplate spring framework RestTemplate
+ * @param restTemplate spring framework RestTemplate
* @param clusterInfo pulsar cluster info
* @param topicPath pulsar topic path
* @param subscription pulsar topic subscription info
@@ -463,8 +475,6 @@ public class PulsarUtils {
*/
public static void createSubscription(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String topicPath,
String subscription) throws Exception {
- String url =
- clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/subscription/" + subscription;
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("entryId", Long.MAX_VALUE);
jsonObject.addProperty("ledgerId", Long.MAX_VALUE);
@@ -473,7 +483,10 @@ public class PulsarUtils {
MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
- HttpUtils.request(restTemplate, url, HttpMethod.PUT,
jsonObject.toString(), headers);
+ HttpUtils.request(restTemplate,
+ clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" +
topicPath + "/subscription/"
+ + subscription),
+ HttpMethod.PUT, jsonObject.toString(), headers);
}
/**
@@ -481,7 +494,7 @@ public class PulsarUtils {
*
* @param restTemplate spring framework RestTemplate
* @param clusterInfo pulsar cluster info
- * @param topicPartition pulsar topic partition info
+ * @param topicPartition pulsar topic partition info
* @param messageType pulsar message type info
* @param messagePosition pulsar message position info
* @return spring framework HttpEntity
@@ -489,22 +502,36 @@ public class PulsarUtils {
*/
public static ResponseEntity<byte[]> examineMessage(RestTemplate
restTemplate, PulsarClusterInfo clusterInfo,
String topicPartition, String messageType, int messagePosition)
throws Exception {
- StringBuilder urlBuilder = new
StringBuilder().append(clusterInfo.getAdminUrl())
- .append(QUERY_PERSISTENT_PATH)
- .append("/")
- .append(topicPartition)
- .append("/examinemessage")
- .append("?initialPosition=")
- .append(messageType)
- .append("&messagePosition=")
- .append(messagePosition);
- ResponseEntity<byte[]> response =
restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
- new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())),
byte[].class);
- if (!response.getStatusCode().is2xxSuccessful()) {
- log.error("request error for {}, status code {}, body {}",
urlBuilder.toString(), response.getStatusCode(),
- response.getBody());
- }
- return response;
+ String adminUrl = clusterInfo.getAdminUrl();
+ String[] adminUrls = adminUrl.replace(HTTP_PREFIX,
InlongConstants.EMPTY).split(InlongConstants.COMMA);
+ for (int i = 0; i < adminUrls.length; i++) {
+ try {
+ StringBuilder urlBuilder = new
StringBuilder().append(HTTP_PREFIX + adminUrls[i])
+ .append(QUERY_PERSISTENT_PATH)
+ .append("/")
+ .append(topicPartition)
+ .append("/examinemessage")
+ .append("?initialPosition=")
+ .append(messageType)
+ .append("&messagePosition=")
+ .append(messagePosition);
+ ResponseEntity<byte[]> response =
restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
+ new
HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), byte[].class);
+ if (!response.getStatusCode().is2xxSuccessful()) {
+ log.error("request error for {}, status code {}, body {}",
urlBuilder.toString(),
+ response.getStatusCode(),
+ response.getBody());
+ }
+ return response;
+ } catch (Exception e) {
+ log.error("examine message for topic partition={} error, begin
retry", topicPartition, e);
+ if (i >= (adminUrls.length - 1)) {
+ log.error("after retry, examine message for topic
partition={} still error", topicPartition, e);
+ throw e;
+ }
+ }
+ }
+ throw new Exception(String.format("examine message failed for topic
partition=%s", topicPartition));
}
public static PulsarMessageInfo
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
@@ -518,7 +545,7 @@ public class PulsarUtils {
}
/**
- * Copy from getMessagesFromHttpResponse method of
org.apache.pulsar.client.admin.internal.TopicsImpl class.
+ * Copy from getMessagesFromHttpResponse method of
org.apache.pulsar.client.admin.internal.TopicsImpl class.
*
* @param response
* @param topic