This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b7f0004313e [fix][broker] Fix broker load manager class filter NPE
(#20350)
b7f0004313e is described below
commit b7f0004313ea4565717cc6d3c0b99aee5c079c6c
Author: Kai Wang <[email protected]>
AuthorDate: Mon May 22 08:51:10 2023 +0800
[fix][broker] Fix broker load manager class filter NPE (#20350)
PIP: https://github.com/apache/pulsar/issues/16691
### Motivation
When upgrading the pulsar version and changing the pulsar load manager to
`ExtensibleLoadManagerImpl` it might cause NPE. The root cause is the old
version of pulsar does not contain the `loadManagerClassName` field.
```
2023-05-18T05:42:50,557+0000 [pulsar-io-4-1] INFO
org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:51345] connected with
role=[[email protected]](mailto:[email protected])
using authMethod=token, clientVersion=Pulsar Go 0.9.0,
clientProtocolVersion=18, proxyVersion=null
2023-05-18T05:42:50,558+0000 [pulsar-io-4-1] WARN
org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup
[[email protected]](mailto:[email protected]) for
topic persistent://xxx with error java.lang.NullPointerException: Cannot invoke
“String.equals(Object)” because the return value of
“org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()”
is null
java.util.concurrent.CompletionException: java.lang.NullPointerException:
Cannot invoke “String.equals(Object)” because the return value of
“org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()”
is null
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
~[?:?]
at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1194)
~[?:?]
at
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
~[?:?]
at
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385)
~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
at
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336)
~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
~[?:?]
at
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
~[?:?]
at
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333)
~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409)
~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243)
~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
at
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327)
~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
at
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66)
~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
at
org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$0(NamespaceService.java:191)
~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
```
### Modifications
* Add null check when using`getLoadManagerClassName`.
* Add test to cover this case.
* Add `RedirectManager` unit test.
---
.../filter/BrokerLoadManagerClassFilter.java | 5 +-
.../extensions/manager/RedirectManager.java | 17 +++-
.../impl/BrokerLoadManagerClassFilter.java | 5 +-
.../filter/BrokerLoadManagerClassFilterTest.java | 3 +-
.../extensions/manager/RedirectManagerTest.java | 111 +++++++++++++++++++++
.../impl/BrokerLoadManagerClassFilterTest.java | 6 ++
6 files changed, 139 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
index 4ee28a5225a..07109b277ae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;
import java.util.Map;
+import java.util.Objects;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -43,7 +44,9 @@ public class BrokerLoadManagerClassFilter implements
BrokerFilter {
}
brokers.entrySet().removeIf(entry -> {
BrokerLookupData v = entry.getValue();
- return
!v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());
+ // The load manager class name can be null if the cluster has old
version of broker.
+ return !Objects.equals(v.getLoadManagerClassName(),
+ context.brokerConfiguration().getLoadManagerClassName());
});
return brokers;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
index 4aff77937a5..3455b333b0a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
@@ -19,9 +19,11 @@
package org.apache.pulsar.broker.loadbalance.extensions.manager;
import static
org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +50,12 @@ public class RedirectManager {
this.brokerLookupDataLockManager =
pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
}
+ @VisibleForTesting
+ public RedirectManager(PulsarService pulsar, LockManager<BrokerLookupData>
brokerLookupDataLockManager) {
+ this.pulsar = pulsar;
+ this.brokerLookupDataLockManager = brokerLookupDataLockManager;
+ }
+
public CompletableFuture<Map<String, BrokerLookupData>>
getAvailableBrokerLookupDataAsync() {
return
brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers
-> {
Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
@@ -69,7 +77,7 @@ public class RedirectManager {
public CompletableFuture<Optional<LookupResult>>
findRedirectLookupResultAsync() {
String currentLMClassName =
pulsar.getConfiguration().getLoadManagerClassName();
- boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(),
log);
+ boolean debug =
ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
if (lookupDataMap.isEmpty()) {
String errorMsg = "No available broker found.";
@@ -89,9 +97,10 @@ public class RedirectManager {
log.warn(errorMsg);
throw new IllegalStateException(errorMsg);
}
- if
(latestServiceLookupData.get().getLoadManagerClassName().equals(currentLMClassName))
{
+
+ if
(Objects.equals(latestServiceLookupData.get().getLoadManagerClassName(),
currentLMClassName)) {
if (debug) {
- log.info("We don't need to redirect, current load manager
class name: {}",
+ log.info("No need to redirect, current load manager class
name: {}",
currentLMClassName);
}
return Optional.empty();
@@ -99,7 +108,7 @@ public class RedirectManager {
var serviceLookupDataObj = latestServiceLookupData.get();
var candidateBrokers = new ArrayList<ServiceLookupData>();
lookupDataMap.forEach((key, value) -> {
- if
(value.getLoadManagerClassName().equals(serviceLookupDataObj.getLoadManagerClassName()))
{
+ if (Objects.equals(value.getLoadManagerClassName(),
serviceLookupDataObj.getLoadManagerClassName())) {
candidateBrokers.add(value);
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
index 5d6a56ba869..13e3fdc537e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
+import java.util.Objects;
import java.util.Set;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
@@ -32,8 +33,8 @@ public class BrokerLoadManagerClassFilter implements
BrokerFilter {
LoadData loadData,
ServiceConfiguration conf) throws BrokerFilterException
{
loadData.getBrokerData().forEach((key, value) -> {
- if (!value.getLocalData().getLoadManagerClassName()
- .equals(conf.getLoadManagerClassName())) {
+ // The load manager class name can be null if the cluster has old
version of broker.
+ if
(!Objects.equals(value.getLocalData().getLoadManagerClassName(),
conf.getLoadManagerClassName())) {
brokers.remove(key);
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
index 0169b57fe99..4aef87cf63a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
@@ -44,7 +44,8 @@ public class BrokerLoadManagerClassFilterTest extends
BrokerFilterTestBase {
"broker1", getLookupData("3.0.0",
ExtensibleLoadManagerImpl.class.getName()),
"broker2", getLookupData("3.0.0",
ExtensibleLoadManagerImpl.class.getName()),
"broker3", getLookupData("3.0.0",
ModularLoadManagerImpl.class.getName()),
- "broker4", getLookupData("3.0.0",
ModularLoadManagerImpl.class.getName())
+ "broker4", getLookupData("3.0.0",
ModularLoadManagerImpl.class.getName()),
+ "broker5", getLookupData("3.0.0", null)
);
Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(originalBrokers), null, context);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
new file mode 100644
index 00000000000..cbf77b59d5a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * Unit test {@link RedirectManager}.
+ */
+public class RedirectManagerTest {
+
+ @Test
+ public void testFindRedirectLookupResultAsync() throws ExecutionException,
InterruptedException {
+ PulsarService pulsar = mock(PulsarService.class);
+ ServiceConfiguration configuration = new ServiceConfiguration();
+ when(pulsar.getConfiguration()).thenReturn(configuration);
+ RedirectManager redirectManager = spy(new RedirectManager(pulsar,
null));
+
+
configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ configuration.setLoadBalancerDebugModeEnabled(true);
+
+ // Test 1: No load manager class name found.
+ doReturn(CompletableFuture.completedFuture(
+ new HashMap<>(){{
+ put("broker-1", getLookupData("broker-1", null, 10));
+ put("broker-2", getLookupData("broker-2",
ModularLoadManagerImpl.class.getName(), 1));
+ }}
+ )).when(redirectManager).getAvailableBrokerLookupDataAsync();
+
+ // Should redirect to broker-1, since broker-1 has the latest load
manager, even though the class name is null.
+ Optional<LookupResult> lookupResult =
redirectManager.findRedirectLookupResultAsync().get();
+ assertTrue(lookupResult.isPresent());
+
assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));
+
+ // Test 2: Should redirect to broker-1, since the latest broker are
using ExtensibleLoadManagerImpl
+ doReturn(CompletableFuture.completedFuture(
+ new HashMap<>(){{
+ put("broker-1", getLookupData("broker-1",
ExtensibleLoadManagerImpl.class.getName(), 10));
+ put("broker-2", getLookupData("broker-2",
ModularLoadManagerImpl.class.getName(), 1));
+ }}
+ )).when(redirectManager).getAvailableBrokerLookupDataAsync();
+
+ lookupResult = redirectManager.findRedirectLookupResultAsync().get();
+ assertTrue(lookupResult.isPresent());
+
assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));
+
+
+ // Test 3: Should not redirect, since current broker are using
ModularLoadManagerImpl
+ doReturn(CompletableFuture.completedFuture(
+ new HashMap<>(){{
+ put("broker-1", getLookupData("broker-1",
ExtensibleLoadManagerImpl.class.getName(), 10));
+ put("broker-2", getLookupData("broker-2",
ModularLoadManagerImpl.class.getName(), 100));
+ }}
+ )).when(redirectManager).getAvailableBrokerLookupDataAsync();
+
+ lookupResult = redirectManager.findRedirectLookupResultAsync().get();
+ assertFalse(lookupResult.isPresent());
+ }
+
+
+ public BrokerLookupData getLookupData(String broker, String
loadManagerClassName, long startTimeStamp) {
+ String webServiceUrl = "http://" + broker + ":8080";
+ String webServiceUrlTls = "https://" + broker + ":8081";
+ String pulsarServiceUrl = "pulsar://" + broker + ":6650";
+ String pulsarServiceUrlTls = "pulsar+ssl://" + broker + ":6651";
+ Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+ Map<String, String> protocols = new HashMap<>(){{
+ put("kafka", "9092");
+ }};
+ return new BrokerLookupData(
+ webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+ pulsarServiceUrlTls, advertisedListeners, protocols, true,
true,
+ loadManagerClassName, startTimeStamp, "3.0.0");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
index 856bbac0292..56332111f93 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
@@ -46,8 +46,12 @@ public class BrokerLoadManagerClassFilterTest {
LocalBrokerData localBrokerData1 = new LocalBrokerData();
localBrokerData1.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
+ LocalBrokerData localBrokerData2 = new LocalBrokerData();
+ localBrokerData2.setLoadManagerClassName(null);
loadData.getBrokerData().put("broker1", new
BrokerData(localBrokerData));
loadData.getBrokerData().put("broker2", new
BrokerData(localBrokerData1));
+ loadData.getBrokerData().put("broker3", new
BrokerData(localBrokerData2));
ServiceConfiguration conf = new ServiceConfiguration();
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
@@ -55,6 +59,7 @@ public class BrokerLoadManagerClassFilterTest {
Set<String> brokers = new HashSet<>(){{
add("broker1");
add("broker2");
+ add("broker3");
}};
filter.filter(brokers, null, loadData, conf);
@@ -64,6 +69,7 @@ public class BrokerLoadManagerClassFilterTest {
brokers = new HashSet<>(){{
add("broker1");
add("broker2");
+ add("broker3");
}};
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
filter.filter(brokers, null, loadData, conf);