This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 98843214d51 [branch-2.8][fix][broker] Avoid heartbeat topic to offload. (#15393)
98843214d51 is described below

commit 98843214d512e1944b7c5846de624b0bda37b937
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu May 5 02:36:15 2022 +0200

    [branch-2.8][fix][broker] Avoid heartbeat topic to offload. (#15393)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java     |  4 ++++
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java     |  2 +-
 .../apache/pulsar/broker/service/BrokerService.java    |  6 +++++-
 .../broker/systopic/PartitionedSystemTopicTest.java    | 18 ++++++++++++++++++
 4 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2ebca81ac92..89e9ac250bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2776,6 +2776,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
+        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) {
+            callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx);
+            return;
+        }
         PositionImpl requestOffloadTo = (PositionImpl) pos;
         if (!isValidPosition(requestOffloadTo) &&
                 // Also consider the case where the last ledger is currently
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a7092e4ec46..a0930e42442 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -85,7 +85,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
             ledger.offloadPrefix(p);
             fail("Should have thrown an exception");
         } catch (ManagedLedgerException e) {
-            assertEquals(e.getCause().getClass(), CompletionException.class);
+            assertEquals(e.getMessage(), "NullLedgerOffloader");
         }
         assertEquals(ledger.getLedgersInfoAsList().size(), 5);
         assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9650c101cd8..8695ee67042 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -90,6 +90,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -106,6 +107,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
@@ -1509,7 +1511,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                     topicLevelOffloadPolicies,
                     
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
                     getPulsar().getConfig().getProperties());
-            if (topicLevelOffloadPolicies != null) {
+            if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+            } else if (topicLevelOffloadPolicies != null) {
                 try {
                     LedgerOffloader topicLevelLedgerOffLoader = 
pulsar().createManagedLedgerOffloader(offloadPolicies);
                     
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index bbd3cae7117..6517e8de34d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -20,10 +20,15 @@ package org.apache.pulsar.broker.systopic;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pulsar.broker.admin.impl.BrokersBase;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
@@ -104,4 +109,17 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testHealthCheckTopicNotOffload() throws Exception {
+        final String heartbeatNamespace = 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent://" + 
heartbeatNamespace
+                        + "/" + BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                .getTopic(topicName.toString(), true).get().get();
+        admin.brokers().healthcheck();
+        admin.topics().triggerOffload(topicName.toString(), 
MessageId.earliest);
+        
Assert.assertEquals(admin.topics().getStats(topicName.toString()).getMsgInCounter(),
 1);
+        
Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0);
+    }
 }

Reply via email to