This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 20483f54efff842e84e6fe32f33fe7ebfc352e38 Author: Qiang Zhao <[email protected]> AuthorDate: Wed May 8 13:10:49 2024 +0800 [fix][broker] avoid offload system topic (#22497) Co-authored-by: 道君 <[email protected]> (cherry picked from commit 3114199c185cb03a7fdb1b8af2bbc356162cf42d) --- .../pulsar/broker/service/BrokerService.java | 8 +- .../pulsar/broker/service/BrokerServiceTest.java | 93 ++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8a12178cef5..b74e6ee2c4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1991,7 +1991,13 @@ public class BrokerService implements Closeable { topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), getPulsar().getConfig().getProperties()); - if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { + if (NamespaceService.isSystemServiceNamespace(namespace.toString()) + || SystemTopicNames.isSystemTopic(topicName)) { + /* + Avoid setting broker internal system topics using off-loader because some of them are the + preconditions of other topics. The slow replying log speed will cause a delay in all the topic + loading.(timeout) + */ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); } else { if (topicLevelOffloadPolicies != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 1bfbe85c826..daa61070dbf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,11 +67,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -111,6 +114,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; @@ -1772,4 +1777,92 @@ public class BrokerServiceTest extends BrokerTestBase { admin.topics().delete(topic); admin.namespaces().deleteNamespace(namespace); } + + + @Test + public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException { + final String driver = "aws-s3"; + final String region = "test-region"; + final String bucket = "test-bucket"; + final String role = "test-role"; + final String roleSessionName = "test-role-session-name"; + final String credentialId = "test-credential-id"; + final String credentialSecret = "test-credential-secret"; + final String endPoint = "test-endpoint"; + final Integer maxBlockSizeInBytes = 5; + final Integer readBufferSizeInBytes = 2; + final Long offloadThresholdInBytes = 10L; + final Long offloadThresholdInSeconds = 1000L; + final Long offloadDeletionLagInMillis = 5L; + + final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create( + driver, + region, + bucket, + endPoint, + role, + roleSessionName, + credentialId, + credentialSecret, + maxBlockSizeInBytes, + readBufferSizeInBytes, + offloadThresholdInBytes, + offloadThresholdInSeconds, + offloadDeletionLagInMillis, + OffloadedReadPriority.TIERED_STORAGE_FIRST + ); + + var fakeOffloader = new LedgerOffloader() { + @Override + public String getOffloadDriverName() { + return driver; + } + + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, Map<String, String> extraMetadata) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) { + return CompletableFuture.completedFuture(null); + } + + @Override + public OffloadPoliciesImpl getOffloadPolicies() { + return offloadPolicies; + } + + @Override + public void close() { + } + }; + + final BrokerService brokerService = pulsar.getBrokerService(); + final String namespace = "prop/" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.namespaces().setOffloadPolicies(namespace, offloadPolicies); + + // Inject the cache to avoid real load off-loader jar + final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = pulsar.getLedgerOffloaderMap(); + ledgerOffloaderMap.put(NamespaceName.get(namespace), fakeOffloader); + + // (1) test normal topic + final String normalTopic = "persistent://" + namespace + "/" + UUID.randomUUID(); + var managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(normalTopic)).join(); + + Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), fakeOffloader); + + // (2) test system topic + for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) { + managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join(); + Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE); + } + } } +
