nicoloboschi commented on a change in pull request #13073:
URL: https://github.com/apache/pulsar/pull/13073#discussion_r760800239
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -705,7 +709,9 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback
callback, Object ctx)
// Jump to specific thread to avoid contention from writers writing
from different threads
executor.executeOrdered(name, safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, callback, ctx);
+ checkInactiveLedgerAndRollOver(lastAddEntryTimeMs);
Review comment:
should we wrap also the `internalAsyncAddEntry` at line 724?
https://github.com/apache/pulsar/pull/13073/files#diff-f6a849bd8fdb782ef6c17a2e07a2c54c3dc7d1655c00ec3546d5f3b3fc61e970L712-R732
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4012,4 +4018,12 @@ private void cancelScheduledTasks() {
}
}
+ private void checkInactiveLedgerAndRollOver(final long lastAddEntryTimeMs)
{
+ long currentTimeMs = System.currentTimeMillis();
+ if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs >
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
+ log.info("[{}| Closing inactive ledger, last-add entry {}", name,
lastAddEntryTimeMs);
+ ledgerClosed(currentLedger);
Review comment:
I think we should call `rollCurrentLedgerIfFull` method which actually
close the ledger and it's the same rollover procedure used for topic when they
reach the max quota
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4012,4 +4018,12 @@ private void cancelScheduledTasks() {
}
}
+ private void checkInactiveLedgerAndRollOver(final long lastAddEntryTimeMs)
{
+ long currentTimeMs = System.currentTimeMillis();
+ if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs >
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
+ log.info("[{}| Closing inactive ledger, last-add entry {}", name,
lastAddEntryTimeMs);
Review comment:
typo, use `]` instead of `|`
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -2228,6 +2228,14 @@
)
private int managedLedgerOffloadPrefetchRounds = 1;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Time to rollover ledger for inactive topic (duration without
any publish on that topic)."
+ + "Disable rollover with value 0 (Default value 0)"
+ )
+ private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;
Review comment:
minutes or seconds ?
in conf/broker.conf and the doc you set minutes instead of seconds
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -3393,4 +3393,26 @@ public void testCancellationOfScheduledTasks() throws
Exception {
assertTrue(timeoutTask2.isCancelled());
assertTrue(checkLedgerRollTask2.isCancelled());
}
+
+ @Test
+ public void testInactiveLedgerRollOver() throws Exception {
+ int inactiveLedgerRollOverTimeMs = 5;
+ ManagedLedgerFactoryConfig factoryConf = new
ManagedLedgerFactoryConfig();
+ ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs,
TimeUnit.MILLISECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("rollover_inactive", config);
Review comment:
we should cleanup resources here
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -3393,4 +3393,26 @@ public void testCancellationOfScheduledTasks() throws
Exception {
assertTrue(timeoutTask2.isCancelled());
assertTrue(checkLedgerRollTask2.isCancelled());
}
+
+ @Test
+ public void testInactiveLedgerRollOver() throws Exception {
+ int inactiveLedgerRollOverTimeMs = 5;
+ ManagedLedgerFactoryConfig factoryConf = new
ManagedLedgerFactoryConfig();
+ ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs,
TimeUnit.MILLISECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("rollover_inactive", config);
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ int totalAddEntries = 5;
+ for (int i = 0; i < totalAddEntries; i++) {
+ String content = "entry"; // 5 bytes
+ ledger.addEntry(content.getBytes());
Review comment:
it would be interesting to see what happens in a failure scenario, when
the addEntry is rejected.
From your implementation we consider the topic active even if a write fails.
it makes sense to me. just asking to add a unit test for that
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]