This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c947ed4b4cd558dde2f760aa77710bd2044b5788 Author: lipenghui <[email protected]> AuthorDate: Fri Oct 1 00:28:59 2021 +0800 The loadbalancer should avoid offload the heartbeat namespace (#12252) * The loadbalancer should avoid offload the heartbeat namespace The heartbeat namespace is sticky to a broker, so it will not be owned by other brokers. It does not make any sense to rebalance the heartbeat namespace. Attempting to shed its bundle fails with an error like the following: ``` 09:49:55.946 [pulsar-load-manager-1-1] WARN org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Error when trying to perform load shedding on pulsar/pluster-2/10.1.131.232:8080/0x00000000_0xffffffff for broker 10.1.131.232:8080 org.apache.pulsar.client.admin.PulsarAdminException$NotFoundException: Namespace does not exist at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:220) at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130) at org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:839) at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:820) at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229) at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62) at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173) at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) at org.glassfish.jersey.internal.Errors.process(Errors.java:292) at org.glassfish.jersey.internal.Errors.process(Errors.java:274) at org.glassfish.jersey.internal.Errors.process(Errors.java:244) at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288) at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173) at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$3(AsyncHttpConnector.java:253) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222) at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257) at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241) at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114) at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143) at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ``` * Fix checkstyle (cherry picked from commit 14fc0d3116a0b26365e691fb87fa7c19abc0fc23) --- .../broker/loadbalance/impl/OverloadShedder.java | 22 +++++++++++++--------- .../broker/loadbalance/impl/ThresholdShedder.java | 17 +++++++++++------ 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index 89e3204..214825b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.Map; @@ -98,15 +100,17 @@ public class OverloadShedder implements LoadSheddingStrategy { // make up for at least the minimum throughput to offload loadData.getBundleData().entrySet().stream() - .filter(e -> localData.getBundles().contains(e.getKey())) - .map((e) -> { - // Map to throughput value - // Consider short-term byte rate to address system resource burden - String bundle = e.getKey(); - BundleData bundleData = e.getValue(); - TimeAverageMessageData shortTermData = bundleData.getShortTermData(); - double throughput = shortTermData.getMsgThroughputIn() + shortTermData - .getMsgThroughputOut(); + .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() + && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches() + && localData.getBundles().contains(e.getKey())) + 
.map((e) -> { + // Map to throughput value + // Consider short-term byte rate to address system resource burden + String bundle = e.getKey(); + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double throughput = shortTermData.getMsgThroughputIn() + shortTermData + .getMsgThroughputOut(); return Pair.of(bundle, throughput); }).filter(e -> { // Only consider bundles that were not already unloaded recently diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java index e9c0aa0..8cde354 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.HashMap; @@ -95,12 +97,15 @@ public class ThresholdShedder implements LoadSheddingStrategy { MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false); if (localData.getBundles().size() > 1) { - loadData.getBundleData().entrySet().stream().map((e) -> { - String bundle = e.getKey(); - BundleData bundleData = e.getValue(); - TimeAverageMessageData shortTermData = bundleData.getShortTermData(); - double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); - return Pair.of(bundle, throughput); + loadData.getBundleData().entrySet().stream() + .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() + && 
!HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()) + .map((e) -> { + String bundle = e.getKey(); + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); + return Pair.of(bundle, throughput); }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()) ).filter(e ->
