eolivelli commented on a change in pull request #12141:
URL: https://github.com/apache/pulsar/pull/12141#discussion_r714237977
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
##########
@@ -170,8 +171,89 @@ public void testConsumedLedgersTrimNoSubscriptions()
throws Exception {
// lastMessageId should be available even in this case, but is must
// refer to -1
MessageId messageIdAfterTrim =
pulsar.getAdminClient().topics().getLastMessageId(topicName);
- LOG.info("lastmessageid " + messageIdAfterTrim);
+ log.info("admin lastmessageid {}", messageIdAfterTrim);
+ assertEquals(messageIdAfterTrim, MessageId.earliest);
+
+ messageIdAfterTrim = persistentTopic.getLastMessageId().get();
+ log.info("topic lastmessageid {}", messageIdAfterTrim);
assertEquals(messageIdAfterTrim, MessageId.earliest);
+ }
+
+ @Test
+ public void testTerminateAndRestart() throws Exception {
+ conf.setRetentionCheckIntervalInSeconds(10000);
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
+ super.baseSetup();
+ final String topicName = "persistent://prop/ns-abc/testTerminateAndRestart";
+
+ // write some messages
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .producerName("producer-name")
+ .create();
+
+ // set retention parameters, the ledgers are to be deleted as soon as possible
+ // but the topic is not to be automatically deleted
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+ managedLedgerConfig.setRetentionSizeInMB(-1);
+ managedLedgerConfig.setRetentionTime(100000, TimeUnit.SECONDS);
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+ MessageId initialMessageId = persistentTopic.getLastMessageId().get();
+ log.info("lastmessageid " + initialMessageId);
Review comment:
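Nit: use a parameterized log message here instead of string concatenation,
e.g. log.info("lastmessageid {}", initialMessageId), as on the other log
lines in this test.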
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -770,13 +770,15 @@ private synchronized void
internalAsyncAddEntry(OpAddEntry addOperation) {
currentLedgerSize += addOperation.data.readableBytes();
if (log.isDebugEnabled()) {
- log.debug("[{}] Write into current ledger lh={} entries={}", name, currentLedger.getId(),
+ log.debug("[{}] Write into current ledger lh={} entries={}", name,
+ currentLedger == null ? null : currentLedger.getId(),
Review comment:
For these logging cases we could add a Long getCurrentLedgerId() method
that returns null if there is no currentLedger.
That would save some code duplication.
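A minimal sketch of the helper suggested above, assuming currentLedger can be null as the null checks in this diff imply. LedgerHandleStub, ManagedLedgerSketch and describeWrite are hypothetical stand-ins for illustration and are not part of ManagedLedgerImpl:

```java
// Hypothetical stand-in for a ledger handle; only getId() matters here.
class LedgerHandleStub {
    private final long id;

    LedgerHandleStub(long id) {
        this.id = id;
    }

    long getId() {
        return id;
    }
}

// Hypothetical, heavily reduced stand-in for the relevant part of ManagedLedgerImpl.
class ManagedLedgerSketch {
    // Assumption: this field may be null when no ledger is currently open.
    LedgerHandleStub currentLedger;

    // Returns the id of the current ledger, or null if there is no current ledger.
    // The boxed Long return type lets logging call sites print "null" safely.
    Long getCurrentLedgerId() {
        return currentLedger == null ? null : currentLedger.getId();
    }

    // Builds the same kind of message as the debug statement in the diff,
    // without repeating the null check at the call site.
    String describeWrite(String name, long entries) {
        return String.format("[%s] Write into current ledger lh=%s entries=%d",
                name, getCurrentLedgerId(), entries);
    }
}
```

Each log statement would then pass getCurrentLedgerId() as its argument instead of repeating the ternary expression.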
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1602,7 +1605,7 @@ synchronized void createLedgerAfterClosed() {
@Override
public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
- if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
+ if (currentLedger != null && currentLedgerEntries > 0 && currentLedgerIsFull()) {
Review comment:
Would it make sense to exit early from this method if there is no
'currentLedger'?
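For illustration, an early exit could look roughly like the sketch below. RolloverSketch is a hypothetical, simplified stand-in: the real currentLedgerIsFull() and the rollover itself are part of ManagedLedgerImpl and are not reproduced here, and the entry-count check and rollovers counter are assumptions made only to keep the example self-contained:

```java
// Hypothetical, simplified stand-in for the rollover check in ManagedLedgerImpl.
class RolloverSketch {
    Object currentLedger;          // assumption: null when no ledger is open
    long currentLedgerEntries;
    int maxEntriesPerLedger = 2;   // same value the test in this PR configures
    int rollovers;                 // counts simulated rollovers, for the example only

    // Simplified fullness check; the real method is not shown in this diff.
    boolean currentLedgerIsFull() {
        return currentLedgerEntries >= maxEntriesPerLedger;
    }

    void rollCurrentLedgerIfFull() {
        // Early exit: with no current ledger there is nothing to roll over.
        if (currentLedger == null) {
            return;
        }
        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
            rollovers++;               // stand-in for closing the ledger and creating a new one
            currentLedgerEntries = 0;
        }
    }
}
```

With the guard at the top, the remaining condition stays identical to the one before this change.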