This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b4ef76e make ledger rollover check task internal (#8946)
b4ef76e is described below
commit b4ef76ec66069cb1c8632c21e48e088a48efd17c
Author: hangc0276 <[email protected]>
AuthorDate: Fri Jan 8 01:47:16 2021 +0800
make ledger rollover check task internal (#8946)
Fix #7195
### Changes
1. add a schedulerTask to rollover the ledger in `ManagedLedgerImpl`
instead of `BrokerService`
2. add the test.
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 1 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 19 ++++++++++++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++
.../pulsar/broker/service/BrokerService.java | 23 ---------------
4 files changed, 52 insertions(+), 24 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index f75a639..4f4c226 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -583,6 +583,7 @@ public interface ManagedLedger {
/**
* Roll current ledger if it is full
*/
+ @Deprecated
void rollCurrentLedgerIfFull();
/**
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4a49406..e295566 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -176,6 +176,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
final EntryCache entryCache;
private ScheduledFuture<?> timeoutTask;
+ private ScheduledFuture<?> checkLedgerRollTask;
/**
* This lock is held while the ledgers list or propertiesMap is updated
asynchronously on the metadata store. Since we use the store
@@ -382,6 +383,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
});
scheduleTimeoutTask();
+
+ scheduleRollOverLedgerTask();
}
private synchronized void initializeBookKeeper(final
ManagedLedgerInitializeLedgerCallback callback) {
@@ -1318,6 +1321,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.timeoutTask.cancel(false);
}
+ if (this.checkLedgerRollTask != null) {
+ this.checkLedgerRollTask.cancel(false);
+ }
+
}
private void closeAllCursors(CloseCallback callback, final Object ctx) {
@@ -1548,6 +1555,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap());
}
+ @VisibleForTesting
@Override
public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
@@ -1561,7 +1569,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
lh.getId());
if (rc == BKException.Code.OK) {
- log.debug("Successfuly closed ledger {}", lh.getId());
+ log.debug("Successfully closed ledger {}", lh.getId());
} else {
log.warn("Error when closing ledger {}. Status={}",
lh.getId(), BKException.getMessage(rc));
}
@@ -3467,6 +3475,15 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+ private void scheduleRollOverLedgerTask() {
+ if (config.getMaximumRolloverTimeMs() > 0) {
+ long interval = config.getMaximumRolloverTimeMs();
+ this.checkLedgerRollTask =
this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
+ rollCurrentLedgerIfFull();
+ }), interval, interval, TimeUnit.MILLISECONDS);
+ }
+ }
+
private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ff73f38..11cb57f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2825,4 +2825,37 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
+
+ @Test
+ public void testManagedLedgerRollOverIfFull() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionTime(1, TimeUnit.SECONDS);
+ config.setMaxEntriesPerLedger(2);
+ config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+ config.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);
+
+ ManagedLedgerImpl ledger =
(ManagedLedgerImpl)factory.open("test_managedLedger_rollOver", config);
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ int msgNum = 10;
+
+ for (int i = 0; i < msgNum; i++) {
+ ledger.addEntry(new byte[1024 * 1024]);
+ }
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
+ List<Entry> entries = cursor.readEntries(msgNum);
+ Assert.assertEquals(msgNum, entries.size());
+
+ for (Entry entry : entries) {
+ cursor.markDelete(entry.getPosition());
+ }
+ entries.forEach(e -> e.release());
+
+ // all the messages have benn acknowledged
+ // and all the ledgers have been removed except the last ledger
+ Thread.sleep(1000);
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ Assert.assertEquals(ledger.getCurrentLedgerSize(), 0);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2e8fdf7..8faa462 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -216,7 +216,6 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
- private final ScheduledExecutorService ledgerFullMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
private ScheduledExecutorService deduplicationSnapshotMonitor;
@@ -310,8 +309,6 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
new
DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("consumed-Ledgers-monitor"));
- this.ledgerFullMonitor =
- Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("ledger-full-monitor"));
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
@@ -461,7 +458,6 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
- this.startLedgerFullMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
@@ -554,12 +550,6 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
}
- protected void startLedgerFullMonitor() {
- int interval =
pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
- ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
- interval, interval, TimeUnit.MINUTES);
- }
-
protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval =
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
@@ -698,7 +688,6 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
compactionMonitor.shutdown();
- ledgerFullMonitor.shutdown();
messagePublishBufferMonitor.shutdown();
consumedLedgersMonitor.shutdown();
backlogQuotaChecker.shutdown();
@@ -1446,18 +1435,6 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
});
}
- private void checkLedgerFull() {
- forEachTopic((t) -> {
- if (t instanceof PersistentTopic) {
- Optional.ofNullable(((PersistentTopic)
t).getManagedLedger()).ifPresent(
- managedLedger -> {
- managedLedger.rollCurrentLedgerIfFull();
- }
- );
- }
- });
- }
-
public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}