This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c8a0dff7e1b51630840a301a35b83c62d188f983
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Mar 11 10:06:33 2022 +0800

    Cancel offload tasks when managed ledger closed. (#14545)
    
    ### Motivation
    When the user config the offloader, as the ledger close, it will trigger 
the ledger to offload. If there are many ledgers that need to offload, but the 
topic has been unloaded, the offloader will continue to offload. Because the 
offloader uses the shared executor pool in ManagedLedgerFactoryImpl and when 
the managed ledger closes, it doesn't cancel the tasks.
    
    ```
    15:29:59.180 [pulsar-web-41-3] INFO  
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Unloading 
topic persistent://public/default/UpdateNodeCharts
    15:29:59.201 [pulsar-web-41-3] INFO  
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[public/default/persistent/UpdateNodeCharts] Closing managed ledger
    15:29:59.216 [main-EventThread] INFO  
org.apache.bookkeeper.mledger.impl.MetaStoreImpl - 
[public/default/persistent/UpdateNodeCharts] [cloud-nodes-service] Updating 
cursor info ledgerId=-1 mark-delete=789182:82011
    15:29:59.219 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - 
[public/default/persistent/UpdateNodeCharts][cloud-nodes-service] Closed cursor 
at md-position=789182:82011
    15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://public/default/UpdateNodeCharts] Topic closed
    15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Successfully 
unloaded topic persistent://public/default/UpdateNodeCharts
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[public/default/persistent/UpdateNodeCharts] Preparing metadata to offload 
ledger 422142 with uuid 030267e2-a2f9-40a3-848b-482f9b007c00
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[public/default/persistent/UpdateNodeCharts] Found previous offload attempt for 
ledger 422142, uuid 030267e2-a2f9-40a3-848b-482f9b007c00, cleaning up
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[public/default/persistent/UpdateNodeCharts] Cleanup offload for ledgerId 
422142 uuid 3725b3c1-1dbc-481f-a1dd-8aaffb75e603 because of the reason Previous 
failed offload.
    ```
    
    ### Modifications
    
    - When do `offloadLoop`, check state first. if `Close`, nothing to do.
    
    (cherry picked from commit e0687e37e137f55c6cffa263d8ac8af9169dad92)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a13cf68..5334544 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2360,13 +2360,13 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                                     + ", total size = {}, already offloaded = 
{}, to offload = {}",
                             name, 
toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
                             sizeSummed, alreadyOffloadedSize, toOffloadSize);
+                    offloadLoop(unlockingPromise, toOffload, 
PositionImpl.LATEST, Optional.empty());
                 } else {
                     // offloadLoop will complete immediately with an empty 
list to offload
                     log.debug("[{}] Nothing to offload, total size = {}, 
already offloaded = {}, threshold = {}",
                             name, sizeSummed, alreadyOffloadedSize, threshold);
+                    unlockingPromise.complete(PositionImpl.LATEST);
                 }
-
-                offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, 
Optional.empty());
             }
         }
     }
@@ -2929,6 +2929,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, 
Queue<LedgerInfo> ledgersToOffload,
             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+        if (getState() == State.Closed) {
+            promise.completeExceptionally(new 
ManagedLedgerAlreadyClosedException(
+                    String.format("managed ledger [%s] has already closed", 
name)));
+            return;
+        }
         LedgerInfo info = ledgersToOffload.poll();
         if (info == null) {
             if (firstError.isPresent()) {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 982b914..c6008c76 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -121,6 +122,7 @@ import 
org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -3449,4 +3451,42 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         factory.shutdown();
     }
 
+    @Test
+    public void testOffloadTaskCancelled() throws Exception {
+        ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+
+        OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+        offloadPolicies.setManagedLedgerOffloadDriver("mock");
+        offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L);
+        LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+        
Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+        
Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+        config.setLedgerOffloader(ledgerOffloader);
+
+        CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+        readHandle.complete(mock(ReadHandle.class));
+
+        CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+        offloadFuture.complete(null);
+        Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), 
any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+
+        final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) 
factory.open("test-offload-task-close", config);
+        final ManagedLedgerImpl ledger = spy(ledgerInit);
+        long ledgerId = 3L;
+        doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+        doReturn(ManagedLedgerImpl.State.Closed).when(ledger).getState();
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        ledger.close();
+
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<LedgerInfo> ledgerInfo = 
ledger.getLedgerInfo(ledgerId);
+            Assert.assertFalse(ledgerInfo.get(100, 
TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
+        });
+    }
+
 }

Reply via email to