st0ckface opened a new issue #6552: Can't ack messages on exclusive 
subscriptions with partitioned topic
URL: https://github.com/apache/pulsar/issues/6552
 
 
   **Describe the bug**
   When a consumer uses an exclusive subscription on a partitioned topic, 
acknowledged messages are not reflected in the broker's topic stats.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a consumer with an exclusive subscription to a partitioned topic
   2. Ack each message individually as it is received
   3. Notice that "unackedMessages" continually climbs in the `topic stats` via 
the admin client
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionType;

   // Wrapper class added so the snippet compiles on its own.
   public class AckTest {
       static PulsarClient getJPulsarClient() throws Exception {
           return PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .connectionsPerBroker(20)
                   .ioThreads(8)
                   .listenerThreads(8)
                   .build();
       }

       static Consumer<byte[]> getJPulsarConsumer(PulsarClient client, String topic, String subscription) throws Exception {
           return client.newConsumer(Schema.BYTES)
                   .subscriptionName(subscription)
                   .subscriptionType(SubscriptionType.Exclusive)
                   .topic(topic)
                   .consumerName("ackTestConsumer")
                   .subscribe();
       }

       public static void main(String... args) throws Exception {
           PulsarClient client = getJPulsarClient();
           String topic = "public/default/870|EXTERNID_NG|2020-03-17";
           String subscription = "ackTestSubscription";
           Consumer<byte[]> consumer = getJPulsarConsumer(client, topic, subscription);

           int maxMessages = 1000000;
           List<CompletableFuture<Object>> futures = new ArrayList<>();
           for (int i = 0; i < maxMessages; i++) {
               // Ack each message individually as soon as it is received.
               futures.add(consumer.receiveAsync().thenApplyAsync(m -> {
                   try {
                       consumer.acknowledge(m.getMessageId());
                   } catch (PulsarClientException e) {
                       e.printStackTrace();
                   }
                   return null;
               }));
           }

           // Wait until every receive-and-ack future has completed.
           CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get();
       }
   }
   ```
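
For step 3, the stats can also be read without the Java admin client. The following is a minimal sketch that swaps in the broker's admin REST endpoint (`/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-stats`) so that it needs only the JDK. The admin address `http://localhost:8080` is an assumption (the report only gives the binary service URL), and the class and method names are hypothetical. The topic name is URL-encoded because it contains `|` characters.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original report.
class TopicStatsCheck {
    // Builds the admin REST URI for per-partition stats of a persistent partitioned topic.
    static URI partitionedStatsUri(String adminBaseUrl, String tenant, String namespace, String topic) {
        // URLEncoder produces form encoding, so map '+' (an encoded space) to '%20' for use in a path.
        String encodedTopic = URLEncoder.encode(topic, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(adminBaseUrl + "/admin/v2/persistent/" + tenant + "/" + namespace
                + "/" + encodedTopic + "/partitioned-stats?perPartition=true");
    }

    // Fetches the raw stats JSON. Requires a reachable broker admin endpoint.
    static String fetchStats(URI uri) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return http.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) {
        // Assumed admin address; adjust to the broker's web service URL.
        URI uri = partitionedStatsUri("http://localhost:8080", "public", "default",
                "870|EXTERNID_NG|2020-03-17");
        // Only prints the URI; call fetchStats(uri) when a broker is running.
        System.out.println(uri);
    }
}
```

In the returned JSON, the entry for `ackTestSubscription` under `subscriptions` carries the `unackedMessages` and `msgBacklog` values discussed in this report.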
   
   **Expected behavior**
   The "unackedMessages" metric should _at most_ hover at or around the 
receiveQueueSize, if not at 0. The backlog should also be decreasing.
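
The expectation can be written down as a small check over successive stats samples. This is only a sketch: the class and method names are hypothetical, the bound is passed in by the caller (for example the consumer's receiveQueueSize, possibly multiplied by the number of partitions), and it assumes no producer is adding messages while the samples are taken.

```java
// Hypothetical helper, not part of the original report.
class ExpectedStats {
    // Returns true when every unackedMessages sample stays at or below the bound
    // and the backlog at the last sample is lower than at the first sample.
    static boolean matchesExpectation(long[] unackedSamples, long[] backlogSamples, long unackedBound) {
        if (unackedSamples.length == 0 || backlogSamples.length < 2) {
            return false; // not enough data to judge
        }
        for (long unacked : unackedSamples) {
            if (unacked > unackedBound) {
                return false; // unackedMessages exceeded the allowed bound
            }
        }
        return backlogSamples[backlogSamples.length - 1] < backlogSamples[0];
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not measurements from the report.
        long[] unacked = {0, 800, 950};
        long[] backlog = {1_000_000, 900_000, 800_000};
        System.out.println(matchesExpectation(unacked, backlog, 1000));
    }
}
```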
   
   **Screenshots**
   ![Screen Shot 2020-03-18 at 5 52 35 PM](https://user-images.githubusercontent.com/1275634/77017925-69f9f100-6941-11ea-84d9-0db2039cafdf.png)
   ![Screen Shot 2020-03-18 at 5 52 18 PM](https://user-images.githubusercontent.com/1275634/77017931-6ebea500-6941-11ea-9457-0e4880ca8ad6.png)
   ![image](https://user-images.githubusercontent.com/1275634/77018096-de349480-6941-11ea-973f-8b7491ba7e5c.png)
   
   
   
   **Desktop (please complete the following information):**
    - OS: N/A
   
   **Additional context**
   If the subscription type is changed to Shared, the topic stats show the 
expected behavior (messages are reported as acked).
   
