This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e213756971 [ISSUE #10419] Fix NPE when class filter route data is
missing (#10420)
e213756971 is described below
commit e2137569717df3567fc51629d7ee3c9b36724628
Author: yin_bo_ <[email protected]>
AuthorDate: Sat Jun 20 10:27:55 2026 +0800
[ISSUE #10419] Fix NPE when class filter route data is missing (#10420)
* [ISSUE #10419] Fix NPE when class filter route data is missing
* [ISSUE #10419] Add class filter route fallback tests
---
.../client/impl/consumer/PullAPIWrapper.java | 8 +--
.../client/impl/consumer/PullAPIWrapperTest.java | 63 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 3 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 2bd0d9994e..83e9e4d075 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -299,10 +299,12 @@ public class PullAPIWrapper {
ConcurrentMap<String, TopicRouteData> topicRouteTable =
this.mQClientFactory.getTopicRouteTable();
if (topicRouteTable != null) {
TopicRouteData topicRouteData = topicRouteTable.get(topic);
- List<String> list =
topicRouteData.getFilterServerTable().get(brokerAddr);
+ if (topicRouteData != null &&
topicRouteData.getFilterServerTable() != null) {
+ List<String> list =
topicRouteData.getFilterServerTable().get(brokerAddr);
- if (list != null && !list.isEmpty()) {
- return list.get(randomNum() % list.size());
+ if (list != null && !list.isEmpty()) {
+ return list.get(randomNum() % list.size());
+ }
}
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
index 2ffa8f4f14..cc38cb5c8f 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -153,6 +154,68 @@ public class PullAPIWrapperTest {
any(PullCallback.class));
}
+ @Test
+ public void testPullKernelImplWithMissingTopicRouteDataForClassFilter() {
+ when(mQClientFactory.getTopicRouteTable()).thenReturn(new
ConcurrentHashMap<>());
+
+ assertFindFilterServerFailed();
+ }
+
+ @Test
+ public void testPullKernelImplWithMissingFilterServerTableForClassFilter()
{
+ TopicRouteData topicRouteData = new TopicRouteData();
+ topicRouteData.setFilterServerTable(null);
+ ConcurrentMap<String, TopicRouteData> topicRouteTable = new
ConcurrentHashMap<>();
+ topicRouteTable.put(defaultTopic, topicRouteData);
+ when(mQClientFactory.getTopicRouteTable()).thenReturn(topicRouteTable);
+
+ assertFindFilterServerFailed();
+ }
+
+ @Test
+ public void
testPullKernelImplWithMissingBrokerFilterServerForClassFilter() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ topicRouteData.setFilterServerTable(new HashMap<>());
+ ConcurrentMap<String, TopicRouteData> topicRouteTable = new
ConcurrentHashMap<>();
+ topicRouteTable.put(defaultTopic, topicRouteData);
+ when(mQClientFactory.getTopicRouteTable()).thenReturn(topicRouteTable);
+
+ assertFindFilterServerFailed();
+ }
+
+ @Test
+ public void testPullKernelImplWithEmptyFilterServerListForClassFilter() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ HashMap<String, List<String>> filterServerTable = new HashMap<>();
+ filterServerTable.put(defaultBrokerAddr, new ArrayList<>());
+ topicRouteData.setFilterServerTable(filterServerTable);
+ ConcurrentMap<String, TopicRouteData> topicRouteTable = new
ConcurrentHashMap<>();
+ topicRouteTable.put(defaultTopic, topicRouteData);
+ when(mQClientFactory.getTopicRouteTable()).thenReturn(topicRouteTable);
+
+ assertFindFilterServerFailed();
+ }
+
+ private void assertFindFilterServerFailed() {
+ MQClientException actual = assertThrows(MQClientException.class, () ->
pullAPIWrapper.pullKernelImpl(
+ createMessageQueue(),
+ "",
+ "",
+ 1L,
+ 1L,
+ 1,
+ 1,
+ PullSysFlag.buildSysFlag(false, false, false, true),
+ 1L,
+ System.currentTimeMillis(),
+ defaultTimeout,
+ CommunicationMode.SYNC,
+ null
+ ));
+
+ assertTrue(actual.getMessage().contains("Find Filter Server Failed"));
+ }
+
@Test
public void testSetConnectBrokerByUser() {
pullAPIWrapper.setConnectBrokerByUser(true);