This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ef76e  make ledger rollover check task internal (#8946)
b4ef76e is described below

commit b4ef76ec66069cb1c8632c21e48e088a48efd17c
Author: hangc0276 <[email protected]>
AuthorDate: Fri Jan 8 01:47:16 2021 +0800

    make ledger rollover check task internal (#8946)
    
    Fix #7195
    
    ### Changes
    1. add a scheduled task to roll over the ledger in `ManagedLedgerImpl` 
instead of `BrokerService`
    2. add the test.
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  1 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 19 ++++++++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++
 .../pulsar/broker/service/BrokerService.java       | 23 ---------------
 4 files changed, 52 insertions(+), 24 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index f75a639..4f4c226 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -583,6 +583,7 @@ public interface ManagedLedger {
     /**
      * Roll current ledger if it is full
      */
+    @Deprecated
     void rollCurrentLedgerIfFull();
 
     /**
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4a49406..e295566 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -176,6 +176,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     final EntryCache entryCache;
 
     private ScheduledFuture<?> timeoutTask;
+    private ScheduledFuture<?> checkLedgerRollTask;
 
     /**
      * This lock is held while the ledgers list or propertiesMap is updated 
asynchronously on the metadata store. Since we use the store
@@ -382,6 +383,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         });
 
         scheduleTimeoutTask();
+
+        scheduleRollOverLedgerTask();
     }
 
     private synchronized void initializeBookKeeper(final 
ManagedLedgerInitializeLedgerCallback callback) {
@@ -1318,6 +1321,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             this.timeoutTask.cancel(false);
         }
 
+        if (this.checkLedgerRollTask != null) {
+            this.checkLedgerRollTask.cancel(false);
+        }
+
     }
 
     private void closeAllCursors(CloseCallback callback, final Object ctx) {
@@ -1548,6 +1555,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
     }
 
+    @VisibleForTesting
     @Override
     public void rollCurrentLedgerIfFull() {
         log.info("[{}] Start checking if current ledger is full", name);
@@ -1561,7 +1569,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                             lh.getId());
 
                     if (rc == BKException.Code.OK) {
-                        log.debug("Successfuly closed ledger {}", lh.getId());
+                        log.debug("Successfully closed ledger {}", lh.getId());
                     } else {
                         log.warn("Error when closing ledger {}. Status={}", 
lh.getId(), BKException.getMessage(rc));
                     }
@@ -3467,6 +3475,15 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    private void scheduleRollOverLedgerTask() {
+        if (config.getMaximumRolloverTimeMs() > 0) {
+            long interval = config.getMaximumRolloverTimeMs();
+            this.checkLedgerRollTask = 
this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
+                rollCurrentLedgerIfFull();
+            }), interval, interval, TimeUnit.MILLISECONDS);
+        }
+    }
+
     private void checkAddTimeout() {
         long timeoutSec = config.getAddEntryTimeoutSeconds();
         if (timeoutSec < 1) {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ff73f38..11cb57f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2825,4 +2825,37 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
         }
     }
+
+    @Test
+    public void testManagedLedgerRollOverIfFull() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionTime(1, TimeUnit.SECONDS);
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        config.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("test_managedLedger_rollOver", config);
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        int msgNum = 10;
+
+        for (int i = 0; i < msgNum; i++) {
+            ledger.addEntry(new byte[1024 * 1024]);
+        }
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
+        List<Entry> entries = cursor.readEntries(msgNum);
+        Assert.assertEquals(msgNum, entries.size());
+
+        for (Entry entry : entries) {
+            cursor.markDelete(entry.getPosition());
+        }
+        entries.forEach(e -> e.release());
+
+        // all the messages have been acknowledged
+        // and all the ledgers have been removed except the last ledger
+        Thread.sleep(1000);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        Assert.assertEquals(ledger.getCurrentLedgerSize(), 0);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2e8fdf7..8faa462 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -216,7 +216,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     private final ScheduledExecutorService compactionMonitor;
     private final ScheduledExecutorService messagePublishBufferMonitor;
     private final ScheduledExecutorService consumedLedgersMonitor;
-    private final ScheduledExecutorService ledgerFullMonitor;
     private ScheduledExecutorService topicPublishRateLimiterMonitor;
     private ScheduledExecutorService brokerPublishRateLimiterMonitor;
     private ScheduledExecutorService deduplicationSnapshotMonitor;
@@ -310,8 +309,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                         new 
DefaultThreadFactory("pulsar-publish-buffer-monitor"));
         this.consumedLedgersMonitor = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("consumed-Ledgers-monitor"));
-        this.ledgerFullMonitor =
-                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("ledger-full-monitor"));
 
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
@@ -461,7 +458,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         this.startCompactionMonitor();
         this.startMessagePublishBufferMonitor();
         this.startConsumedLedgersMonitor();
-        this.startLedgerFullMonitor();
         this.startBacklogQuotaChecker();
         this.updateBrokerPublisherThrottlingMaxRate();
         this.startCheckReplicationPolicies();
@@ -554,12 +550,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         }
     }
 
-    protected void startLedgerFullMonitor() {
-        int interval = 
pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
-        ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
-                interval, interval, TimeUnit.MINUTES);
-    }
-
     protected void startBacklogQuotaChecker() {
         if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
             final int interval = 
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
@@ -698,7 +688,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         inactivityMonitor.shutdown();
         messageExpiryMonitor.shutdown();
         compactionMonitor.shutdown();
-        ledgerFullMonitor.shutdown();
         messagePublishBufferMonitor.shutdown();
         consumedLedgersMonitor.shutdown();
         backlogQuotaChecker.shutdown();
@@ -1446,18 +1435,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         });
     }
 
-    private void checkLedgerFull() {
-        forEachTopic((t) -> {
-            if (t instanceof PersistentTopic) {
-                Optional.ofNullable(((PersistentTopic) 
t).getManagedLedger()).ifPresent(
-                        managedLedger -> {
-                            managedLedger.rollCurrentLedgerIfFull();
-                        }
-                );
-            }
-        });
-    }
-
     public void checkMessageDeduplicationInfo() {
         forEachTopic(Topic::checkMessageDeduplicationInfo);
     }

Reply via email to