codelipenghui commented on code in PR #22034:
URL: https://github.com/apache/pulsar/pull/22034#discussion_r1493743281
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2667,7 +2664,8 @@ void internalTrimLedgers(boolean isTruncate,
CompletableFuture<?> promise) {
ledgers.headMap(slowestReaderLedgerId,
false).values().iterator();
while (ledgerInfoIterator.hasNext()){
LedgerInfo ls = ledgerInfoIterator.next();
- // currentLedger can not be deleted
+ // If the current ledger is closed, the new ledger will be
created later and this current ledger will
+ // be deleted next time.
Review Comment:
I don't think we need to change the description here.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1754,10 +1754,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
- if (!pendingAddEntries.isEmpty()) {
- // Need to create a new ledger to write pending entries
- createLedgerAfterClosed();
- }
+ createLedgerAfterClosed();
Review Comment:
We'd better also remove the other places that call `createLedgerAfterClosed`
directly from the ManagedLedgerImpl.java
- rollCurrentLedgerIfFull
- checkInactiveLedgerAndRollOver
The solution introduced a behavior change in the broker. But it looks make
sense. All the other places (managed ledger initialization, the task for
checking the maximum rollover time ...) are proactively create a new ledger if
the current ledger is closed except the ledger rollover by the entry size and
entry count. With this solution we can get a better publish latency for the
newly published message to a new Ledger (It requires to create a Ledger first
and write the message to the Ledger).
If we want to move to lazy ledger creation to reduce the ledgers count. We
also need to apply the lazy creation to the managed ledger initialization and
other places. It could be different case with this fix.
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4232,4 +4232,24 @@ public void
testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}
+
+ @Test
+ public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception {
+ ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)
factory.open("testDeleteCurrentLedgerWhenItIsClosed", config));
+ assertEquals(ml.ledgers.size(), 1);
+ ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
+ Thread.sleep(10);
+ ml.addEntry(new byte[4]);
+ ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger);
+ assertEquals(ml.ledgers.size(), 0);
Review Comment:
And the current ledger should not be removed?
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4232,4 +4232,24 @@ public void
testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}
+
+ @Test
+ public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception {
+ ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)
factory.open("testDeleteCurrentLedgerWhenItIsClosed", config));
+ assertEquals(ml.ledgers.size(), 1);
+ ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
Review Comment:
It looks the change will not effect the ledger rollover by maximum rollover
time, right?
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4232,4 +4232,24 @@ public void
testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}
+
+ @Test
+ public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception {
+ ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)
factory.open("testDeleteCurrentLedgerWhenItIsClosed", config));
+ assertEquals(ml.ledgers.size(), 1);
+ ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
+ Thread.sleep(10);
+ ml.addEntry(new byte[4]);
+ ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger);
Review Comment:
It should be wrong. A new ledger will be created after the rollover.
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4232,4 +4232,24 @@ public void
testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}
+
+ @Test
+ public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception {
+ ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)
factory.open("testDeleteCurrentLedgerWhenItIsClosed", config));
+ assertEquals(ml.ledgers.size(), 1);
+ ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
+ Thread.sleep(10);
+ ml.addEntry(new byte[4]);
+ ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger);
+ assertEquals(ml.ledgers.size(), 0);
Review Comment:
I guess @liangyepianzhou you need to refine your test to have a clear scope
of what is tested in this method.
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4232,4 +4232,24 @@ public void
testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}
+
+ @Test
+ public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception {
+ ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)
factory.open("testDeleteCurrentLedgerWhenItIsClosed", config));
Review Comment:
Any reason to use `spy()` here? You can set any configurations you want via
a new ManagedLedgerConfig and create a new Ledger with the config instance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]