This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 98843214d51 [branch-2.8][fix][broker] Avoid heartbeat topic to offload. (#15393)
98843214d51 is described below
commit 98843214d512e1944b7c5846de624b0bda37b937
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu May 5 02:36:15 2022 +0200
[branch-2.8][fix][broker] Avoid heartbeat topic to offload. (#15393)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++++
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 2 +-
.../apache/pulsar/broker/service/BrokerService.java | 6 +++++-
.../broker/systopic/PartitionedSystemTopicTest.java | 18 ++++++++++++++++++
4 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2ebca81ac92..89e9ac250bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2776,6 +2776,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
+ if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) {
+ callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx);
+ return;
+ }
PositionImpl requestOffloadTo = (PositionImpl) pos;
if (!isValidPosition(requestOffloadTo) &&
// Also consider the case where the last ledger is currently
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a7092e4ec46..a0930e42442 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -85,7 +85,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
ledger.offloadPrefix(p);
fail("Should have thrown an exception");
} catch (ManagedLedgerException e) {
- assertEquals(e.getCause().getClass(), CompletionException.class);
+ assertEquals(e.getMessage(), "NullLedgerOffloader");
}
assertEquals(ledger.getLedgersInfoAsList().size(), 5);
assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9650c101cd8..8695ee67042 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -90,6 +90,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
@@ -106,6 +107,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
@@ -1509,7 +1511,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
- if (topicLevelOffloadPolicies != null) {
+ if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+ } else if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index bbd3cae7117..6517e8de34d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -20,10 +20,15 @@ package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pulsar.broker.admin.impl.BrokersBase;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
@@ -104,4 +109,17 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
}
}
+ @Test
+ public void testHealthCheckTopicNotOffload() throws Exception {
+ final String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
+ pulsar.getConfig());
+ TopicName topicName = TopicName.get("persistent://" + heartbeatNamespace
+ + "/" + BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topicName.toString(), true).get().get();
+ admin.brokers().healthcheck();
+ admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
+ Assert.assertEquals(admin.topics().getStats(topicName.toString()).getMsgInCounter(), 1);
+ Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0);
+ }
}