hangc0276 opened a new issue #11211:
URL: https://github.com/apache/pulsar/issues/11211
**Describe the bug**
When produce message to topic and trigger topic compaction, and then the
original message expired by retention policy, the message which compacted into
the compact ledger will also be lost.
The following test will show this case.
```Java
@Test(timeOut = 30000)
public void testCompactWithMessageTimeout() throws Exception {
final String topic =
"persistent://my-property/use/my-ns/testCompactWithMessageTimeout-" +
UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 1);
Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
// send 10 messages
for (int i = 0; i < 10; ++i) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
}
// trigger topic compaction
admin.topics().triggerCompaction(topic);
boolean succeed = retryStrategically((test) -> {
try {
return
LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);
Assert.assertTrue(succeed);
// unload topic to trigger ledger roll over
admin.topics().unload(topic);
// change ledger retention time and trim expired ledger to ensure the
compacted message will be expired
PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic).get();
ManagedLedgerConfig managedLedgerConfig =
persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
// send another 10 messages
for (int i = 10; i < 20; ++i) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
}
// trigger topic compaction
admin.topics().triggerCompaction(topic);
succeed = retryStrategically((test) -> {
try {
return
LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);
Assert.assertTrue(succeed);
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();
// check message in compacted topic, it should contains the whole
messages
for (int i = 0; i < 20; ++i) {
Message<String> msg = reader.readNext();
Assert.assertEquals(msg.getKey(), String.valueOf(i));
Assert.assertEquals(msg.getValue(), String.valueOf(i));
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]