This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 06e22b4b42 [ISSUE #7765] Fix unit test testEstimateLag
06e22b4b42 is described below
commit 06e22b4b423bafd8e3f46d555e659fb72370e4f3
Author: landonchan90 <[email protected]>
AuthorDate: Wed Jan 31 17:39:30 2024 +0800
[ISSUE #7765] Fix unit test testEstimateLag
Co-authored-by: landonchan90 <[email protected]>
---
.../java/org/apache/rocketmq/test/offset/LagCalculationIT.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
index 0be18a9d33..ad521440e9 100644
--- a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -171,6 +172,13 @@ public class LagCalculationIT extends BaseConf {
RMQSqlConsumer sqlConsumer =
ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), topic,
selector, sqlListener);
RMQBlockListener tagListener = new RMQBlockListener(true);
RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag,
tagListener);
+
+ //init subscriptionData & consumerFilterData for sql
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, sql, ExpressionType.SQL92);
+ for (BrokerController controller : brokerControllerList) {
+ controller.getConsumerFilterManager().register(topic, sqlConsumer.getConsumerGroup(), sql, ExpressionType.SQL92, subscriptionData.getSubVersion());
+ }
+
// wait for building filter data
await().atMost(5, TimeUnit.SECONDS).until(() ->
sqlListener.isBlocked() && tagListener.isBlocked());
@@ -210,7 +218,6 @@ public class LagCalculationIT extends BaseConf {
for (MessageQueue mq : mqs) {
if
(mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
long brokerOffset =
controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
- SubscriptionData subscriptionData = controller.getConsumerManager().findSubscriptionData(sqlConsumer.getConsumerGroup(), topic);
ConsumerFilterData consumerFilterData =
controller.getConsumerFilterManager().get(topic,
sqlConsumer.getConsumerGroup());
long estimateMessageCount = controller.getMessageStore()
.estimateMessageCount(topic, mq.getQueueId(), 0,
brokerOffset,