sigikjohn-vargheese commented on issue #19970:
URL: https://github.com/apache/pulsar/issues/19970#issuecomment-1497537294

   @tisonkun Here is pared down snippet. I have a service which connects to 
Pulsar admin API to create Partitioned topics and then get 
PartitionedTopicStats. Details from PartitionedTopicStats are then mapped to my 
POJO. I have a condition to check whether subscription type is empty. From 
PartitionedTopicStats it always comes back empty because that field is missing 
in the Admin API response. If I replace PartitionedTopicStats with TopicStats, 
I can see that the type is not empty. 
   
   Please note that this behavior can be verified using CLI as well
   pulsar-admin topics partitioned-stats <topicName> --> missing subscription 
type
   pulsar-admin topics stats <partitionedTopicName> --> has subscription type
   
   // Interface to connect to pulsar admin Java client
   public interface PulsarAdminService {
       PulsarAdmin getPulsarAdmin();
       Topics getTopics();
       CustomTopic createPartitionedTopic(String namespace, String topicPath, 
String createdBy, String description, int numPartitions);
   }
   
   // Service to connect to pulsar admin Java client
   @Service
   public class PulsarAdminServiceImpl implements PulsarAdminService {
   
        private PulsarAdmin pulsarAdmin;
   
        @Override
        public Topics getTopics() {
               return getPulsarAdmin().topics();
        }
   
                @Override
        public CustomTopic createPartitionedTopic(String namespace, String 
topicPath, String createdBy, String description, int numPartitions) {
               Map<String, String> properties = new HashMap<>();
               properties.put(CREATED_ON, new 
SimpleDateFormat(DATE_FORMAT).format(new Date()));
               properties.put(DESCRIPTION, description);
               properties.put(CREATED_BY, createdBy);
            try {
                   getTopics().createPartitionedTopic(topicPath, numPartitions, 
properties);
   
                   PartitionedTopicStats pts = 
getTopics().getPartitionedStats(topicPath, false);
                return buildTopic(namespace, pts); 
            } catch (PulsarAdminException.ConflictException e) {
                   LOG.warn("Topic already exists", e);
               } 
                }
   
        @Override
        public PulsarAdmin getPulsarAdmin() {
               if(pulsarAdmin == null){
                pulsarAdmin = createPulsarAdmin();
               }
               return pulsarAdmin;
           }
   
        private PulsarAdmin createPulsarAdmin() {
               try {
                PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
                pulsarAdminBuilder.serviceHttpUrl("http://localhost:8080";);
                
pulsarAdminBuilder.authentication(AuthenticationFactory.token("real JWT goes 
here"));
                pulsarAdminBuilder.allowTlsInsecureConnection(false);
                pulsarAdminBuilder.tlsTrustCertsFilePath("some path");
                pulsarAdminBuilder.enableTlsHostnameVerification(false);
                return pulsarAdminBuilder.build();
               } catch (RuntimeException | PulsarClientException e) {
                LOG.error("Failed to create Pulsar Admin instance., e);
               }
        }
   
        private CustomTopic buildTopic(String namespace, PartitionedTopicStats 
pts) {
            CustomTopic topic = new CustomTopic();
            Map<String, String> topicProps = 
partitionedTopicStats.getMetadata().properties;
            topic.setName(topicName);
            topic.setNamespace(namespace);
               
topic.setPartitions(partitionedTopicStats.getMetadata().partitions);
               topic.setPersistent(true);
            topic.setDescription(topicProps.get("description"));
               topic.setCreatedBy(topicProps.get("createdBy"));
            List<CustomSubscription> subscriptionList = new ArrayList<>();
            for (Map.Entry<String, ? extends SubscriptionStats> entry : 
pts.getSubscriptions().entrySet()) {
                CustomSubscription subscription = new CustomSubscription();
                subscription.setName(entry.getKey());
                SubscriptionStats subscriptionStats = entry.getValue();
                if (null != subscriptionStats.getType() && 
!subscriptionStats.getType().trim().isEmpty()) { // DEFECT - type is missing
                    subscription.setType(subscriptionStats.getType()); 
                }
                   subscriptionList.add(subscription);
               }
            topic.setSubscriptions(subscriptionList);
            return topic;
        }
   }
   
   // Custom POJOs
   public class CustomTopic implements Serializable {
   
       public static final String PROP_NAME = "name";
       public static final String PROP_NAMESPACE = "namespace";
       public static final String PROP_PERSISTENT = "persistent";
       public static final String PROP_SUBSCRIPTIONS = "subscriptions";
       public static final String PROP_DESCRIPTION = "description";
       public static final String PROP_CREATED_BY = "createdBy";
       public static final String PROP_PARTITIONS = "partitions";
   
       @NotNull
       private String name;
       @NotNull
       private String namespace;
       @NotNull
       private Boolean persistent;
       private List<CustomSubscription> subscriptions;
       private String description;
       private String createdBy;
       @NotNull
       private Integer partitions;
   
       public CustomTopic() {
       }
   
       public CustomTopic(@NotNull String name, @NotNull String namespace, 
@NotNull Boolean persistent, @NotNull Integer partitions) {
           this.name = name;
           this.environment = environment;
           this.persistent = persistent;
           this.partitions = partitions;
       }
   
       @NotNull
       public String getName() {
           return this.name;
       }
   
       public void setName(@NotNull String name) {
           this.name = name;
       }
   
       @NotNull
       public String getNamespace() {
           return this.namespace;
       }
   
       public void setNamespace(@NotNull String namespace) {
           this.namespace = namespace;
       }
   
       @NotNull
       public Boolean isPersistent() {
           return this.persistent;
       }
   
       public void setPersistent(@NotNull Boolean persistent) {
           this.persistent = persistent;
       }
   
       public List<CustomSubscription> getSubscriptions() {
           return this.subscriptions;
       }
   
       public void setSubscriptions(List<CustomSubscription> subscriptions) {
           this.subscriptions = subscriptions;
   
       public String getDescription() {
           return this.description;
       }
   
       public void setDescription(String description) {
           this.description = description;
       }
   
       public String getCreatedBy() {
           return this.createdBy;
       }
   
       public void setCreatedBy(String createdBy) {
           this.createdBy = createdBy;
       }
   
       @NotNull
       public Integer getPartitions() {
           return this.partitions;
       }
   
       public void setPartitions(@NotNull Integer partitions) {
           this.partitions = partitions;
       }
   }
   
   public class CustomSubscription implements Serializable {
   
       public static final String PROP_NAME = "name";
       public static final String PROP_TYPE = "type";
   
       @NotNull
       private String name;
       private String type;
   
       public CustomSubscription() {
       }
   
       public CustomSubscription(@NotNull String name) {
           this.name = name;
           this.topic = topic;
       }
   
       @NotNull
       public String getName() {
           return this.name;
       }
   
       public void setName(@NotNull String name) {
           this.name = name;
       }
   
       public String getType() {
           return this.type;
       }
   
       public void setType(String type) {
           this.type = type;
       }
   }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to