haibo-duan commented on code in PR #8941:
URL: https://github.com/apache/inlong/pull/8941#discussion_r1390570482


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +66,921 @@ public class PulsarUtils {
     private PulsarUtils() {
     }
 
+    private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+            ZoneId.systemDefault());
+
+    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    /**
+     * Get http headers by token.
+     *
+     * @param token pulsar token info
+     * @return http headers, carrying a Bearer Authorization header when the token is not empty
+     */
+    private static HttpHeaders getHttpHeaders(String token) {
+        HttpHeaders headers = new HttpHeaders();
+        if (StringUtils.isNotEmpty(token)) {
+            headers.add("Authorization", "Bearer " + token);
+        }
+        return headers;
+    }
+
+    /**
+     * Get pulsar cluster info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar cluster infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getClusters(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
     /**
-     * Get pulsar admin info
+     * Get the list of active brokers.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar broker infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getBrokers(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        List<String> clusters = getClusters(restTemplate, clusterInfo);
+        List<String> brokers = new ArrayList<>();
+        for (String brokerName : clusters) {
+            String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" 
+ brokerName;
+            brokers.addAll(
+                    HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                            ArrayList.class));
+        }
+        return brokers;
+    }
+
+    /**
+     * Get pulsar tenant info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar tenant infos
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) 
throws PulsarClientException {
-        Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(), 
ErrorCodeEnum.INVALID_PARAMETER,
-                "Pulsar adminUrl cannot be empty");
-        PulsarAdmin pulsarAdmin;
-        if (StringUtils.isEmpty(pulsarCluster.getToken())) {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
+    public static List<String> getTenants(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get pulsar namespace info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @return list of pulsar namespace infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getNamespaces(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String tenant) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + 
tenant;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a new pulsar tenant.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param tenantInfo  pulsar tenant info
+     * @throws Exception any exception if occurred
+     */
+    public static void createTenant(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
+            PulsarTenantInfo tenantInfo) throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH + 
"/" + tenant;
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        String param = GSON.toJson(tenantInfo);
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+    }
+
+    /**
+     * Creates a new pulsar namespace with the specified policies.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespaceName pulsar namespace name
+     * @param policies pulsar namespace policies info
+     * @throws Exception any exception if occurred
+     */
+    public static void createNamespace(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
+            String namespaceName, PulsarNamespacePolicies policies) throws 
Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + 
InlongConstants.SLASH + tenant
+                + InlongConstants.SLASH + namespaceName;
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        String param = GSON.toJson(policies);
+        param = param.replaceAll("messageTtlInSeconds", 
"message_ttl_in_seconds")
+                .replaceAll("retentionPolicies", "retention_policies");
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+    }
+
+    /**
+     * Get the list of topics under a namespace.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return list of pulsar topic infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getTopics(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
+            String namespace) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
tenant + "/" + namespace;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get the list of partitioned topics under a namespace.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return list of pulsar partitioned topic infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getPartitionedTopics(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String tenant, String namespace) throws Exception {
+        String url =
+                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
tenant + "/" + namespace + "/partitioned";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void createNonPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, null, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Create a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param numPartitions number of partitions for the topic
+     * @throws Exception any exception if occurred
+     */
+    public static void createPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath, Integer numPartitions) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, 
numPartitions.toString(),
+                getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Get the stats-internal for the partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar internal stat info of partitioned topic
+     * @throws Exception any exception if occurred
+     */
+    public static JsonObject getInternalStatsPartitionedTopics(RestTemplate 
restTemplate,
+            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitioned-internalStats";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                JsonObject.class);
+    }
+
+    /**
+     * Get partitioned topic metadata.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar topic metadata info
+     * @throws Exception any exception if occurred
+     */
+    public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate 
restTemplate,
+            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                PulsarTopicMetadata.class);
+    }
+
+    /**
+     * Delete a non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void deleteNonPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Force delete a non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void forceDeleteNonPartitionedTopic(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
+        Map<String, Boolean> uriVariables = new HashMap<>();
+        uriVariables.put("force", true);
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Delete a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void deletePartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Force delete a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void forceDeletePartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        Map<String, Boolean> uriVariables = new HashMap<>();
+        uriVariables.put("force", true);
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Delete a partitioned or non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param isPartitioned pulsar is partitioned topic
+     * @throws Exception any exception if occurred
+     */
+    public static void deleteTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
+            boolean isPartitioned) throws Exception {
+        if (isPartitioned) {
+            deletePartitionedTopic(restTemplate, clusterInfo, topicPath);
         } else {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), 
pulsarCluster.getToken());
+            deleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
         }
-        return pulsarAdmin;
     }
 
     /**
-     * Get the pulsar admin from the given service URL.
+     * Force delete a partitioned or non-partitioned topic.
      *
-     * @apiNote It must be closed after use.
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param isPartitioned pulsar is partitioned topic
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws 
PulsarClientException {
-        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
+    public static void forceDeleteTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
+            boolean isPartitioned)
+            throws Exception {
+        if (isPartitioned) {
+            forceDeletePartitionedTopic(restTemplate, clusterInfo, topicPath);
+        } else {
+            forceDeleteNonPartitionedTopic(restTemplate, clusterInfo, 
topicPath);
+        }
     }
 
     /**
-     * Get the pulsar admin from the given service URL and token.
-     * <p/>
-     * Currently only token is supported as an authentication type.
+     * Look up persistent topic info.
      *
-     * @apiNote It must be closed after use.
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar broker url
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String 
token) throws PulsarClientException {
-        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
-                .authentication(AuthenticationFactory.token(token)).build();
+    public static String lookupTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath)
+            throws Exception {
+        String url = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + 
"/persistent/" + topicPath;
+        PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, url, 
HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()), 
PulsarLookupTopicInfo.class);
+        return topicInfo.getBrokerUrl();
     }
 
     /**
-     * Get pulsar cluster info list.
+     * Look up persistent partitioned topic info.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return map of partition topic name to its broker url
+     * @throws Exception any exception if occurred
+     */
+    public static Map<String, String> lookupPartitionedTopic(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        PulsarTopicMetadata metadata = 
getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
+        Map<String, String> map = new LinkedHashMap<>();
+        for (int i = 0; i < metadata.getPartitions(); i++) {
+            String partitionTopicName = topicPath + "-partition-" + i;
+            String partitionUrl = clusterInfo.getAdminUrl() + 
LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopicName;
+            PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, 
partitionUrl, HttpMethod.GET, null,
+                    getHttpHeaders(clusterInfo.getToken()), 
PulsarLookupTopicInfo.class);
+            map.put(partitionTopicName, topicInfo.getBrokerUrl());
+        }
+        return map;
+    }
+
+    /**
+     * Get the list of persistent subscriptions for a given topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return list of pulsar topic subscription info
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getSubscriptions(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscriptions";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a subscription on the topic.
+     *
+     * @param restTemplate  spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param subscription pulsar topic subscription info
+     * @throws Exception any exception if occurred
+     */
+    public static void createSubscription(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
+            String subscription) throws Exception {
+        String url =
+                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscription/" + subscription;
+        JsonObject jsonObject = new JsonObject();
+        jsonObject.addProperty("entryId", Long.MAX_VALUE);
+        jsonObject.addProperty("ledgerId", Long.MAX_VALUE);
+        jsonObject.addProperty("partitionIndex", -1);
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, 
jsonObject.toString(), headers);
+    }
+
+    /**
+     * Examine a specific message on a topic by position relative to the 
earliest or the latest message.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPartition  pulsar topic partition info
+     * @param messageType initial position, sent as the initialPosition query parameter
+     * @param messagePosition pulsar message position info
+     * @return spring framework ResponseEntity whose body is a byte array
+     * @throws Exception any exception if occurred
+     */
+    public static ResponseEntity<byte[]> examineMessage(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPartition, String messageType, int messagePosition) 
throws Exception {
+        StringBuilder urlBuilder = new 
StringBuilder().append(clusterInfo.getAdminUrl())
+                .append(QUERY_PERSISTENT_PATH)
+                .append("/")
+                .append(topicPartition)
+                .append("/examinemessage")
+                .append("?initialPosition=")
+                .append(messageType)
+                .append("&messagePosition=")
+                .append(messagePosition);
+        ResponseEntity<byte[]> response = 
restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
+                new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), 
byte[].class);
+        if (!response.getStatusCode().is2xxSuccessful()) {
+            log.error("request error for {}, status code {}, body {}", 
urlBuilder.toString(), response.getStatusCode(),
+                    response.getBody());
+        }
+        return response;
+    }
+
+    public static PulsarMessageInfo 
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+            throws Exception {
+        List<PulsarMessageInfo> messages = 
PulsarUtils.getMessagesFromHttpResponse(response, topic);
+        if (messages.size() > 0) {
+            return messages.get(0);
+        } else {
+            return null;
+        }
+    }
+
+    /**
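+     * Parse pulsar messages from the headers and body of an http response.
+     *
+     * Copied from the getMessagesFromHttpResponse method of the
+     * org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     *
+     * @param response http response whose headers and body are parsed
+     * @param topic pulsar topic name set on every returned message
+     * @return list of parsed messages: one per batch entry for an unencrypted batch response,
+     *         otherwise a single message
+     * @throws Exception any exception if occurred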
      */
-    public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) 
throws PulsarAdminException {
-        return pulsarAdmin.clusters().getClusters();
+    public static List<PulsarMessageInfo> 
getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+            throws Exception {
+        HttpHeaders headers = response.getHeaders();
+        String msgId = headers.getFirst("X-Pulsar-Message-ID");
+        String brokerEntryTimestamp = 
headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
+        String brokerEntryIndex = 
headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
+        PulsarBrokerEntryMetadata brokerEntryMetadata;
+        if (brokerEntryTimestamp == null && brokerEntryIndex == null) {
+            brokerEntryMetadata = null;
+        } else {
+            brokerEntryMetadata = new PulsarBrokerEntryMetadata();
+            if (brokerEntryTimestamp != null) {
+                
brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp.toString()));
+            }
+            if (brokerEntryIndex != null) {
+                brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
+            }
+        }
+
+        PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
+        Map<String, String> properties = Maps.newTreeMap();
+
+        Object tmp = headers.getFirst("X-Pulsar-publish-time");
+        if (tmp != null) {
+            messageMetadata.setPublishTime(parse(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-event-time");
+        if (tmp != null) {
+            messageMetadata.setEventTime(parse(tmp.toString()));
+        }
+        tmp = headers.getFirst("X-Pulsar-deliver-at-time");
+        if (tmp != null) {
+            messageMetadata.setDeliverAtTime(parse(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-null-value");
+        if (tmp != null) {
+            messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-producer-name");
+        if (tmp != null) {
+            messageMetadata.setProducerName(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-sequence-id");
+        if (tmp != null) {
+            messageMetadata.setSequenceId(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-replicated-from");
+        if (tmp != null) {
+            messageMetadata.setReplicatedFrom(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-partition-key");
+        if (tmp != null) {
+            messageMetadata.setPartitionKey(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-compression");
+        if (tmp != null) {
+            messageMetadata.setCompression(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-uncompressed-size");
+        if (tmp != null) {
+            
messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-encryption-algo");
+        if (tmp != null) {
+            messageMetadata.setEncryptionAlgo(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
+        if (tmp != null) {
+            
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-marker-type");
+        if (tmp != null) {
+            messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-txnid-least-bits");
+        if (tmp != null) {
+            messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-txnid-most-bits");
+        if (tmp != null) {
+            messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-highest-sequence-id");
+        if (tmp != null) {
+            
messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-uuid");
+        if (tmp != null) {
+            messageMetadata.setUuid(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg");
+        if (tmp != null) {
+            
messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size");
+        if (tmp != null) {
+            
messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-chunk-id");
+        if (tmp != null) {
+            messageMetadata.setChunkId(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-null-partition-key");
+        if (tmp != null) {
+            
messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
+        if (tmp != null) {
+            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-ordering-key");
+        if (tmp != null) {
+            
messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
+        if (tmp != null) {
+            
messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
+        if (tmp != null) {
+            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        List<String> tmpList = (List) headers.get("X-Pulsar-replicated-to");
+        if (ObjectUtils.isNotEmpty(tmpList)) {
+            if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
+                messageMetadata.setReplicateTos(Lists.newArrayList(tmpList));
+            } else {
+                messageMetadata.getReplicateTos().addAll(tmpList);
+            }
+        }
+
+        tmp = headers.getFirst("X-Pulsar-batch-size");
+        if (tmp != null) {
+            properties.put("X-Pulsar-batch-size", (String) tmp);
+        }
+
+        for (Entry<String, List<String>> entry : headers.entrySet()) {
+            if (entry.getKey().contains("X-Pulsar-PROPERTY-")) {
+                String keyName = 
entry.getKey().substring("X-Pulsar-PROPERTY-".length());
+                properties.put(keyName, (String) ((List) 
entry.getValue()).get(0));
+            }
+        }
+
+        tmp = headers.getFirst("X-Pulsar-num-batch-message");
+        if (tmp != null) {
+            properties.put("X-Pulsar-num-batch-message", (String) tmp);
+        }
+        boolean isEncrypted = false;
+        tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+        if (tmp != null) {
+            isEncrypted = Boolean.parseBoolean(tmp.toString());
+        }
+
+        if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) 
{
+            return getIndividualMsgsFromBatch(topic, msgId, 
response.getBody(), properties, messageMetadata,
+                    brokerEntryMetadata);
+        }
+
+        PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+        messageInfo.setTopic(topic);
+        messageInfo.setMessageId(msgId);
+        messageInfo.setProperties(messageMetadata.getProperties());
+        messageInfo.setBody(response.getBody());
+        messageInfo.setPulsarMessageMetadata(messageMetadata);
+        if (brokerEntryMetadata != null) {
+            messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
+        }
+        return Collections.singletonList(messageInfo);
+    }
+
+    private static long parse(String datetime) throws DateTimeParseException {
+        Instant instant = Instant.from(DATE_FORMAT.parse(datetime));
+        return instant.toEpochMilli();
+    }
+
+    /**
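+     * Split a batch payload into individual messages.
+     *
+     * Copied from the getIndividualMsgsFromBatch method of the
+     * org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     *
+     * @param topic pulsar topic name set on every returned message
+     * @param msgId id of the batch message; each returned message id is this id plus ":" and its index
+     * @param data payload bytes of the batch
+     * @param properties message properties, must contain the X-Pulsar-num-batch-message entry
+     * @param metadata message metadata set on every returned message
+     * @param brokerMetadata broker entry metadata set on every returned message, may be null
+     * @return list of the individual messages in the batch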
+     */
+    private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String 
topic, String msgId, byte[] data,
+            Map<String, String> properties, PulsarMessageMetadata metadata, 
PulsarBrokerEntryMetadata brokerMetadata) {
+        List<PulsarMessageInfo> ret = new ArrayList<>();
+        int batchSize = 
Integer.parseInt(properties.get("X-Pulsar-num-batch-message"));
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        for (int i = 0; i < batchSize; ++i) {
+            String batchMsgId = msgId + ":" + i;
+            PulsarMessageMetadata singleMetadata = new PulsarMessageMetadata();
+            singleMetadata.setProperties(properties);
+            ByteBuffer singleMessagePayload = 
deSerializeSingleMessageInBatch(buffer, singleMetadata, i, batchSize);
+            PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+            messageInfo.setTopic(topic);
+            messageInfo.setMessageId(batchMsgId);
+            messageInfo.setProperties(singleMetadata.getProperties());
+            messageInfo.setPulsarMessageMetadata(metadata);
+            messageInfo.setBody(singleMessagePayload.array());
+            if (brokerMetadata != null) {
+                messageInfo.setPulsarBrokerEntryMetadata(brokerMetadata);
+            }
+            ret.add(messageInfo);
+        }
+        buffer.clear();
+        return ret;
+    }
+
+    /**
+     * Copy from deSerializeSingleMessageInBatch method of 
org.apache.pulsar.common.protocol.Commands class.
+     *
+     * @param uncompressedPayload
+     * @param metadata
+     * @param index
+     * @param batchSize
+     * @return
+     */
+    private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer 
uncompressedPayload,
+            PulsarMessageMetadata metadata, int index, int batchSize) {
+        int singleMetaSize = (int) uncompressedPayload.getInt();
+        metaDataParseFrom(metadata, uncompressedPayload, singleMetaSize);
+        int singleMessagePayloadSize = metadata.getPayloadSize();
+        int readerIndex = uncompressedPayload.position();
+        byte[] singleMessagePayload = new byte[singleMessagePayloadSize];
+        uncompressedPayload.get(singleMessagePayload);
+        if (index < batchSize) {
+            uncompressedPayload.position(readerIndex + 
singleMessagePayloadSize);
+        }
+        return ByteBuffer.wrap(singleMessagePayload);
+    }
+
+    /**
+     * Copy from parseFrom method of 
org.apache.pulsar.common.api.proto.SingleMessageMetadata class.
+     *
+     * @param metadata
+     * @param buffer
+     * @param size
+     */
+    private static void metaDataParseFrom(PulsarMessageMetadata metadata, 
ByteBuffer buffer, int size) {
+        int endIdx = size + buffer.position();
+        while (buffer.position() < endIdx) {
+            int tag = readVarInt(buffer);
+            switch (tag) {
+                case 10:
+                    int _propertiesSize = readVarInt(buffer);
+                    parseFrom(metadata, buffer, _propertiesSize);
+                    break;
+                case 18:
+                    int _partitionKeyBufferLen = readVarInt(buffer);
+                    byte[] partitionKeyArray = new 
byte[_partitionKeyBufferLen];
+                    buffer.get(partitionKeyArray);
+                    metadata.setPartitionKey(new String(partitionKeyArray));
+                    break;
+                case 24:
+                    int payloadSize = readVarInt(buffer);
+                    metadata.setPayloadSize(payloadSize);
+                    break;
+                case 32:
+                    boolean compactedOut = readVarInt(buffer) == 1;
+                    metadata.setCompactedOut(compactedOut);
+                    break;
+                case 40:
+                    long eventTime = readVarInt64(buffer);
+                    metadata.setEventTime(eventTime);
+                    break;
+                case 48:
+                    boolean partitionKeyB64Encoded = readVarInt(buffer) == 1;
+                    metadata.setPartitionKeyB64Encoded(partitionKeyB64Encoded);
+                    break;
+                case 58:
+                    int _orderingKeyLen = readVarInt(buffer);
+                    byte[] orderingKeyArray = new byte[_orderingKeyLen];
+                    metadata.setOrderingKey(orderingKeyArray);
+                    break;
+                case 64:
+                    long sequenceId = readVarInt64(buffer);
+                    metadata.setSequenceId(sequenceId);
+                    break;
+                case 72:
+                    boolean nullValue = readVarInt(buffer) == 1;
+                    metadata.setNullValue(nullValue);
+                    break;
+                case 80:
+                    boolean nullPartitionKey = readVarInt(buffer) == 1;
+                    metadata.setNullPartitionKey(nullPartitionKey);
+                    break;
+                default:
+                    skipUnknownField(tag, buffer);
+            }
+        }
     }
 
     /**
-     * Get pulsar cluster service url.
+     * Copy from readVarInt method of 
org.apache.pulsar.common.api.proto.LightProtoCodec class.
+     *
+     * @param buf
+     * @return
      */
-    public static String getServiceUrl(PulsarAdmin pulsarAdmin, String 
pulsarCluster) throws PulsarAdminException {
-        return 
pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl();
+    private static int readVarInt(ByteBuffer buf) {
+        byte tmp = buf.get();
+        if (tmp >= 0) {
+            return tmp;
+        } else {
+            int result = tmp & 127;

Review Comment:
   This code is copied from the readVarInt method of the
   org.apache.pulsar.common.api.proto.LightProtoCodec class, so I think it is
   best to keep it consistent with the original code.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java:
##########
@@ -39,54 +66,921 @@ public class PulsarUtils {
     private PulsarUtils() {
     }
 
+    private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+            ZoneId.systemDefault());
+
+    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    /**
+     * Get http headers by token.
+     *
+     * @param token pulsar token info
+     * @return add http headers for token info
+     */
+    private static HttpHeaders getHttpHeaders(String token) {
+        HttpHeaders headers = new HttpHeaders();
+        if (StringUtils.isNotEmpty(token)) {
+            headers.add("Authorization", "Bearer " + token);
+        }
+        return headers;
+    }
+
+    /**
+     * Get pulsar cluster info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar cluster infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getClusters(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
     /**
-     * Get pulsar admin info
+     * Get the list of active brokers.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar broker infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getBrokers(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        List<String> clusters = getClusters(restTemplate, clusterInfo);
+        List<String> brokers = new ArrayList<>();
+        for (String brokerName : clusters) {
+            String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" 
+ brokerName;
+            brokers.addAll(
+                    HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                            ArrayList.class));
+        }
+        return brokers;
+    }
+
+    /**
+     * Get pulsar tenant info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar tenant infos
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) 
throws PulsarClientException {
-        Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(), 
ErrorCodeEnum.INVALID_PARAMETER,
-                "Pulsar adminUrl cannot be empty");
-        PulsarAdmin pulsarAdmin;
-        if (StringUtils.isEmpty(pulsarCluster.getToken())) {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
+    public static List<String> getTenants(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get pulsar namespace info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @return list of pulsar namespace infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getNamespaces(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String tenant) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + 
tenant;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a new pulsar tenant.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param tenantInfo  pulsar tenant info
+     * @throws Exception any exception if occurred
+     */
+    public static void createTenant(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
+            PulsarTenantInfo tenantInfo) throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH + 
"/" + tenant;
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        String param = GSON.toJson(tenantInfo);
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+    }
+
+    /**
+     * Creates a new pulsar namespace with the specified policies.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar namespace name
+     * @param namespaceName pulsar namespace name
+     * @param policies pulsar namespace policies info
+     * @throws Exception any exception if occurred
+     */
+    public static void createNamespace(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
+            String namespaceName, PulsarNamespacePolicies policies) throws 
Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + 
InlongConstants.SLASH + tenant
+                + InlongConstants.SLASH + namespaceName;
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        String param = GSON.toJson(policies);
+        param = param.replaceAll("messageTtlInSeconds", 
"message_ttl_in_seconds")
+                .replaceAll("retentionPolicies", "retention_policies");
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+    }
+
+    /**
+     * Get the list of topics under a namespace.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return list of pulsar topic infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getTopics(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String tenant,
+            String namespace) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
tenant + "/" + namespace;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get the list of partitioned topics under a namespace.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return list of pulsar partitioned topic infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getPartitionedTopics(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String tenant, String namespace) throws Exception {
+        String url =
+                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
tenant + "/" + namespace + "/partitioned";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void createNonPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, null, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Create a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void createPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath, Integer numPartitions) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, 
numPartitions.toString(),
+                getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Get the stats-internal for the partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar internal stat info of partitioned topic
+     * @throws Exception any exception if occurred
+     */
+    public static JsonObject getInternalStatsPartitionedTopics(RestTemplate 
restTemplate,
+            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitioned-internalStats";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                JsonObject.class);
+    }
+
+    /**
+     * Get partitioned topic metadata.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar topic metadata info
+     * @throws Exception any exception if occurred
+     */
+    public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate 
restTemplate,
+            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                PulsarTopicMetadata.class);
+    }
+
+    /**
+     * Delete a topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void deleteNonPartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Force delete a topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void forceDeleteNonPartitionedTopic(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath;
+        Map<String, Boolean> uriVariables = new HashMap<>();
+        uriVariables.put("force", true);
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Delete a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void deletePartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Force delete a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void forceDeletePartitionedTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/partitions";
+        Map<String, Boolean> uriVariables = new HashMap<>();
+        uriVariables.put("force", true);
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, 
getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Delete a partitioned or non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param isPartitioned pulsar is partitioned topic
+     * @throws Exception any exception if occurred
+     */
+    public static void deleteTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
+            boolean isPartitioned) throws Exception {
+        if (isPartitioned) {
+            deletePartitionedTopic(restTemplate, clusterInfo, topicPath);
         } else {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), 
pulsarCluster.getToken());
+            deleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
         }
-        return pulsarAdmin;
     }
 
     /**
-     * Get the pulsar admin from the given service URL.
+     * Force delete a partitioned or non-partitioned topic.
      *
-     * @apiNote It must be closed after use.
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param isPartitioned pulsar is partitioned topic
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws 
PulsarClientException {
-        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
+    public static void forceDeleteTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
+            boolean isPartitioned)
+            throws Exception {
+        if (isPartitioned) {
+            forceDeletePartitionedTopic(restTemplate, clusterInfo, topicPath);
+        } else {
+            forceDeleteNonPartitionedTopic(restTemplate, clusterInfo, 
topicPath);
+        }
     }
 
     /**
-     * Get the pulsar admin from the given service URL and token.
-     * <p/>
-     * Currently only token is supported as an authentication type.
+     * lookup persistent topic info.
      *
-     * @apiNote It must be closed after use.
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar broker url
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String 
token) throws PulsarClientException {
-        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
-                .authentication(AuthenticationFactory.token(token)).build();
+    public static String lookupTopic(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath)
+            throws Exception {
+        String url = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + 
"/persistent/" + topicPath;
+        PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, url, 
HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()), 
PulsarLookupTopicInfo.class);
+        return topicInfo.getBrokerUrl();
     }
 
     /**
-     * Get pulsar cluster info list.
+     * lookup persistent partitioned topic info.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return map of partitioned topic info
+     * @throws Exception any exception if occurred
+     */
+    public static Map<String, String> lookupPartitionedTopic(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        PulsarTopicMetadata metadata = 
getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
+        Map<String, String> map = new LinkedHashMap<>();
+        for (int i = 0; i < metadata.getPartitions(); i++) {
+            String partitionTopicName = topicPath + "-partition-" + i;
+            String partitionUrl = clusterInfo.getAdminUrl() + 
LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopicName;
+            PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, 
partitionUrl, HttpMethod.GET, null,
+                    getHttpHeaders(clusterInfo.getToken()), 
PulsarLookupTopicInfo.class);
+            map.put(partitionTopicName, topicInfo.getBrokerUrl());
+        }
+        return map;
+    }
+
+    /**
+     * Get the list of persistent subscriptions for a given topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return list of pulsar topic subscription info
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getSubscriptions(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscriptions";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, 
getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a subscription on the topic.
+     *
+     * @param restTemplate  spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param subscription pulsar topic subscription info
+     * @throws Exception any exception if occurred
+     */
+    public static void createSubscription(RestTemplate restTemplate, 
PulsarClusterInfo clusterInfo, String topicPath,
+            String subscription) throws Exception {
+        String url =
+                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + 
topicPath + "/subscription/" + subscription;
+        JsonObject jsonObject = new JsonObject();
+        jsonObject.addProperty("entryId", Long.MAX_VALUE);
+        jsonObject.addProperty("ledgerId", Long.MAX_VALUE);
+        jsonObject.addProperty("partitionIndex", -1);
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, 
jsonObject.toString(), headers);
+    }
+
+    /**
+     * Examine a specific message on a topic by position relative to the 
earliest or the latest message.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPartition  pulsar topic partition info
+     * @param messageType pulsar message type info
+     * @param messagePosition pulsar message position info
+     * @return spring framework HttpEntity
+     * @throws Exception any exception if occurred
+     */
+    public static ResponseEntity<byte[]> examineMessage(RestTemplate 
restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPartition, String messageType, int messagePosition) 
throws Exception {
+        StringBuilder urlBuilder = new 
StringBuilder().append(clusterInfo.getAdminUrl())
+                .append(QUERY_PERSISTENT_PATH)
+                .append("/")
+                .append(topicPartition)
+                .append("/examinemessage")
+                .append("?initialPosition=")
+                .append(messageType)
+                .append("&messagePosition=")
+                .append(messagePosition);
+        ResponseEntity<byte[]> response = 
restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
+                new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), 
byte[].class);
+        if (!response.getStatusCode().is2xxSuccessful()) {
+            log.error("request error for {}, status code {}, body {}", 
urlBuilder.toString(), response.getStatusCode(),
+                    response.getBody());
+        }
+        return response;
+    }
+
+    public static PulsarMessageInfo 
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+            throws Exception {
+        List<PulsarMessageInfo> messages = 
PulsarUtils.getMessagesFromHttpResponse(response, topic);
+        if (messages.size() > 0) {
+            return messages.get(0);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     *  Copy from getMessagesFromHttpResponse method of 
org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     *
+     * @param response
+     * @param topic
+     * @return
+     * @throws Exception
      */
-    public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) 
throws PulsarAdminException {
-        return pulsarAdmin.clusters().getClusters();
+    public static List<PulsarMessageInfo> 
getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+            throws Exception {
+        HttpHeaders headers = response.getHeaders();
+        String msgId = headers.getFirst("X-Pulsar-Message-ID");
+        String brokerEntryTimestamp = 
headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
+        String brokerEntryIndex = 
headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
+        PulsarBrokerEntryMetadata brokerEntryMetadata;
+        if (brokerEntryTimestamp == null && brokerEntryIndex == null) {
+            brokerEntryMetadata = null;
+        } else {
+            brokerEntryMetadata = new PulsarBrokerEntryMetadata();
+            if (brokerEntryTimestamp != null) {
+                
brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp.toString()));
+            }
+            if (brokerEntryIndex != null) {
+                brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
+            }
+        }
+
+        PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
+        Map<String, String> properties = Maps.newTreeMap();
+
+        Object tmp = headers.getFirst("X-Pulsar-publish-time");
+        if (tmp != null) {
+            messageMetadata.setPublishTime(parse(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-event-time");
+        if (tmp != null) {
+            messageMetadata.setEventTime(parse(tmp.toString()));
+        }
+        tmp = headers.getFirst("X-Pulsar-deliver-at-time");
+        if (tmp != null) {
+            messageMetadata.setDeliverAtTime(parse(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-null-value");
+        if (tmp != null) {
+            messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-producer-name");
+        if (tmp != null) {
+            messageMetadata.setProducerName(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-sequence-id");
+        if (tmp != null) {
+            messageMetadata.setSequenceId(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-replicated-from");
+        if (tmp != null) {
+            messageMetadata.setReplicatedFrom(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-partition-key");
+        if (tmp != null) {
+            messageMetadata.setPartitionKey(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-compression");
+        if (tmp != null) {
+            messageMetadata.setCompression(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-uncompressed-size");
+        if (tmp != null) {
+            
messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-encryption-algo");
+        if (tmp != null) {
+            messageMetadata.setEncryptionAlgo(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
+        if (tmp != null) {
+            
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-marker-type");
+        if (tmp != null) {
+            messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-txnid-least-bits");
+        if (tmp != null) {
+            messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-txnid-most-bits");
+        if (tmp != null) {
+            messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-highest-sequence-id");
+        if (tmp != null) {
+            
messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-uuid");
+        if (tmp != null) {
+            messageMetadata.setUuid(tmp.toString());
+        }
+
+        tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg");
+        if (tmp != null) {
+            
messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size");
+        if (tmp != null) {
+            
messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-chunk-id");
+        if (tmp != null) {
+            messageMetadata.setChunkId(Integer.parseInt(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-null-partition-key");
+        if (tmp != null) {
+            
messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
+        if (tmp != null) {
+            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-ordering-key");
+        if (tmp != null) {
+            
messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
+        if (tmp != null) {
+            
messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
+        if (tmp != null) {
+            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+        }
+
+        List<String> tmpList = (List) headers.get("X-Pulsar-replicated-to");
+        if (ObjectUtils.isNotEmpty(tmpList)) {
+            if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
+                messageMetadata.setReplicateTos(Lists.newArrayList(tmpList));
+            } else {
+                messageMetadata.getReplicateTos().addAll(tmpList);
+            }
+        }
+
+        tmp = headers.getFirst("X-Pulsar-batch-size");
+        if (tmp != null) {
+            properties.put("X-Pulsar-batch-size", (String) tmp);
+        }
+
+        for (Entry<String, List<String>> entry : headers.entrySet()) {
+            if (entry.getKey().contains("X-Pulsar-PROPERTY-")) {
+                String keyName = 
entry.getKey().substring("X-Pulsar-PROPERTY-".length());
+                properties.put(keyName, (String) ((List) 
entry.getValue()).get(0));
+            }
+        }
+
+        tmp = headers.getFirst("X-Pulsar-num-batch-message");
+        if (tmp != null) {
+            properties.put("X-Pulsar-num-batch-message", (String) tmp);
+        }
+        boolean isEncrypted = false;
+        tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+        if (tmp != null) {
+            isEncrypted = Boolean.parseBoolean(tmp.toString());
+        }
+
+        if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) 
{
+            return getIndividualMsgsFromBatch(topic, msgId, 
response.getBody(), properties, messageMetadata,
+                    brokerEntryMetadata);
+        }
+
+        PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+        messageInfo.setTopic(topic);
+        messageInfo.setMessageId(msgId);
+        messageInfo.setProperties(messageMetadata.getProperties());
+        messageInfo.setBody(response.getBody());
+        messageInfo.setPulsarMessageMetadata(messageMetadata);
+        if (brokerEntryMetadata != null) {
+            messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
+        }
+        return Collections.singletonList(messageInfo);
+    }
+
+    /**
+     * Convert a date-time string into epoch milliseconds.
+     *
+     * @param datetime text to be parsed with {@code DATE_FORMAT} (an ISO offset date-time formatter)
+     * @return the parsed instant, expressed as milliseconds since the epoch
+     * @throws DateTimeParseException if the text cannot be parsed by {@code DATE_FORMAT}
+     */
+    private static long parse(String datetime) throws DateTimeParseException {
+        return Instant.from(DATE_FORMAT.parse(datetime)).toEpochMilli();
+    }
+
+    /**
+     * Copy from getIndividualMsgsFromBatch method of 
org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     *
+     * @param topic
+     * @param msgId
+     * @param data
+     * @param properties
+     * @param metadata
+     * @param brokerMetadata
+     * @return
+     */
+    private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String 
topic, String msgId, byte[] data,
+            Map<String, String> properties, PulsarMessageMetadata metadata, 
PulsarBrokerEntryMetadata brokerMetadata) {
+        List<PulsarMessageInfo> ret = new ArrayList<>();
+        int batchSize = 
Integer.parseInt(properties.get("X-Pulsar-num-batch-message"));
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        for (int i = 0; i < batchSize; ++i) {
+            String batchMsgId = msgId + ":" + i;
+            PulsarMessageMetadata singleMetadata = new PulsarMessageMetadata();
+            singleMetadata.setProperties(properties);
+            ByteBuffer singleMessagePayload = 
deSerializeSingleMessageInBatch(buffer, singleMetadata, i, batchSize);
+            PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+            messageInfo.setTopic(topic);
+            messageInfo.setMessageId(batchMsgId);
+            messageInfo.setProperties(singleMetadata.getProperties());
+            messageInfo.setPulsarMessageMetadata(metadata);
+            messageInfo.setBody(singleMessagePayload.array());
+            if (brokerMetadata != null) {
+                messageInfo.setPulsarBrokerEntryMetadata(brokerMetadata);
+            }
+            ret.add(messageInfo);
+        }
+        buffer.clear();
+        return ret;
+    }
+
+    /**
+     * Read the next single message from a batch payload.
+     * Adapted from the deSerializeSingleMessageInBatch method of
+     * org.apache.pulsar.common.protocol.Commands.
+     *
+     * <p>The buffer is expected to hold, at its current position, a 4-byte metadata length, the encoded
+     * metadata, and then the message payload whose length is taken from the decoded metadata.
+     *
+     * @param uncompressedPayload batch payload, positioned at the start of the next message
+     * @param metadata target object that receives the decoded single-message metadata
+     * @param index index of the message inside the batch
+     * @param batchSize number of messages in the batch
+     * @return a buffer wrapping a copy of the message payload
+     */
+    private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer uncompressedPayload,
+            PulsarMessageMetadata metadata, int index, int batchSize) {
+        int metadataLength = uncompressedPayload.getInt();
+        metaDataParseFrom(metadata, uncompressedPayload, metadataLength);
+
+        int payloadLength = metadata.getPayloadSize();
+        int payloadStart = uncompressedPayload.position();
+        byte[] payload = new byte[payloadLength];
+        uncompressedPayload.get(payload);
+
+        // Move the reader past the payload just copied, mirroring the upstream implementation.
+        if (index < batchSize) {
+            int payloadEnd = payloadStart + payloadLength;
+            uncompressedPayload.position(payloadEnd);
+        }
+        return ByteBuffer.wrap(payload);
+    }
+
+    /**
+     * Decode the protobuf-encoded metadata of a single message inside a batch.
+     * Adapted from the parseFrom method of org.apache.pulsar.common.api.proto.SingleMessageMetadata.
+     *
+     * @param metadata target object that receives every decoded field
+     * @param buffer source buffer, positioned at the first byte of the encoded metadata
+     * @param size number of bytes occupied by the encoded metadata
+     */
+    private static void metaDataParseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
+        int endIdx = size + buffer.position();
+        while (buffer.position() < endIdx) {
+            // Each tag is (fieldNumber << 3) | wireType, as defined by the protobuf wire format.
+            int tag = readVarInt(buffer);
+            switch (tag) {
+                case 10:
+                    // Length-delimited properties entry, decoded by parseFrom.
+                    int _propertiesSize = readVarInt(buffer);
+                    parseFrom(metadata, buffer, _propertiesSize);
+                    break;
+                case 18:
+                    int _partitionKeyBufferLen = readVarInt(buffer);
+                    byte[] partitionKeyArray = new byte[_partitionKeyBufferLen];
+                    buffer.get(partitionKeyArray);
+                    // Protobuf strings are UTF-8; do not depend on the platform default charset.
+                    metadata.setPartitionKey(
+                            new String(partitionKeyArray, java.nio.charset.StandardCharsets.UTF_8));
+                    break;
+                case 24:
+                    int payloadSize = readVarInt(buffer);
+                    metadata.setPayloadSize(payloadSize);
+                    break;
+                case 32:
+                    boolean compactedOut = readVarInt(buffer) == 1;
+                    metadata.setCompactedOut(compactedOut);
+                    break;
+                case 40:
+                    long eventTime = readVarInt64(buffer);
+                    metadata.setEventTime(eventTime);
+                    break;
+                case 48:
+                    boolean partitionKeyB64Encoded = readVarInt(buffer) == 1;
+                    metadata.setPartitionKeyB64Encoded(partitionKeyB64Encoded);
+                    break;
+                case 58:
+                    int _orderingKeyLen = readVarInt(buffer);
+                    byte[] orderingKeyArray = new byte[_orderingKeyLen];
+                    // Consume the key bytes: without this read the key stays zero-filled and the
+                    // bytes that follow would be misinterpreted as field tags.
+                    buffer.get(orderingKeyArray);
+                    metadata.setOrderingKey(orderingKeyArray);
+                    break;
+                case 64:
+                    long sequenceId = readVarInt64(buffer);
+                    metadata.setSequenceId(sequenceId);
+                    break;
+                case 72:
+                    boolean nullValue = readVarInt(buffer) == 1;
+                    metadata.setNullValue(nullValue);
+                    break;
+                case 80:
+                    boolean nullPartitionKey = readVarInt(buffer) == 1;
+                    metadata.setNullPartitionKey(nullPartitionKey);
+                    break;
+                default:
+                    skipUnknownField(tag, buffer);
+            }
+        }
     }
 
     /**
-     * Get pulsar cluster service url.
+     * Copy from readVarInt method of 
org.apache.pulsar.common.api.proto.LightProtoCodec class.
+     *
+     * @param buf
+     * @return
      */
-    public static String getServiceUrl(PulsarAdmin pulsarAdmin, String 
pulsarCluster) throws PulsarAdminException {
-        return 
pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl();
+    private static int readVarInt(ByteBuffer buf) {
+        byte tmp = buf.get();
+        if (tmp >= 0) {
+            return tmp;
+        } else {
+            int result = tmp & 127;

Review Comment:
   This code is copied from the readVarInt method of the 
org.apache.pulsar.common.api.proto.LightProtoCodec class, so I think it's best 
to keep it consistent with the original code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to