This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 71683f0601 [INLONG-9470][Manager] Fix the problem of failed to verify if the namespace exists (#9471)
71683f0601 is described below

commit 71683f0601704486ba1beaa8c2820d7000e3ae62
Author: fuweng11 <[email protected]>
AuthorDate: Thu Dec 14 17:18:37 2023 +0800

    [INLONG-9470][Manager] Fix the problem of failed to verify if the namespace exists (#9471)
---
 .../inlong/manager/common/util/HttpUtils.java      |  64 +++++++-
 .../pojo/cluster/pulsar/PulsarClusterInfo.java     |  15 ++
 .../resource/queue/pulsar/PulsarOperator.java      |   2 +-
 .../service/resource/queue/pulsar/PulsarUtils.java | 171 ++++++++++++---------
 4 files changed, 177 insertions(+), 75 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 1e7efe68ae..5f5d492da6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.common.util;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import lombok.extern.slf4j.Slf4j;
@@ -95,6 +97,37 @@ public class HttpUtils {
         }
     }
 
+    /**
+     * Send an HTTP request by the given rest template.
+     */
+    public static <T> T request(RestTemplate restTemplate, String[] urls, 
HttpMethod method,
+            String param, HttpHeaders header, Class<T> cls) throws Exception {
+        ResponseEntity<String> exchange;
+        Preconditions.expectNotNull(urls, ErrorCodeEnum.INVALID_PARAMETER, 
"url is blank");
+        for (int i = 0; i < urls.length; i++) {
+            try {
+                HttpEntity<String> request = new HttpEntity<>(param, header);
+                log.debug("send request to {}, param {}", urls[i], param);
+                exchange = restTemplate.exchange(urls[i], method, request, 
String.class);
+                String body = exchange.getBody();
+                HttpStatus statusCode = exchange.getStatusCode();
+                if (!statusCode.is2xxSuccessful()) {
+                    log.error("request error for {}, status code {}, body {}", 
urls[i], statusCode, body);
+                }
+
+                log.debug("response from {}, status code {}", urls[i], 
statusCode);
+                return GSON.fromJson(exchange.getBody(), cls);
+            } catch (RestClientException e) {
+                log.error("request for {}, error, begin retry", urls[i], e);
+                if (i >= (urls.length - 1)) {
+                    log.error("after retry, request for {} exception {} ", 
urls[i], e.getMessage());
+                    throw e;
+                }
+            }
+        }
+        throw new Exception(String.format("send request to %s, params %s 
error", urls, param));
+    }
+
     /**
      * Send an HTTP request
      */
@@ -108,7 +141,7 @@ public class HttpUtils {
         ResponseEntity<T> response = restTemplate.exchange(url, httpMethod, 
requestEntity, typeReference);
 
         log.debug("success request to {}, status code {}", url, 
response.getStatusCode());
-        Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), 
"Request failed");
+        Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), 
"Request failed: " + response.getBody());
         return response.getBody();
     }
 
