This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e213756971 [ISSUE #10419] Fix NPE when class filter route data is missing (#10420)
e213756971 is described below

commit e2137569717df3567fc51629d7ee3c9b36724628
Author: yin_bo_ <[email protected]>
AuthorDate: Sat Jun 20 10:27:55 2026 +0800

    [ISSUE #10419] Fix NPE when class filter route data is missing (#10420)
    
    * [ISSUE #10419] Fix NPE when class filter route data is missing
    
    * [ISSUE #10419] Add class filter route fallback tests
---
 .../client/impl/consumer/PullAPIWrapper.java       |  8 +--
 .../client/impl/consumer/PullAPIWrapperTest.java   | 63 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 2bd0d9994e..83e9e4d075 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -299,10 +299,12 @@ public class PullAPIWrapper {
         ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
         if (topicRouteTable != null) {
             TopicRouteData topicRouteData = topicRouteTable.get(topic);
-            List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
+            if (topicRouteData != null && topicRouteData.getFilterServerTable() != null) {
+                List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
 
-            if (list != null && !list.isEmpty()) {
-                return list.get(randomNum() % list.size());
+                if (list != null && !list.isEmpty()) {
+                    return list.get(randomNum() % list.size());
+                }
             }
         }
 
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
index 2ffa8f4f14..cc38cb5c8f 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapperTest.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ConcurrentMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -153,6 +154,68 @@ public class PullAPIWrapperTest {
                 any(PullCallback.class));
     }
 
+    @Test
+    public void testPullKernelImplWithMissingTopicRouteDataForClassFilter() {
+        when(mQClientFactory.getTopicRouteTable()).thenReturn(new ConcurrentHashMap<>());
+
+        assertFindFilterServerFailed();
+    }
+
+    @Test
+    public void testPullKernelImplWithMissingFilterServerTableForClassFilter() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setFilterServerTable(null);
+        ConcurrentMap<String, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
+        topicRouteTable.put(defaultTopic, topicRouteData);
+        when(mQClientFactory.getTopicRouteTable()).thenReturn(topicRouteTable);
+
+        assertFindFilterServerFailed();
+    }
+
+    @Test
+    public void testPullKernelImplWithMissingBrokerFilterServerForClassFilter() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setFilterServerTable(new HashMap<>());
+        ConcurrentMap<String, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
+        topicRouteTable.put(defaultTopic, topicRouteData);
+        when(mQClientFactory.getTopicRouteTable()).thenReturn(topicRouteTable);
+
+        assertFindFilterServerFailed();
+    }
+
+    @Test
+    public void testPullKernelImplWithEmptyFilterServerListForClassFilter() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        HashMap<String, List<String>> filterServerTable = new HashMap<>();
+        filterServerTable.put(defaultBrokerAddr, new ArrayList<>());
+        topicRouteData.setFilterServerTable(filterServerTable);
+        ConcurrentMap<String, TopicRouteData> topicRouteTable = new ConcurrentHashMap<>();
+        topicRouteTable.put(defaultTopic, topicRouteData);
+        when(mQClientFactory.getTopicRouteTable()).thenReturn(topicRouteTable);
+
+        assertFindFilterServerFailed();
+    }
+
+    private void assertFindFilterServerFailed() {
+        MQClientException actual = assertThrows(MQClientException.class, () -> pullAPIWrapper.pullKernelImpl(
+            createMessageQueue(),
+            "",
+            "",
+            1L,
+            1L,
+            1,
+            1,
+            PullSysFlag.buildSysFlag(false, false, false, true),
+            1L,
+            System.currentTimeMillis(),
+            defaultTimeout,
+            CommunicationMode.SYNC,
+            null
+        ));
+
+        assertTrue(actual.getMessage().contains("Find Filter Server Failed"));
+    }
+
     @Test
     public void testSetConnectBrokerByUser() {
         pullAPIWrapper.setConnectBrokerByUser(true);

Reply via email to