merlimat commented on a change in pull request #7111:
URL: https://github.com/apache/pulsar/pull/7111#discussion_r432816840



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -472,4 +472,9 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Set
      * @param promise
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+
+    /**
+     * Roll current ledger if it is full
+     */
+    void rollCurrentLedgerIfFull();

Review comment:
       Why do we need to expose this in the interface? It would be better to 
keep it as an implementation detail.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1391,6 +1392,38 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    synchronized void createLedgerAfterClosed() {
+        STATE_UPDATER.set(this, State.CreatingLedger);
+        this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
+        mbean.startDataLedgerCreateOp();
+        asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
+    }
+
+    @Override
+    public void rollCurrentLedgerIfFull() {
+        log.info("[{}] Start checking if current ledger is full", name);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
+            STATE_UPDATER.set(this, State.ClosingLedger);
+            currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object o) {
+                    checkArgument(currentLedger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
+                            currentLedger.getId(),
+                            lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        log.debug("Successfuly closed ledger {}", lh.getId());
+                    } else {
+                        log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    createLedgerAfterClosed();

Review comment:
       We don't need to initiate the creation of a ledger at this point. We can 
stay in the LedgerClosed state until a new write comes in.
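
To illustrate the suggestion, here is a minimal sketch of a lazy rollover. It is a simplified, hypothetical model (the class `LazyRolloverSketch` and its methods are invented for illustration and are not the `ManagedLedgerImpl` code): the periodic check only closes a full ledger, and the next write is what triggers creation of a new one.

```java
/** Hypothetical, simplified model of lazy ledger rollover; not the ManagedLedgerImpl code. */
class LazyRolloverSketch {
    enum State { LedgerOpened, LedgerClosed }

    private State state = State.LedgerOpened;
    private long currentLedgerId = 1;
    private int currentLedgerEntries = 0;
    private int ledgersCreated = 1;

    /** Periodic check: close a non-empty, full ledger, but do not create a new one yet. */
    synchronized void rollCurrentLedgerIfFull(boolean full) {
        if (state == State.LedgerOpened && currentLedgerEntries > 0 && full) {
            state = State.LedgerClosed;
        }
    }

    /** A write that arrives while in LedgerClosed is what creates the next ledger. */
    synchronized long addEntry(String data) {
        if (state == State.LedgerClosed) {
            currentLedgerId++;
            ledgersCreated++;
            currentLedgerEntries = 0;
            state = State.LedgerOpened;
        }
        currentLedgerEntries++;
        return currentLedgerId;
    }

    synchronized State getState() { return state; }

    synchronized int getLedgersCreated() { return ledgersCreated; }
}
```

With this shape, an idle topic holds no open ledger after the rollover, and no ledger is created unless a write actually arrives.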

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -277,6 +278,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
             Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
         this.consumedLedgersMonitor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
+        this.ledgerFullMonitor =

Review comment:
       This thread is not being stopped. Potentially we could also reuse an 
existing executor.
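
A rough sketch of what this could look like, using only `java.util.concurrent`. The class `MonitorOwnerSketch`, its method names, and the thread name are hypothetical stand-ins and not the actual `BrokerService` code: two periodic checks share one scheduled executor, and `close()` stops its thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a service that owns monitor threads; not BrokerService itself. */
class MonitorOwnerSketch implements AutoCloseable {
    // One executor shared by several periodic monitors instead of one thread per monitor.
    private final ScheduledExecutorService monitorExecutor =
            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "shared-ledger-monitor"));

    void start(Runnable consumedLedgersCheck, Runnable ledgerFullCheck, long intervalMillis) {
        monitorExecutor.scheduleAtFixedRate(
                consumedLedgersCheck, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        monitorExecutor.scheduleAtFixedRate(
                ledgerFullCheck, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    boolean isStopped() {
        return monitorExecutor.isShutdown();
    }

    @Override
    public void close() {
        // shutdown() stops accepting work; by default it also drops pending periodic runs.
        monitorExecutor.shutdown();
        try {
            if (!monitorExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
                monitorExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            monitorExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```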

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -485,6 +489,12 @@ protected void startConsumedLedgersMonitor() {
         }
     }
 
+    protected void startLedgerFullMonitor() {
+        int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
+        ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),

Review comment:
       The recurring task needs to be cancelled when BrokerService is closed.
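
One possible way to do this, sketched with plain `java.util.concurrent` types, is to keep the `ScheduledFuture` returned by `scheduleAtFixedRate` and cancel it on close. The class `LedgerFullMonitorSketch` and its methods are hypothetical and only illustrate the pattern; the executor is assumed to be owned and shut down elsewhere.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for a recurring check; illustrates cancel-on-close only. */
class LedgerFullMonitorSketch implements AutoCloseable {
    private final ScheduledExecutorService executor; // assumed to be owned by the caller
    private ScheduledFuture<?> task;

    LedgerFullMonitorSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    synchronized void start(Runnable check, long intervalMinutes) {
        // Keep the handle so the recurring task can be cancelled later.
        task = executor.scheduleAtFixedRate(check, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }

    synchronized boolean isCancelled() {
        return task != null && task.isCancelled();
    }

    @Override
    public synchronized void close() {
        if (task != null) {
            // false: let a run that is already in progress finish, but schedule no further runs.
            task.cancel(false);
        }
    }
}
```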



