hangc0276 opened a new issue #11211:
URL: https://github.com/apache/pulsar/issues/11211


   **Describe the bug**
   When messages are produced to a topic and topic compaction is triggered, and 
the original messages are then expired by the retention policy, the messages 
that were compacted into the compacted ledger are lost as well. 
   
   The following test will show this case.
   
   ```Java
   @Test(timeOut = 30000)
   public void testCompactWithMessageTimeout() throws Exception {
       final String topic = 
"persistent://my-property/use/my-ns/testCompactWithMessageTimeout-" + 
UUID.randomUUID();
       admin.topics().createPartitionedTopic(topic, 1);
   
       Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
       // send 10 messages
       for (int i = 0; i < 10; ++i) {
           
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
       }
   
       // trigger topic compaction
       admin.topics().triggerCompaction(topic);
       boolean succeed = retryStrategically((test) -> {
           try {
               return 
LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
           } catch (PulsarAdminException e) {
               return false;
           }
       }, 10, 200);
       Assert.assertTrue(succeed);
   
       // unload topic to trigger ledger roll over
       admin.topics().unload(topic);
   
       // change ledger retention time and trim expired ledger to ensure the 
compacted message will be expired
       PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic).get();
       ManagedLedgerConfig managedLedgerConfig = 
persistentTopic.getManagedLedger().getConfig();
       managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
       
persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
   
       // send another 10 messages
       for (int i = 10; i < 20; ++i) {
           
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
       }
   
       // trigger topic compaction
       admin.topics().triggerCompaction(topic);
       succeed = retryStrategically((test) -> {
           try {
               return 
LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
           } catch (PulsarAdminException e) {
               return false;
           }
       }, 10, 200);
       Assert.assertTrue(succeed);
   
       Reader<String> reader = pulsarClient.newReader(Schema.STRING)
               .topic(topic)
               .subscriptionName("test")
               .readCompacted(true)
               .startMessageId(MessageId.earliest)
               .create();
   
       // check message in compacted topic, it should contains the whole 
messages
       for (int i = 0; i < 20; ++i) {
           Message<String> msg = reader.readNext();
           Assert.assertEquals(msg.getKey(), String.valueOf(i));
           Assert.assertEquals(msg.getValue(), String.valueOf(i));
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to