@@ -122,7 +155,34 @@ public class HttpUtils {
         ResponseEntity<String> response = restTemplate.exchange(url, 
httpMethod, requestEntity, String.class);
 
         log.debug("success request to {}, status code {}", url, 
response.getStatusCode());
-        Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), 
"Request failed");
+        Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), 
"Request failed: " + response.getBody());
+    }
+
+    /**
+     * Send an void HTTP request
+     */
+    public static void request(RestTemplate restTemplate, String[] urls, 
HttpMethod httpMethod, Object requestBody,
+            HttpHeaders header) {
+        Preconditions.expectNotNull(urls, ErrorCodeEnum.INVALID_PARAMETER, 
"url is blank");
+        for (int i = 0; i < urls.length; i++) {
+            try {
+                log.debug("begin request to {} by request body {}", urls[i], 
GSON.toJson(requestBody));
+                HttpEntity<Object> requestEntity = new 
HttpEntity<>(requestBody, header);
+                ResponseEntity<String> response = 
restTemplate.exchange(urls[i], httpMethod, requestEntity,
+                        String.class);
+
+                log.debug("success request to {}, status code {}", urls[i], 
response.getStatusCode());
+                
Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(),
+                        "Request failed: " + response.getBody());
+                return;
+            } catch (Exception e) {
+                log.error("request for {}, error, begin retry", urls[i], e);
+                if (i >= (urls.length - 1)) {
+                    log.error("after retry, request for {} exception {} ", 
urls[i], e.getMessage());
+                    throw e;
+                }
+            }
+        }
     }
 
     /**
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
index e67ed426b9..96d33049e5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.manager.pojo.cluster.pulsar;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 
 import io.swagger.annotations.ApiModel;
@@ -40,6 +43,8 @@ import lombok.experimental.SuperBuilder;
 @ApiModel("Inlong cluster info for Pulsar")
 public class PulsarClusterInfo extends ClusterInfo {
 
+    public static final String HTTP_PREFIX = "http://";
+
     @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080", notes = "Pulsar service URL is the 'url' field of the cluster")
     private String adminUrl;
 
@@ -55,4 +60,14 @@ public class PulsarClusterInfo extends ClusterInfo {
         return CommonBeanUtils.copyProperties(this, PulsarClusterRequest::new);
     }
 
+    public String[] getAdminUrls(String urlSuffix) {
+        String adminUrl = this.getAdminUrl();
+        Preconditions.expectNotBlank(adminUrl, 
ErrorCodeEnum.INVALID_PARAMETER, "admin url is blank");
+        String[] adminUrls = adminUrl.replace(HTTP_PREFIX, 
InlongConstants.EMPTY).split(InlongConstants.COMMA);
+        for (int i = 0; i < adminUrls.length; i++) {
+            adminUrls[i] = HTTP_PREFIX + adminUrls[i] + urlSuffix;
+        }
+        return adminUrls;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 5ff2c9cecd..bdf7b237d8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -112,7 +112,7 @@ public class PulsarOperator {
         LOGGER.info("begin to create namespace={}", tenantNamespaceName);
         try {
             // Check whether the namespace exists, and create it if it does 
not exist
-            boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, 
namespace);
+            boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, 
tenantNamespaceName);
             if (isExists) {
                 LOGGER.warn("namespace={} already exists, skip to create", 
tenantNamespaceName);
                 return;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index 9fd81aac03..64802e7a76 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -57,27 +57,27 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import static 
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo.HTTP_PREFIX;
+
 /**
  * Pulsar connection utils
  */
 @Slf4j
 public class PulsarUtils {
 
-    private PulsarUtils() {
-    }
-
-    private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
-            ZoneId.systemDefault());
-
     public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
     public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
     public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
     public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
     public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
     public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
-
+    private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+            ZoneId.systemDefault());
     private static final Gson GSON = new GsonBuilder().create(); // thread safe
 
+    private PulsarUtils() {
+    }
+
     /**
      * Get http headers by token.
      *
@@ -102,8 +102,8 @@ public class PulsarUtils {
      */
     public static List<String> getClusters(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
             throws Exception {
-        final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_CLUSTERS_PATH), HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 ArrayList.class);
     }
 
@@ -120,10 +120,9 @@ public class PulsarUtils {
         List<String> clusters = getClusters(restTemplate, clusterInfo);
         List<String> brokers = new ArrayList<>();
         for (String brokerName : clusters) {
-            String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" 
+ brokerName;
-            brokers.addAll(
-                    HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
-                            ArrayList.class));
+            brokers.addAll(HttpUtils.request(restTemplate,
+                    clusterInfo.getAdminUrls(QUERY_BROKERS_PATH + "/" + 
brokerName), HttpMethod.GET, null,
+                    getHttpHeaders(clusterInfo.getToken()), ArrayList.class));
         }
         return brokers;
     }
@@ -138,8 +137,8 @@ public class PulsarUtils {
      */
     public static List<String> getTenants(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
             throws Exception {
-        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_TENANTS_PATH), HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 ArrayList.class);
     }
 
@@ -154,8 +153,9 @@ public class PulsarUtils {
      */
     public static List<String> getNamespaces(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String tenant) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + 
tenant;
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + "/" + tenant),
+                HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 ArrayList.class);
     }
 
@@ -165,18 +165,18 @@ public class PulsarUtils {
      * @param restTemplate spring framework RestTemplate
      * @param clusterInfo pulsar cluster info
      * @param tenant pulsar tenant name
-     * @param tenantInfo  pulsar tenant info
+     * @param tenantInfo pulsar tenant info
      * @throws Exception any exception if occurred
      */
     public static void createTenant(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
             PulsarTenantInfo tenantInfo) throws Exception {
-        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH + 
"/" + tenant;
         HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
         MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
         headers.setContentType(type);
         headers.add("Accept", MediaType.APPLICATION_JSON.toString());
         String param = GSON.toJson(tenantInfo);
-        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+        HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_TENANTS_PATH + "/" + tenant), HttpMethod.PUT,
+                param, headers);
     }
 
     /**
@@ -191,8 +191,6 @@ public class PulsarUtils {
      */
     public static void createNamespace(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
             String namespaceName, PulsarNamespacePolicies policies) throws 
