xuesongxs opened a new issue #12863:
URL: https://github.com/apache/pulsar/issues/12863


   **Describe the bug**
   Pulsar v2.8.1
   
   Use the producer's asynchronous send interface to write data to the topic. 
   In broker.conf, acknowledgmentAtBatchIndexLevelEnabled=true. When creating the consumer, enableBatchIndexAcknowledgment(true) is set. 
   After the consumer has been running for a period of time, it can't receive messages, but there is actually a large backlog in the subscription.
   
   
![image](https://user-images.githubusercontent.com/54351417/142333703-44745c5d-4f18-47c7-94a9-de2c7fdd1252.png)
   
![image](https://user-images.githubusercontent.com/54351417/142333829-4cc85a77-1530-4da4-9907-95d7377749e8.png)
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Producer code:
   ```
   Producer<NlMessage> producer = client.newProducer(Schema.JSON(NlMessage.class))
           .topic(topic)
           .producerName(producerName + "-" + UUID.randomUUID().toString())
           .compressionType(CompressionType.ZSTD)
           .blockIfQueueFull(true)
           .sendTimeout(0, TimeUnit.SECONDS)
           .create();
   ```
   The consumer uses retry and dead letter topics.
   
   Consumer code:
   ```
   // Assumption: the builder is created from the same client as the producer;
   // the original snippet did not show where consumerBuilder came from.
   ConsumerBuilder<NlMessage> consumerBuilder = client.newConsumer(Schema.JSON(NlMessage.class))
           .topic(consumerParam.getTopic())
           .subscriptionName(consumerParam.getSubscription())
           .subscriptionType(subscriptionType)
           .enableRetry(consumerParam.isEnableRetry())
           .enableBatchIndexAcknowledgment(true)
           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
   // DLQ
   consumerBuilder.deadLetterPolicy(consumerParam.getDeadLetterPolicy());
   // ackTimeout must be set after the dead letter policy (if ackTimeout == 0)
   consumerBuilder.ackTimeout(0, TimeUnit.SECONDS);

   Consumer<NlMessage> consumer = consumerBuilder.subscribe();
   while (true) {
       Message<NlMessage> message = consumer.receive();
       // doBiz
       int ret = doBiz(message);
       if (ret == 0) {
           consumer.acknowledge(message);
       } else if (ret == 1) {
           consumer.negativeAcknowledge(message);
       } else if (ret == 2) {
           consumer.reconsumeLater(message, 10, TimeUnit.SECONDS);
       }
   }
   ```
   
   2. Start the producer and the consumer.
   3. After running for a period of time, the consumer can't receive messages. The consumer has performed negative acknowledgment operations.
   4. No errors in broker.log.
   5. jstack of the consumer pid:
   
   
![image](https://user-images.githubusercontent.com/54351417/142336082-e447ee44-2f1c-4b37-878f-4bd463b74e7e.png)
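For anyone trying to reproduce this without the real business logic, the three-way branch on the `doBiz` result in the consumer loop can be pulled out into a small pure function. This is only a sketch: `AckDecision`, `Action` and `actionFor` are hypothetical names that are not part of the report or of the Pulsar client. It also makes one case explicit: with the loop as written, a return value other than 0, 1 or 2 leaves the message neither acknowledged nor negatively acknowledged.

```java
// Hypothetical helper, not part of the original report or of the Pulsar API.
public class AckDecision {

    /** What the consumer loop does with a message after doBiz returns. */
    public enum Action {
        ACKNOWLEDGE,           // ret == 0 -> consumer.acknowledge(message)
        NEGATIVE_ACKNOWLEDGE,  // ret == 1 -> consumer.negativeAcknowledge(message)
        RECONSUME_LATER,       // ret == 2 -> consumer.reconsumeLater(message, 10, SECONDS)
        NONE                   // anything else: the loop does nothing with the message
    }

    /** Maps the doBiz return code to the action taken in the reported loop. */
    public static Action actionFor(int ret) {
        switch (ret) {
            case 0:
                return Action.ACKNOWLEDGE;
            case 1:
                return Action.NEGATIVE_ACKNOWLEDGE;
            case 2:
                return Action.RECONSUME_LATER;
            default:
                return Action.NONE;
        }
    }
}
```

In a reproduction, the loop would switch on `AckDecision.actionFor(ret)` and call the matching consumer method, which allows logging how often each path (including `NONE`) is taken before the consumer stops receiving.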
   
   
   
   

