fuweng11 commented on code in PR #8941:
URL: https://github.com/apache/inlong/pull/8941#discussion_r1367604829


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java:
##########
@@ -260,31 +264,42 @@ public void createSubscription(PulsarAdmin pulsarAdmin, 
String fullTopicName, St
     /**
      * Create a Pulsar subscription for the specified topic list
      */
-    public void createSubscriptions(PulsarAdmin pulsarAdmin, String 
subscription, PulsarTopicInfo topicInfo,
-            List<String> topicList) throws PulsarAdminException {
+    public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, 
String subscription, PulsarTopicInfo topicInfo,
+            List<String> topicList) throws Exception {
         for (String topic : topicList) {
             topicInfo.setTopicName(topic);
             String fullTopicName = topicInfo.getPulsarTenant() + "/" + 
topicInfo.getNamespace() + "/" + topic;
-            this.createSubscription(pulsarAdmin, fullTopicName, 
topicInfo.getQueueModule(), subscription);
+            this.createSubscription(pulsarClusterInfo, fullTopicName, 
topicInfo.getQueueModule(), subscription);
         }
         LOGGER.info("success to create subscription={} for multiple 
topics={}", subscription, topicList);
     }
 
     /**
      * Check if Pulsar tenant exists
+     *
+     * @param pulsarClusterInfo

Review Comment:
   Please add a specific description, like:
   ```
        /**
        * Poll the Sort status infos by the given inlong groups
        *
        * @param groupInfos inlong group infos
        * @param credentials credential info
        * @return list of Sort status infos
        * @throws Exception any exception if occurred
        */
   ```



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +51,420 @@ public class PulsarUtils {
     private PulsarUtils() {
     }
 
+    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    /**
+     * get http headers by token.
+     *
+     * @param token
+     * @return

Review Comment:
   Ditto.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java:
##########
@@ -260,31 +264,42 @@ public void createSubscription(PulsarAdmin pulsarAdmin, 
String fullTopicName, St
     /**
      * Create a Pulsar subscription for the specified topic list
      */
-    public void createSubscriptions(PulsarAdmin pulsarAdmin, String 
subscription, PulsarTopicInfo topicInfo,
-            List<String> topicList) throws PulsarAdminException {
+    public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, 
String subscription, PulsarTopicInfo topicInfo,
+            List<String> topicList) throws Exception {
         for (String topic : topicList) {
             topicInfo.setTopicName(topic);
             String fullTopicName = topicInfo.getPulsarTenant() + "/" + 
topicInfo.getNamespace() + "/" + topic;
-            this.createSubscription(pulsarAdmin, fullTopicName, 
topicInfo.getQueueModule(), subscription);
+            this.createSubscription(pulsarClusterInfo, fullTopicName, 
topicInfo.getQueueModule(), subscription);
         }
         LOGGER.info("success to create subscription={} for multiple 
topics={}", subscription, topicList);
     }
 
     /**
      * Check if Pulsar tenant exists
+     *
+     * @param pulsarClusterInfo
+     * @param tenant
+     * @return
+     * @throws Exception
      */
-    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) 
throws PulsarAdminException {
-        List<String> tenantList = pulsarAdmin.tenants().getTenants();
-        return tenantList.contains(tenant);
+    private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String 
tenant) throws Exception {
+        List<String> tenants = PulsarUtils.getPulsarTenants(restTemplate, 
pulsarClusterInfo);
+        return tenants.contains(tenant);
     }
 
     /**
-     * Check whether the Pulsar namespace exists under the specified tenant
+     * Check whether the Pulsar namespace exists under the specified tenant.
+     *
+     * @param pulsarClusterInfo
+     * @param tenant
+     * @param namespace
+     * @return
+     * @throws Exception

Review Comment:
   Ditto.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +51,420 @@ public class PulsarUtils {
     private PulsarUtils() {
     }
 
+    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    /**
+     * get http headers by token.
+     *
+     * @param token
+     * @return
+     */
+    private static HttpHeaders getHttpHeaders(String token) {
+        HttpHeaders headers = new HttpHeaders();
+        if (StringUtils.isNotEmpty(token)) {
+            headers.add("Authorization", "Bearer " + token);
+        }
+        return headers;
+    }
+
     /**
-     * Get pulsar admin info
+     * Get pulsar cluster info list.
+     *
+     * @param restTemplate
+     * @param clusterInfo
+     * @return
+     * @throws Exception

Review Comment:
   Ditto.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +51,420 @@ public class PulsarUtils {
     private PulsarUtils() {
     }
 
+    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    /**
+     * get http headers by token.
+     *
+     * @param token
+     * @return
+     */
+    private static HttpHeaders getHttpHeaders(String token) {
+        HttpHeaders headers = new HttpHeaders();
+        if (StringUtils.isNotEmpty(token)) {
+            headers.add("Authorization", "Bearer " + token);
+        }
+        return headers;
+    }
+
     /**
-     * Get pulsar admin info
+     * Get pulsar cluster info list.
+     *
+     * @param restTemplate
+     * @param clusterInfo
+     * @return
+     * @throws Exception
      */
-    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) 
throws PulsarClientException {
-        Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(), 
ErrorCodeEnum.INVALID_PARAMETER,
-                "Pulsar adminUrl cannot be empty");
-        PulsarAdmin pulsarAdmin;
-        if (StringUtils.isEmpty(pulsarCluster.getToken())) {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
+    public static List<String> getPulsarClusters(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get the list of active brokers.
+     *
+     * @param restTemplate
+     * @param clusterInfo
+     * @return
+     * @throws Exception

Review Comment:
   Ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to