Exception {
-        final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + 
InlongConstants.SLASH + tenant
-                + InlongConstants.SLASH + namespaceName;
         HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
         MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
         headers.setContentType(type);
@@ -200,7 +198,8 @@ public class PulsarUtils {
         String param = GSON.toJson(policies);
         param = param.replaceAll("messageTtlInSeconds", 
"message_ttl_in_seconds")
                 .replaceAll("retentionPolicies", "retention_policies");
-        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+        HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant
+                + InlongConstants.SLASH + namespaceName), HttpMethod.PUT, 
param, headers);
     }
 
     /**
@@ -215,8 +214,9 @@ public class PulsarUtils {
      */
     public static List<String> getTopics(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
             String namespace) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
tenant + "/" + namespace;
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + tenant 
+ "/" + namespace), HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 ArrayList.class);
     }
 
@@ -232,9 +232,11 @@ public class PulsarUtils {
      */
     public static List<String> getPartitionedTopics(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String tenant, String namespace) throws Exception {
-        String url =
-                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
tenant + "/" + namespace + "/partitioned";
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + tenant 
+ "/" + namespace
+                        + "/partitioned"),
+                HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 ArrayList.class);
     }
 
@@ -248,8 +250,8 @@ public class PulsarUtils {
      */
     public static void createNonPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
-        HttpUtils.request(restTemplate, url, HttpMethod.PUT, null, 
getHttpHeaders(clusterInfo.getToken()));
+        HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
+                HttpMethod.PUT, null, getHttpHeaders(clusterInfo.getToken()));
     }
 
     /**
@@ -262,8 +264,9 @@ public class PulsarUtils {
      */
     public static void createPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String topicPath, Integer numPartitions) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
-        HttpUtils.request(restTemplate, url, HttpMethod.PUT, 
numPartitions.toString(),
+        HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions"), HttpMethod.PUT,
+                numPartitions.toString(),
                 getHttpHeaders(clusterInfo.getToken()));
     }
 
@@ -278,8 +281,9 @@ public class PulsarUtils {
      */
     public static JsonObject getInternalStatsPartitionedTopics(RestTemplate 
restTemplate,
             PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitioned-internalStats";
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath
+                + "/partitioned-internalStats"), HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 JsonObject.class);
     }
 
@@ -294,8 +298,9 @@ public class PulsarUtils {
      */
     public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate 
restTemplate,
             PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions"), HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()),
                 PulsarTopicMetadata.class);
     }
 
@@ -309,8 +314,8 @@ public class PulsarUtils {
      */
     public static void deleteNonPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
-        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
+        HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
+                HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
     }
 
     /**
@@ -323,10 +328,11 @@ public class PulsarUtils {
      */
     public static void forceDeleteNonPartitionedTopic(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
             String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
         Map<String, Boolean> uriVariables = new HashMap<>();
         uriVariables.put("force", true);
-        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, 
getHttpHeaders(clusterInfo.getToken()));
+        HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
+                HttpMethod.DELETE, uriVariables,
+                getHttpHeaders(clusterInfo.getToken()));
     }
 
     /**
@@ -339,8 +345,9 @@ public class PulsarUtils {
      */
     public static void deletePartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
-        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
+        HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions"), HttpMethod.DELETE,
+                null, getHttpHeaders(clusterInfo.getToken()));
     }
 
     /**
@@ -353,10 +360,12 @@ public class PulsarUtils {
      */
     public static void forceDeletePartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
         Map<String, Boolean> uriVariables = new HashMap<>();
         uriVariables.put("force", true);
-        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, 
getHttpHeaders(clusterInfo.getToken()));
+        HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions"), HttpMethod.DELETE,
+                uriVariables,
+                getHttpHeaders(clusterInfo.getToken()));
     }
 
     /**
@@ -407,8 +416,8 @@ public class PulsarUtils {
      */
     public static String lookupTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath)
             throws Exception {
-        String url = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + 
"/persistent/" + topicPath;
-        PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, url, 
HttpMethod.GET, null,
+        PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(LOOKUP_TOPIC_PATH + "/persistent/" + 
topicPath), HttpMethod.GET, null,
                 getHttpHeaders(clusterInfo.getToken()), 
PulsarLookupTopicInfo.class);
         return topicInfo.getBrokerUrl();
     }
@@ -428,8 +437,9 @@ public class PulsarUtils {
         Map<String, String> map = new LinkedHashMap<>();
         for (int i = 0; i < metadata.getPartitions(); i++) {
             String partitionTopicName = topicPath + "-partition-" + i;
-            String partitionUrl = clusterInfo.getAdminUrl() + 
LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopicName;
-            PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, 
partitionUrl, HttpMethod.GET, null,
+            PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate,
+                    clusterInfo.getAdminUrls(LOOKUP_TOPIC_PATH + 
"/persistent/" + partitionTopicName), HttpMethod.GET,
+                    null,
                     getHttpHeaders(clusterInfo.getToken()), 
PulsarLookupTopicInfo.class);
             map.put(partitionTopicName, topicInfo.getBrokerUrl());
         }
