aloyszhang commented on code in PR #8941:
URL: https://github.com/apache/inlong/pull/8941#discussion_r1379664691
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +65,919 @@ public class PulsarUtils {
private PulsarUtils() {
}
+ private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+ ZoneId.systemDefault());
+
+ public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+ public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+ public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+ public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+ public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+ public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+ private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+ /**
+ * Get http headers by token.
+ *
+ * @param token pulsar token info
+ * @return add http headers for token info
+ */
+ private static HttpHeaders getHttpHeaders(String token) {
+ HttpHeaders headers = new HttpHeaders();
+ if (StringUtils.isNotEmpty(token)) {
+ headers.add("Authorization", "Bearer " + token);
+ }
+ return headers;
+ }
+
+ /**
+ * Get pulsar cluster info list.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @return list of pulsar cluster infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPulsarClusters(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo)
Review Comment:
How about change `getPulsarClusters` to `getClusters`
I think there is no need to specify pulsar in the method name here since we
are already in a class named `PulsarUtils`
The following method names may have the same problem, please check also.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +65,919 @@ public class PulsarUtils {
private PulsarUtils() {
}
+ private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+ ZoneId.systemDefault());
+
+ public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+ public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+ public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+ public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+ public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+ public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+ private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+ /**
+ * Get http headers by token.
+ *
+ * @param token pulsar token info
+ * @return add http headers for token info
+ */
+ private static HttpHeaders getHttpHeaders(String token) {
+ HttpHeaders headers = new HttpHeaders();
+ if (StringUtils.isNotEmpty(token)) {
+ headers.add("Authorization", "Bearer " + token);
+ }
+ return headers;
+ }
+
+ /**
+ * Get pulsar cluster info list.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @return list of pulsar cluster infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPulsarClusters(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo)
+ throws Exception {
+ final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
+ return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ ArrayList.class);
+ }
+
/**
- * Get pulsar admin info
+ * Get the list of active brokers.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @return list of pulsar broker infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPulsarBrokers(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo)
+ throws Exception {
+ List<String> clusters = getPulsarClusters(restTemplate, clusterInfo);
+ List<String> brokers = new ArrayList<>();
+ for (String brokerName : brokers) {
+ String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/"
+ brokerName;
+ clusters.addAll(
+ HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ ArrayList.class));
+ }
+ return clusters;
+ }
+
+ /**
+ * Get pulsar tenant info list.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @return list of pulsar tenant infos
+ * @throws Exception any exception if occurred
*/
- public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster)
throws PulsarClientException {
- Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(),
ErrorCodeEnum.INVALID_PARAMETER,
- "Pulsar adminUrl cannot be empty");
- PulsarAdmin pulsarAdmin;
- if (StringUtils.isEmpty(pulsarCluster.getToken())) {
- pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
+ public static List<String> getPulsarTenants(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo)
+ throws Exception {
+ final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
+ return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ ArrayList.class);
+ }
+
+ /**
+ * Get pulsar namespace info list.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param tenant pulsar tenant name
+ * @return list of pulsar namespace infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPulsarNamespaces(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
+ String tenant) throws Exception {
+ String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" +
tenant;
+ return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ ArrayList.class);
+ }
+
+ /**
+ * Create a new pulsar tenant.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param tenant pulsar tenant name
+ * @param tenantInfo pulsar tenant info
+ * @throws Exception any exception if occurred
+ */
+ public static void createTenant(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String tenant,
+ PulsarTenantInfo tenantInfo) throws Exception {
+ final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH +
"/" + tenant;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
+ headers.setContentType(type);
+ headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+ String param = GSON.toJson(tenantInfo);
+ HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+ }
+
+ /**
+ * Creates a new pulsar namespace with the specified policies.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param namespaceName pulsar namespace name
+ * @param policies pulsar namespace policies info
+ * @throws Exception any exception if occurred
+ */
+ public static void createNamespace(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String namespaceName,
+ PulsarNamespacePolicies policies) throws Exception {
+ final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH +
"/" + namespaceName;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
+ headers.setContentType(type);
+ headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+ String param = GSON.toJson(policies);
+ param = param.replaceAll("messageTtlInSeconds",
"message_ttl_in_seconds")
+ .replaceAll("retentionPolicies", "retention_policies");
+ HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+ }
+
+ /**
+ * Get the list of topics under a namespace.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param tenant pulsar tenant name
+ * @param namespace pulsar namespace name
+ * @return list of pulsar topic infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPulsarTopics(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String tenant,
+ String namespace) throws Exception {
+ String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
tenant + "/" + namespace;
+ return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ ArrayList.class);
+ }
+
+ /**
+ * Get the list of partitioned topics under a namespace.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param tenant pulsar tenant name
+ * @param namespace pulsar namespace name
+ * @return list of pulsar partitioned topic infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPulsarPartitionedTopics(RestTemplate
restTemplate, PulsarClusterInfo clusterInfo,
+ String tenant, String namespace) throws Exception {
+ String url =
+ clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
tenant + "/" + namespace + "/partitioned";
+ return HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
getHttpHeaders(clusterInfo.getToken()),
+ ArrayList.class);
+ }
+
+ /**
+ * Create a non-partitioned topic.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @throws Exception any exception if occurred
+ */
+ public static void createNonPartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath;
+ HttpUtils.request(restTemplate, url, HttpMethod.PUT, null,
getHttpHeaders(clusterInfo.getToken()));
+ }
+
+ /**
+ * Create a partitioned topic.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @throws Exception any exception if occurred
+ */
+ public static void createPartitionedTopic(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo,
+ String topicPath, Integer numPartitions) throws Exception {
+ String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" +
topicPath + "/partitions";
+ HttpUtils.request(restTemplate, url, HttpMethod.PUT,
numPartitions.toString(),
+ getHttpHeaders(clusterInfo.getToken()));
+ }
+
+ /**
+ * Get the stats-internal for the partitioned topic.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @return pulsar internal stat info of partitioned topic
+ * @throws Exception any exception if occurred
+ */
+ public static JsonObject getPulsarStatsPartitionedTopics(RestTemplate
restTemplate,
Review Comment:
Confused method name.
How about `getPartitionedTopicInternalStats`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]