This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 651a5ca992 [ISSUE #7562] BugFix for estimating message accumulation 
correctly (#7563)
651a5ca992 is described below

commit 651a5ca992988b90c7e4884e9975db0938557def
Author: Jixiang Jin <[email protected]>
AuthorDate: Thu Nov 16 10:16:16 2023 +0800

    [ISSUE #7562] BugFix for estimating message accumulation correctly (#7563)
---
 .../broker/metrics/ConsumerLagCalculator.java      | 11 ++++++----
 .../proxy/common/utils/FilterUtilTest.java         | 25 ++++++++++++++++++++++
 .../remoting/protocol/filter/FilterAPI.java        |  8 +++++++
 3 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 7a5f1f765e..af08a83c7c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData;
@@ -435,10 +436,12 @@ public class ConsumerLagCalculator {
                 if (subscriptionGroupConfig != null) {
                     for (SimpleSubscriptionData simpleSubscriptionData : 
subscriptionGroupConfig.getSubscriptionDataSet()) {
                         if (topic.equals(simpleSubscriptionData.getTopic())) {
-                            subscriptionData = new SubscriptionData();
-                            
subscriptionData.setTopic(simpleSubscriptionData.getTopic());
-                            
subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType());
-                            
subscriptionData.setSubString(simpleSubscriptionData.getExpression());
+                            try {
+                                subscriptionData = 
FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(),
+                                    simpleSubscriptionData.getExpression(), 
simpleSubscriptionData.getExpressionType());
+                            } catch (Exception e) {
+                                LOGGER.error("Try to build subscription for 
group:{}, topic:{} exception.", group, topic, e);
+                            }
                             break;
                         }
                     }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
index 23389e9d3b..7c9d84015a 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
@@ -48,4 +48,29 @@ public class FilterUtilTest {
         assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(), 
null)).isFalse();
     }
 
+    @Test
+    public void testBuildSubscriptionData() throws Exception {
+        // Test case 1: expressionType is null, will use TAG as default.
+        String topic = "topic";
+        String subString = "substring";
+        String expressionType = null;
+        SubscriptionData result = FilterAPI.buildSubscriptionData(topic, 
subString, expressionType);
+        assertThat(result).isNotNull();
+        assertThat(topic).isEqualTo(result.getTopic());
+        assertThat(subString).isEqualTo(result.getSubString());
+        assertThat(result.getExpressionType()).isEqualTo("TAG");
+        assertThat(result.getCodeSet().size()).isEqualTo(1);
+
+        // Test case 2: expressionType is not null
+        topic = "topic";
+        subString = "substring1||substring2";
+        expressionType = "SQL92";
+        result = FilterAPI.buildSubscriptionData(topic, subString, 
expressionType);
+        assertThat(result).isNotNull();
+        assertThat(topic).isEqualTo(result.getTopic());
+        assertThat(subString).isEqualTo(result.getSubString());
+        assertThat(result.getExpressionType()).isEqualTo(expressionType);
+        assertThat(result.getCodeSet().size()).isEqualTo(2);
+    }
+
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
index 10a6bb4633..f291bfccfe 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
@@ -46,6 +46,14 @@ public class FilterAPI {
         return subscriptionData;
     }
 
+    public static SubscriptionData buildSubscriptionData(String topic, String 
subString, String expressionType) throws Exception {
+        final SubscriptionData subscriptionData = buildSubscriptionData(topic, 
subString);
+        if (StringUtils.isNotBlank(expressionType)) {
+            subscriptionData.setExpressionType(expressionType);
+        }
+        return subscriptionData;
+    }
+
     public static SubscriptionData build(final String topic, final String 
subString,
         final String type) throws Exception {
         if (ExpressionType.TAG.equals(type) || type == null) {

Reply via email to