This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 651a5ca992 [ISSUE #7562] BugFix for estimating message accumulation
correctly (#7563)
651a5ca992 is described below
commit 651a5ca992988b90c7e4884e9975db0938557def
Author: Jixiang Jin <[email protected]>
AuthorDate: Thu Nov 16 10:16:16 2023 +0800
[ISSUE #7562] BugFix for estimating message accumulation correctly (#7563)
---
.../broker/metrics/ConsumerLagCalculator.java | 11 ++++++----
.../proxy/common/utils/FilterUtilTest.java | 25 ++++++++++++++++++++++
.../remoting/protocol/filter/FilterAPI.java | 8 +++++++
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index 7a5f1f765e..af08a83c7c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import
org.apache.rocketmq.remoting.protocol.subscription.SimpleSubscriptionData;
@@ -435,10 +436,12 @@ public class ConsumerLagCalculator {
if (subscriptionGroupConfig != null) {
for (SimpleSubscriptionData simpleSubscriptionData :
subscriptionGroupConfig.getSubscriptionDataSet()) {
if (topic.equals(simpleSubscriptionData.getTopic())) {
- subscriptionData = new SubscriptionData();
-
subscriptionData.setTopic(simpleSubscriptionData.getTopic());
-
subscriptionData.setExpressionType(simpleSubscriptionData.getExpressionType());
-
subscriptionData.setSubString(simpleSubscriptionData.getExpression());
+ try {
+ subscriptionData =
FilterAPI.buildSubscriptionData(simpleSubscriptionData.getTopic(),
+ simpleSubscriptionData.getExpression(),
simpleSubscriptionData.getExpressionType());
+ } catch (Exception e) {
+ LOGGER.error("Try to build subscription for
group:{}, topic:{} exception.", group, topic, e);
+ }
break;
}
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
index 23389e9d3b..7c9d84015a 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/utils/FilterUtilTest.java
@@ -48,4 +48,29 @@ public class FilterUtilTest {
assertThat(FilterUtils.isTagMatched(subscriptionData.getTagsSet(),
null)).isFalse();
}
+ @Test
+ public void testBuildSubscriptionData() throws Exception {
+ // Test case 1: expressionType is null, will use TAG as default.
+ String topic = "topic";
+ String subString = "substring";
+ String expressionType = null;
+ SubscriptionData result = FilterAPI.buildSubscriptionData(topic,
subString, expressionType);
+ assertThat(result).isNotNull();
+ assertThat(topic).isEqualTo(result.getTopic());
+ assertThat(subString).isEqualTo(result.getSubString());
+ assertThat(result.getExpressionType()).isEqualTo("TAG");
+ assertThat(result.getCodeSet().size()).isEqualTo(1);
+
+ // Test case 2: expressionType is not null
+ topic = "topic";
+ subString = "substring1||substring2";
+ expressionType = "SQL92";
+ result = FilterAPI.buildSubscriptionData(topic, subString,
expressionType);
+ assertThat(result).isNotNull();
+ assertThat(topic).isEqualTo(result.getTopic());
+ assertThat(subString).isEqualTo(result.getSubString());
+ assertThat(result.getExpressionType()).isEqualTo(expressionType);
+ assertThat(result.getCodeSet().size()).isEqualTo(2);
+ }
+
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
index 10a6bb4633..f291bfccfe 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/filter/FilterAPI.java
@@ -46,6 +46,14 @@ public class FilterAPI {
return subscriptionData;
}
+ public static SubscriptionData buildSubscriptionData(String topic, String
subString, String expressionType) throws Exception {
+ final SubscriptionData subscriptionData = buildSubscriptionData(topic,
subString);
+ if (StringUtils.isNotBlank(expressionType)) {
+ subscriptionData.setExpressionType(expressionType);
+ }
+ return subscriptionData;
+ }
+
public static SubscriptionData build(final String topic, final String
subString,
final String type) throws Exception {
if (ExpressionType.TAG.equals(type) || type == null) {