This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 59cbd3240bdf79855a9b47dd857ca4d9789f5828 Author: lipenghui <[email protected]> AuthorDate: Wed Aug 11 11:45:47 2021 +0800 Avoid redundant calls for getting the offload policies from the offloader (#11629) * Avoid redundant calls for getting the offload policies from the offloader If a managed ledger has many ledgers, then when checking whether an offloaded ledger needs to be deleted from the bookies, getOffloadPolicies() is called on the offloader for each ledger. The BlobStoreManagedLedgerOffloader generates the offload policies from the properties on every get operation (another PR may be needed to find a way to optimize this part). This leads to high CPU usage. Stack: ``` "bookkeeper-ml-workers-OrderedExecutor-4-0" #68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000] java.lang.Thread.State: RUNNABLE at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119) at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194) at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265) at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261) at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176) - locked <0x00000006a3f2c000> (a 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source) at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ``` (cherry picked from commit af9b800e3a5fda2c3dfc76f930ddb146e443a141) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 24 ++++++------- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 40 +++++++++++++++++++++- .../mledger/impl/OffloadLedgerDeleteTest.java | 19 +++++----- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 937d918..a1e93a8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -129,6 +129,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.util.DateFormatter; @@ -2305,19 +2306,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte; } - private boolean isOffloadedNeedsDelete(OffloadContext offload) { + boolean isOffloadedNeedsDelete(OffloadContext offload, Optional<OffloadPolicies> offloadPolicies) { long elapsedMs = clock.millis() - offload.getTimestamp(); - - if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE - && config.getLedgerOffloader().getOffloadPolicies() != null - && config.getLedgerOffloader().getOffloadPolicies() - .getManagedLedgerOffloadDeletionLagInMillis() != null) { - return offload.getComplete() && !offload.getBookkeeperDeleted() - && elapsedMs > config.getLedgerOffloader() - .getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis(); - } else { - return false; - } + return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted() + && policies.getManagedLedgerOffloadDeletionLagInMillis() != null + && elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent(); } /** @@ -2338,6 +2331,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { List<LedgerInfo> ledgersToDelete = Lists.newArrayList(); List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList(); + Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null + && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE + ? config.getLedgerOffloader().getOffloadPolicies() + : null); synchronized (this) { if (log.isDebugEnabled()) { log.debug("[{}] Start TrimConsumedLedgers. 
ledgers={} totalSize={}", name, ledgers.keySet(), @@ -2421,7 +2418,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } for (LedgerInfo ls : ledgers.values()) { - if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) { + if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies) + && !ledgersToDelete.contains(ls)) { log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name, ls.getLedgerId()); offloadedLedgersToDelete.add(ls); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 274eb92..ee9ff41 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -21,7 +21,13 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -83,6 +89,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.LedgerOffloader; import 
org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -101,6 +108,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.mutable.MutableObject; @@ -3169,4 +3177,34 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { cursor3.close(); ledger.close(); } + + @Test + public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setMaxSizePerLedgerMb(1); + LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class); + OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class); + when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies); + when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3"); + config.setLedgerOffloader(ledgerOffloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open( + "testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config); + + // Retain the data. 
+ ledger.openCursor("test-cursor"); + final int entries = 10; + byte[] data = new byte[1024 * 1024]; + for (int i = 0; i < entries; i++) { + ledger.addEntry(data); + } + assertEquals(ledger.ledgers.size(), 10); + + // Set a new offloader to cleanup the execution times of getOffloadPolicies() + ledgerOffloader = mock(NullLedgerOffloader.class); + config.setLedgerOffloader(ledgerOffloader); + + ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE); + verify(ledgerOffloader, times(1)).getOffloadPolicies(); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index 258987c..f25332e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -20,14 +20,13 @@ package org.apache.bookkeeper.mledger.impl; import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; -import java.lang.reflect.Method; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.util.MockClock; @@ -219,25 +218,23 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { config.setLedgerOffloader(ledgerOffloader); config.setClock(clock); - ManagedLedger managedLedger = factory.open("isOffloadedNeedsDeleteTest", config); - Class<ManagedLedgerImpl> clazz = ManagedLedgerImpl.class; - Method method = clazz.getDeclaredMethod("isOffloadedNeedsDelete", 
MLDataFormats.OffloadContext.class); - method.setAccessible(true); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("isOffloadedNeedsDeleteTest", config); MLDataFormats.OffloadContext offloadContext = MLDataFormats.OffloadContext.newBuilder() .setTimestamp(config.getClock().millis() - 1000) .setComplete(true) .setBookkeeperDeleted(false) .build(); - Boolean needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + + boolean needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(500L); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertTrue(needsDelete); offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); offloadContext = MLDataFormats.OffloadContext.newBuilder() @@ -245,7 +242,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { .setComplete(false) .setBookkeeperDeleted(false) .build(); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); offloadContext = MLDataFormats.OffloadContext.newBuilder() @@ -253,7 +250,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { .setComplete(true) .setBookkeeperDeleted(true) .build(); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); }
