This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9e0fb1b02 [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
9e0fb1b02 is described below

commit 9e0fb1b0293aebbe5019e55c0fdbd356611b3d16
Author: rongtong <[email protected]>
AuthorDate: Wed Mar 8 17:23:06 2023 +0800

    [ISSUE #6169] Fix NPE when timerWheel disabled (#6184)
    
    * Fix some NPE when timerWheel disabled
    
    * Add UT
---
 .../broker/processor/PopMessageProcessor.java      |  13 ++-
 .../broker/processor/PopReviveService.java         |   5 +
 .../broker/processor/PopMessageProcessorTest.java  |  16 +++-
 .../store/metrics/DefaultStoreMetricsManager.java  | 104 +++++++++++----------
 4 files changed, 83 insertions(+), 55 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index cd4595326..d63fbe621 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -278,19 +278,26 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
 
         if (requestHeader.isTimeoutTooMuch()) {
             response.setCode(ResponseCode.POLLING_TIMEOUT);
-            response.setRemark(String.format("the broker[%s] poping message is 
timeout too much",
+            response.setRemark(String.format("the broker[%s] pop message is 
timeout too much",
                 this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
         if 
(!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission()))
 {
             response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark(String.format("the broker[%s] poping message is 
forbidden",
+            response.setRemark(String.format("the broker[%s] pop message is 
forbidden",
                 this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
         if (requestHeader.getMaxMsgNums() > 32) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("the broker[%s] poping message's 
num is greater than 32",
+            response.setRemark(String.format("the broker[%s] pop message's num 
is greater than 32",
+                this.brokerController.getBrokerConfig().getBrokerIP1()));
+            return response;
+        }
+
+        if 
(!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable())
 {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("the broker[%s] pop message is 
forbidden because timerWheelEnable is false",
                 this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index a63cd2930..116cb0f82 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -600,6 +600,11 @@ public class PopReviveService extends ServiceThread {
                     continue;
                 }
 
+                if 
(!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable())
 {
+                    POP_LOGGER.warn("skip revive topic because 
timerWheelEnable is false");
+                    continue;
+                }
+
                 POP_LOGGER.info("start revive topic={}, reviveQueueId={}", 
reviveTopic, queueId);
                 ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
                 consumeReviveMessage(consumeReviveObj);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index aabc68220..44f04066c 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -91,6 +91,7 @@ public class PopMessageProcessorTest {
 
     @Test
     public void testProcessRequest_TopicNotExist() throws 
RemotingCommandException {
+        when(messageStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
         
brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
         final RemotingCommand request = createPopMsgCommand();
         RemotingCommand response = 
popMessageProcessor.processRequest(handlerContext, request);
@@ -102,6 +103,7 @@ public class PopMessageProcessorTest {
     @Test
     public void testProcessRequest_Found() throws RemotingCommandException, 
InterruptedException {
         GetMessageResult getMessageResult = createGetMessageResult(1);
+        when(messageStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
         when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), 
any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         final RemotingCommand request = createPopMsgCommand();
@@ -115,6 +117,7 @@ public class PopMessageProcessorTest {
     public void testProcessRequest_MsgWasRemoving() throws 
RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult(1);
         getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
+        when(messageStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
         when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), 
any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         final RemotingCommand request = createPopMsgCommand();
@@ -128,6 +131,7 @@ public class PopMessageProcessorTest {
     public void testProcessRequest_NoMsgInQueue() throws 
RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult(0);
         getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
+        when(messageStore.getMessageStoreConfig()).thenReturn(new 
MessageStoreConfig());
         when(messageStore.getMessageAsync(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), 
any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         final RemotingCommand request = createPopMsgCommand();
@@ -135,6 +139,17 @@ public class PopMessageProcessorTest {
         assertThat(response).isNull();
     }
 
+    @Test
+    public void testProcessRequest_whenTimerWheelIsFalse() throws 
RemotingCommandException {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setTimerWheelEnable(false);
+        
when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        final RemotingCommand request = createPopMsgCommand();
+        RemotingCommand response = 
popMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        assertThat(response.getRemark()).contains("pop message is forbidden 
because timerWheelEnable is false");
+    }
 
     private RemotingCommand createPopMsgCommand() {
         PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
@@ -152,7 +167,6 @@ public class PopMessageProcessorTest {
         return request;
     }
 
-
     private GetMessageResult createGetMessageResult(int msgCnt) {
         GetMessageResult getMessageResult = new GetMessageResult();
         getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 686265292..9132761a6 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -116,57 +116,59 @@ public class DefaultStoreMetricsManager {
                 measurement.record(System.currentTimeMillis() - 
earliestMessageTime, newAttributesBuilder().build());
             });
 
-        timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
-            .setDescription("Timer enqueue messages lag")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                
measurement.record(timerMessageStore.getEnqueueBehindMessages(), 
newAttributesBuilder().build());
-            });
-
-        timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
-            .setDescription("Timer enqueue latency")
-            .setUnit("milliseconds")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                measurement.record(timerMessageStore.getEnqueueBehindMillis(), 
newAttributesBuilder().build());
-            });
-        timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
-            .setDescription("Timer dequeue messages lag")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                
measurement.record(timerMessageStore.getDequeueBehindMessages(), 
newAttributesBuilder().build());
-            });
-        timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
-            .setDescription("Timer dequeue latency")
-            .setUnit("milliseconds")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                measurement.record(timerMessageStore.getDequeueBehind(), 
newAttributesBuilder().build());
-            });
-        timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
-            .setDescription("Current message number in timing")
-            .ofLongs()
-            .buildWithCallback(measurement -> {
-                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
-                timerMessageStore.getTimerMetrics()
-                    .getTimingCount()
-                    .forEach((topic, metric) -> {
-                        measurement.record(
-                            metric.getCount().get(),
-                            newAttributesBuilder().put(LABEL_TOPIC, 
topic).build()
-                        );
-                    });
-            });
-        timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
-            .setDescription("Total number of timer dequeue")
-            .build();
-        timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
-            .setDescription("Total number of timer enqueue")
-            .build();
+        if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
+            timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+                .setDescription("Timer enqueue messages lag")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                    
measurement.record(timerMessageStore.getEnqueueBehindMessages(), 
newAttributesBuilder().build());
+                });
+
+            timerEnqueueLatency = 
meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+                .setDescription("Timer enqueue latency")
+                .setUnit("milliseconds")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                    
measurement.record(timerMessageStore.getEnqueueBehindMillis(), 
newAttributesBuilder().build());
+                });
+            timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+                .setDescription("Timer dequeue messages lag")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                    
measurement.record(timerMessageStore.getDequeueBehindMessages(), 
newAttributesBuilder().build());
+                });
+            timerDequeueLatency = 
meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+                .setDescription("Timer dequeue latency")
+                .setUnit("milliseconds")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                    measurement.record(timerMessageStore.getDequeueBehind(), 
newAttributesBuilder().build());
+                });
+            timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+                .setDescription("Current message number in timing")
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                    timerMessageStore.getTimerMetrics()
+                        .getTimingCount()
+                        .forEach((topic, metric) -> {
+                            measurement.record(
+                                metric.getCount().get(),
+                                newAttributesBuilder().put(LABEL_TOPIC, 
topic).build()
+                            );
+                        });
+                });
+            timerDequeueTotal = 
meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+                .setDescription("Total number of timer dequeue")
+                .build();
+            timerEnqueueTotal = 
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+                .setDescription("Total number of timer enqueue")
+                .build();
+        }
     }
 
     public static void incTimerDequeueCount(String topic) {

Reply via email to