This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c947ed4b4cd558dde2f760aa77710bd2044b5788
Author: lipenghui <[email protected]>
AuthorDate: Fri Oct 1 00:28:59 2021 +0800

    The load balancer should avoid offloading the heartbeat namespace (#12252)
    
    * The load balancer should avoid offloading the heartbeat namespace
    
    The heartbeat namespace is sticky to a broker, so it will never be owned by 
other brokers. It therefore makes no sense to rebalance the heartbeat 
namespace; attempting to unload it only produces errors such as:
    
    ```
    09:49:55.946 [pulsar-load-manager-1-1] WARN  
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Error when 
trying to perform load shedding on 
pulsar/pluster-2/10.1.131.232:8080/0x00000000_0xffffffff for broker 
10.1.131.232:8080 
org.apache.pulsar.client.admin.PulsarAdminException$NotFoundException: 
Namespace does not exist
            at 
org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:220)
            at 
org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130)
            at 
org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:839)
            at 
org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:820)
            at 
org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229)
            at 
org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62)
            at 
org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173)
            at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
            at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
            at 
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288)
            at 
org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173)
            at 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212)
            at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
            at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$3(AsyncHttpConnector.java:253)
            at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
            at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at 
org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222)
            at 
org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257)
            at 
org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241)
            at 
org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114)
            at 
org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143)
            at 
org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
            at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    ```
    
    * Fix checkstyle
    
    (cherry picked from commit 14fc0d3116a0b26365e691fb87fa7c19abc0fc23)
---
 .../broker/loadbalance/impl/OverloadShedder.java   | 22 +++++++++++++---------
 .../broker/loadbalance/impl/ThresholdShedder.java  | 17 +++++++++++------
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 89e3204..214825b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Map;
@@ -98,15 +100,17 @@ public class OverloadShedder implements 
LoadSheddingStrategy {
                 // make up for at least the minimum throughput to offload
 
                 loadData.getBundleData().entrySet().stream()
-                        .filter(e -> 
localData.getBundles().contains(e.getKey()))
-                        .map((e) -> {
-                            // Map to throughput value
-                            // Consider short-term byte rate to address system 
resource burden
-                            String bundle = e.getKey();
-                            BundleData bundleData = e.getValue();
-                            TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                            double throughput = 
shortTermData.getMsgThroughputIn() + shortTermData
-                                    .getMsgThroughputOut();
+                    .filter(e -> 
!HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
+                            && 
!HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
+                            && localData.getBundles().contains(e.getKey()))
+                    .map((e) -> {
+                        // Map to throughput value
+                        // Consider short-term byte rate to address system 
resource burden
+                        String bundle = e.getKey();
+                        BundleData bundleData = e.getValue();
+                        TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
+                        double throughput = shortTermData.getMsgThroughputIn() 
+ shortTermData
+                                .getMsgThroughputOut();
                     return Pair.of(bundle, throughput);
                 }).filter(e -> {
                     // Only consider bundles that were not already unloaded 
recently
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index e9c0aa0..8cde354 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.HashMap;
@@ -95,12 +97,15 @@ public class ThresholdShedder implements 
LoadSheddingStrategy {
             MutableBoolean atLeastOneBundleSelected = new 
MutableBoolean(false);
 
             if (localData.getBundles().size() > 1) {
-                loadData.getBundleData().entrySet().stream().map((e) -> {
-                    String bundle = e.getKey();
-                    BundleData bundleData = e.getValue();
-                    TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
-                    double throughput = shortTermData.getMsgThroughputIn() + 
shortTermData.getMsgThroughputOut();
-                    return Pair.of(bundle, throughput);
+                loadData.getBundleData().entrySet().stream()
+                    .filter(e -> 
!HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
+                        && 
!HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
+                    .map((e) -> {
+                        String bundle = e.getKey();
+                        BundleData bundleData = e.getValue();
+                        TimeAverageMessageData shortTermData = 
bundleData.getShortTermData();
+                        double throughput = shortTermData.getMsgThroughputIn() 
+ shortTermData.getMsgThroughputOut();
+                        return Pair.of(bundle, throughput);
                 }).filter(e ->
                         !recentlyUnloadedBundles.containsKey(e.getLeft())
                 ).filter(e ->

Reply via email to