This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 59cbd3240bdf79855a9b47dd857ca4d9789f5828
Author: lipenghui <[email protected]>
AuthorDate: Wed Aug 11 11:45:47 2021 +0800

    Avoid redundant calls for getting the offload policies from the offloader 
(#11629)
    
    * Avoid redundant calls for getting the offload policies from the offloader
    
    If a managed ledger contains many ledgers, then while checking whether the
    offloaded ledgers need to be deleted from the bookies, getOffloadPolicies()
    is called on the offloader once for each ledger. The
    BlobStoreManagedLedgerOffloader regenerates the offload policies from its
    properties on every such call (another PR may be needed to find a way to
    optimize this part).
    This leads to high CPU usage.
    
    Stack:
    
    ```
    "bookkeeper-ml-workers-OrderedExecutor-4-0" #68 prio=5 os_prio=0 
tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
       java.lang.Thread.State: RUNNABLE
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
        at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
        at 
org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
        at 
org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown
 Source)
        at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
        at 
org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
        at 
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
        at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
        at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
        - locked <0x00000006a3f2c000> (a 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
        at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown
 Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at 
org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    ```
    
    (cherry picked from commit af9b800e3a5fda2c3dfc76f930ddb146e443a141)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 24 ++++++-------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 40 +++++++++++++++++++++-
 .../mledger/impl/OffloadLedgerDeleteTest.java      | 19 +++++-----
 3 files changed, 58 insertions(+), 25 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 937d918..a1e93a8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -129,6 +129,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -2305,19 +2306,11 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= 
config.getRetentionSizeInMB() * MegaByte;
     }
 
-    private boolean isOffloadedNeedsDelete(OffloadContext offload) {
+    boolean isOffloadedNeedsDelete(OffloadContext offload, 
Optional<OffloadPolicies> offloadPolicies) {
         long elapsedMs = clock.millis() - offload.getTimestamp();
-
-        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() 
!= NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies()
-                    .getManagedLedgerOffloadDeletionLagInMillis() != null) {
-            return offload.getComplete() && !offload.getBookkeeperDeleted()
-                    && elapsedMs > config.getLedgerOffloader()
-                    
.getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
-        } else {
-            return false;
-        }
+        return offloadPolicies.filter(policies -> offload.getComplete() && 
!offload.getBookkeeperDeleted()
+                && policies.getManagedLedgerOffloadDeletionLagInMillis() != 
null
+                && elapsedMs > 
policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent();
     }
 
     /**
@@ -2338,6 +2331,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
         List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
+        Optional<OffloadPolicies> optionalOffloadPolicies = 
Optional.ofNullable(config.getLedgerOffloader() != null
+                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                ? config.getLedgerOffloader().getOffloadPolicies()
+                : null);
         synchronized (this) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Start TrimConsumedLedgers. ledgers={} 
totalSize={}", name, ledgers.keySet(),
@@ -2421,7 +2418,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             }
 
             for (LedgerInfo ls : ledgers.values()) {
-                if (isOffloadedNeedsDelete(ls.getOffloadContext()) && 
!ledgersToDelete.contains(ls)) {
+                if (isOffloadedNeedsDelete(ls.getOffloadContext(), 
optionalOffloadPolicies)
+                        && !ledgersToDelete.contains(ls)) {
                     log.debug("[{}] Ledger {} has been offloaded, bookkeeper 
ledger needs to be deleted", name,
                             ls.getLedgerId());
                     offloadedLedgersToDelete.add(ls);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 274eb92..ee9ff41 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -21,7 +21,13 @@ package org.apache.bookkeeper.mledger.impl;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -83,6 +89,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -101,6 +108,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -3169,4 +3177,34 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         cursor3.close();
         ledger.close();
     }
+
+    @Test
+    public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() 
throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        config.setMaxSizePerLedgerMb(1);
+        LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
+        OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class);
+        when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+        when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3");
+        config.setLedgerOffloader(ledgerOffloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open(
+                "testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", 
config);
+
+        // Retain the data.
+        ledger.openCursor("test-cursor");
+        final int entries = 10;
+        byte[] data = new byte[1024 * 1024];
+        for (int i = 0; i < entries; i++) {
+            ledger.addEntry(data);
+        }
+        assertEquals(ledger.ledgers.size(), 10);
+
+        // Set a new offloader to cleanup the execution times of 
getOffloadPolicies()
+        ledgerOffloader = mock(NullLedgerOffloader.class);
+        config.setLedgerOffloader(ledgerOffloader);
+
+        ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
+        verify(ledgerOffloader, times(1)).getOffloadPolicies();
+    }
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index 258987c..f25332e 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -20,14 +20,13 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
 
-import java.lang.reflect.Method;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.util.MockClock;
@@ -219,25 +218,23 @@ public class OffloadLedgerDeleteTest extends 
MockedBookKeeperTestCase {
         config.setLedgerOffloader(ledgerOffloader);
         config.setClock(clock);
 
-        ManagedLedger managedLedger = 
factory.open("isOffloadedNeedsDeleteTest", config);
-        Class<ManagedLedgerImpl> clazz = ManagedLedgerImpl.class;
-        Method method = clazz.getDeclaredMethod("isOffloadedNeedsDelete", 
MLDataFormats.OffloadContext.class);
-        method.setAccessible(true);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
factory.open("isOffloadedNeedsDeleteTest", config);
 
         MLDataFormats.OffloadContext offloadContext = 
MLDataFormats.OffloadContext.newBuilder()
                 .setTimestamp(config.getClock().millis() - 1000)
                 .setComplete(true)
                 .setBookkeeperDeleted(false)
                 .build();
-        Boolean needsDelete = (Boolean) method.invoke(managedLedger, 
offloadContext);
+
+        boolean needsDelete = 
managedLedger.isOffloadedNeedsDelete(offloadContext, 
Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
         offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(500L);
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, 
Optional.of(offloadPolicies));
         Assert.assertTrue(needsDelete);
 
         offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2);
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, 
Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
         offloadContext = MLDataFormats.OffloadContext.newBuilder()
@@ -245,7 +242,7 @@ public class OffloadLedgerDeleteTest extends 
MockedBookKeeperTestCase {
                 .setComplete(false)
                 .setBookkeeperDeleted(false)
                 .build();
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, 
Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
         offloadContext = MLDataFormats.OffloadContext.newBuilder()
@@ -253,7 +250,7 @@ public class OffloadLedgerDeleteTest extends 
MockedBookKeeperTestCase {
                 .setComplete(true)
                 .setBookkeeperDeleted(true)
                 .build();
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, 
Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
     }

Reply via email to