@@ -447,15 +457,17 @@ public class PulsarUtils {
      */
     public static List<String> getSubscriptions(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
             String topicPath) throws Exception {
-        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscriptions";
-        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+        return HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscriptions"), HttpMethod.GET,
+                null,
+                getHttpHeaders(clusterInfo.getToken()),
                 ArrayList.class);
     }
 
     /**
      * Create a subscription on the topic.
      *
-     * @param restTemplate  spring framework RestTemplate
+     * @param restTemplate spring framework RestTemplate
      * @param clusterInfo pulsar cluster info
      * @param topicPath pulsar topic path
      * @param subscription pulsar topic subscription info
@@ -463,8 +475,6 @@ public class PulsarUtils {
      */
     public static void createSubscription(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
             String subscription) throws Exception {
-        String url =
-                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscription/" + subscription;
         JsonObject jsonObject = new JsonObject();
         jsonObject.addProperty("entryId", Long.MAX_VALUE);
         jsonObject.addProperty("ledgerId", Long.MAX_VALUE);
@@ -473,7 +483,10 @@ public class PulsarUtils {
         MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
         headers.setContentType(type);
         headers.add("Accept", MediaType.APPLICATION_JSON.toString());
-        HttpUtils.request(restTemplate, url, HttpMethod.PUT, 
jsonObject.toString(), headers);
+        HttpUtils.request(restTemplate,
+                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscription/"
+                        + subscription),
+                HttpMethod.PUT, jsonObject.toString(), headers);
     }
 
     /**
@@ -481,7 +494,7 @@ public class PulsarUtils {
      *
      * @param restTemplate spring framework RestTemplate
      * @param clusterInfo pulsar cluster info
-     * @param topicPartition  pulsar topic partition info
+     * @param topicPartition pulsar topic partition info
      * @param messageType pulsar message type info
      * @param messagePosition pulsar message position info
      * @return spring framework HttpEntity
@@ -489,22 +502,36 @@ public class PulsarUtils {
      */
     public static ResponseEntity<byte[]> examineMessage(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
             String topicPartition, String messageType, int messagePosition) 
throws Exception {
-        StringBuilder urlBuilder = new 
StringBuilder().append(clusterInfo.getAdminUrl())
-                .append(QUERY_PERSISTENT_PATH)
-                .append("/")
-                .append(topicPartition)
-                .append("/examinemessage")
-                .append("?initialPosition=")
-                .append(messageType)
-                .append("&messagePosition=")
-                .append(messagePosition);
-        ResponseEntity<byte[]> response = 
restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
-                new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), 
byte[].class);
-        if (!response.getStatusCode().is2xxSuccessful()) {
-            log.error("request error for {}, status code {}, body {}", 
urlBuilder.toString(), response.getStatusCode(),
-                    response.getBody());
-        }
-        return response;
+        String adminUrl = clusterInfo.getAdminUrl();
+        String[] adminUrls = adminUrl.replace(HTTP_PREFIX, 
InlongConstants.EMPTY).split(InlongConstants.COMMA);
+        for (int i = 0; i < adminUrls.length; i++) {
+            try {
+                StringBuilder urlBuilder = new 
StringBuilder().append(HTTP_PREFIX + adminUrls[i])
+                        .append(QUERY_PERSISTENT_PATH)
+                        .append("/")
+                        .append(topicPartition)
+                        .append("/examinemessage")
+                        .append("?initialPosition=")
+                        .append(messageType)
+                        .append("&messagePosition=")
+                        .append(messagePosition);
+                ResponseEntity<byte[]> response = 
restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
+                        new 
HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), byte[].class);
+                if (!response.getStatusCode().is2xxSuccessful()) {
+                    log.error("request error for {}, status code {}, body {}", 
urlBuilder.toString(),
+                            response.getStatusCode(),
+                            response.getBody());
+                }
+                return response;
+            } catch (Exception e) {
+                log.error("examine message for topic partition={} error, begin 
retry", topicPartition, e);
+                if (i >= (adminUrls.length - 1)) {
+                    log.error("after retry, examine message for topic 
partition={} still error", topicPartition, e);
+                    throw e;
+                }
+            }
+        }
+        throw new Exception(String.format("examine message failed for topic 
partition=%s", topicPartition));
     }
 
     public static PulsarMessageInfo 
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
@@ -518,7 +545,7 @@ public class PulsarUtils {
     }
 
     /**
-     *  Copy from getMessagesFromHttpResponse method of 
org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     * Copy from getMessagesFromHttpResponse method of 
org.apache.pulsar.client.admin.internal.TopicsImpl class.
      *
      * @param response
      * @param topic

Reply via email to