This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d1974c5535 [ISSUE #8269] Support pop consumption filter in long
polling service (#8271)
d1974c5535 is described below
commit d1974c55353488095e5122f5ce361c150611f21a
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 6 20:19:06 2024 +0800
[ISSUE #8269] Support pop consumption filter in long polling service (#8271)
---
.../longpolling/NotifyMessageArrivingListener.java | 11 ++++--
.../broker/longpolling/PopLongPollingService.java | 44 ++++++++++++++++++----
.../rocketmq/broker/longpolling/PopRequest.java | 25 +++++++++---
.../broker/processor/AckMessageProcessor.java | 13 +++----
.../broker/processor/NotificationProcessor.java | 11 +++++-
.../broker/processor/PopMessageProcessor.java | 40 ++++++++++++++------
6 files changed, 107 insertions(+), 37 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index e55ed2778a..1ddb9f4f8e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -36,9 +36,12 @@ public class NotifyMessageArrivingListener implements
MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long
tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
- this.pullRequestHoldService.notifyMessageArriving(topic, queueId,
logicOffset, tagsCode,
- msgStoreTime, filterBitMap, properties);
- this.popMessageProcessor.notifyMessageArriving(topic, queueId);
- this.notificationProcessor.notifyMessageArriving(topic, queueId);
+
+ this.pullRequestHoldService.notifyMessageArriving(
+ topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap,
properties);
+ this.popMessageProcessor.notifyMessageArriving(
+ topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+ this.notificationProcessor.notifyMessageArriving(
+ topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index a768fe4b9c..b5179114f3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -35,6 +35,9 @@ import
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.MessageFilter;
import static org.apache.rocketmq.broker.longpolling.PollingResult.NOT_POLLING;
import static
org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_FULL;
@@ -147,39 +150,61 @@ public class PopLongPollingService extends ServiceThread {
}
public void notifyMessageArrivingWithRetryTopic(final String topic, final
int queueId) {
+ this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L,
null, null);
+ }
+
+ public void notifyMessageArrivingWithRetryTopic(final String topic, final
int queueId,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
String notifyTopic;
if (KeyBuilder.isPopRetryTopicV2(topic)) {
notifyTopic = KeyBuilder.parseNormalTopic(topic);
} else {
notifyTopic = topic;
}
- notifyMessageArriving(notifyTopic, queueId);
+ notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime,
filterBitMap, properties);
}
- public void notifyMessageArriving(final String topic, final int queueId) {
+ public void notifyMessageArriving(final String topic, final int queueId,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
if (cids == null) {
return;
}
for (Map.Entry<String, Byte> cid : cids.entrySet()) {
if (queueId >= 0) {
- notifyMessageArriving(topic, cid.getKey(), -1);
+ notifyMessageArriving(topic, -1, cid.getKey(), tagsCode,
msgStoreTime, filterBitMap, properties);
}
- notifyMessageArriving(topic, cid.getKey(), queueId);
+ notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode,
msgStoreTime, filterBitMap, properties);
}
}
- public boolean notifyMessageArriving(final String topic, final String cid,
final int queueId) {
+ public boolean notifyMessageArriving(final String topic, final int
queueId, final String cid,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
ConcurrentSkipListSet<PopRequest> remotingCommands =
pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}
+
PopRequest popRequest = pollRemotingCommands(remotingCommands);
if (popRequest == null) {
return false;
}
+
+ if (popRequest.getMessageFilter() != null &&
popRequest.getSubscriptionData() != null) {
+ boolean match =
popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
+ new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime,
filterBitMap));
+ if (match && properties != null) {
+ match =
popRequest.getMessageFilter().isMatchedByCommitLog(null, properties);
+ }
+ if (!match) {
+ remotingCommands.add(popRequest);
+ totalPollingNum.incrementAndGet();
+ return false;
+ }
+ }
+
if (brokerController.getBrokerConfig().isEnablePopLog()) {
- POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}",
popRequest);
+ POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}",
popRequest);
}
return wakeUp(popRequest);
}
@@ -221,6 +246,11 @@ public class PopLongPollingService extends ServiceThread {
*/
public PollingResult polling(final ChannelHandlerContext ctx,
RemotingCommand remotingCommand,
final PollingHeader requestHeader) {
+ return this.polling(ctx, remotingCommand, requestHeader, null, null);
+ }
+
+ public PollingResult polling(final ChannelHandlerContext ctx,
RemotingCommand remotingCommand,
+ final PollingHeader requestHeader, SubscriptionData subscriptionData,
MessageFilter messageFilter) {
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
return NOT_POLLING;
}
@@ -234,7 +264,7 @@ public class PopLongPollingService extends ServiceThread {
}
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
long expired = requestHeader.getBornTime() +
requestHeader.getPollTime();
- final PopRequest request = new PopRequest(remotingCommand, ctx,
expired);
+ final PopRequest request = new PopRequest(remotingCommand, ctx,
expired, subscriptionData, messageFilter);
boolean isFull = totalPollingNum.get() >=
this.brokerController.getBrokerConfig().getMaxPopPollingSize();
if (isFull) {
POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}",
remotingCommand, totalPollingNum.get());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
index a45bcce9f6..0419dbf637 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
@@ -16,28 +16,35 @@
*/
package org.apache.rocketmq.broker.longpolling;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.MessageFilter;
public class PopRequest {
private static final AtomicLong COUNTER = new AtomicLong(Long.MIN_VALUE);
private final RemotingCommand remotingCommand;
private final ChannelHandlerContext ctx;
- private final long expired;
private final AtomicBoolean complete = new AtomicBoolean(false);
private final long op = COUNTER.getAndIncrement();
- public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext
ctx, long expired) {
+ private final long expired;
+ private final SubscriptionData subscriptionData;
+ private final MessageFilter messageFilter;
+
+ public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext
ctx,
+ long expired, SubscriptionData subscriptionData, MessageFilter
messageFilter) {
+
this.ctx = ctx;
this.remotingCommand = remotingCommand;
this.expired = expired;
+ this.subscriptionData = subscriptionData;
+ this.messageFilter = messageFilter;
}
public Channel getChannel() {
@@ -64,6 +71,14 @@ public class PopRequest {
return expired;
}
+ public SubscriptionData getSubscriptionData() {
+ return subscriptionData;
+ }
+
+ public MessageFilter getMessageFilter() {
+ return messageFilter;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("PopRequest{");
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 9a56498632..6f7b7e8a24 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -297,15 +297,12 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
qId, ackOffset,
popTime);
if (nextOffset > -1) {
- if
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
- topic, consumeGroup, qId)) {
-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
- consumeGroup, topic, qId, nextOffset);
+ if
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(topic,
consumeGroup, qId)) {
+
this.brokerController.getConsumerOffsetManager().commitOffset(
+ channel.remoteAddress().toString(), consumeGroup,
topic, qId, nextOffset);
}
- if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
- consumeGroup, qId, invisibleTime)) {
-
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
- topic, consumeGroup, qId);
+ if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
consumeGroup, qId, invisibleTime)) {
+
this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic,
qId, consumeGroup);
}
} else if (nextOffset == -1) {
String errorInfo = String.format("offset is illegal, key:%s,
old:%d, commit:%d, next:%d, %s",
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 6447500cbe..c82725fe1e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
@@ -58,8 +59,16 @@ public class NotificationProcessor implements
NettyRequestProcessor {
return false;
}
+ // When a new message is written to CommitLog, this method would be called.
+ // Suspended long polling will receive notification and be wakeup.
+ public void notifyMessageArriving(final String topic, final int queueId,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
+ this.popLongPollingService.notifyMessageArrivingWithRetryTopic(
+ topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+ }
+
public void notifyMessageArriving(final String topic, final int queueId) {
- popLongPollingService.notifyMessageArrivingWithRetryTopic(topic,
queueId);
+ this.popLongPollingService.notifyMessageArrivingWithRetryTopic(topic,
queueId);
}
@Override
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 93c04a1b8d..3df4bec984 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -26,6 +26,7 @@ import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
@@ -167,15 +168,23 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
public void notifyLongPollingRequestIfNeed(String topic, String group, int
queueId) {
+ this.notifyLongPollingRequestIfNeed(
+ topic, group, queueId, null, 0L, null, null);
+ }
+
+ public void notifyLongPollingRequestIfNeed(String topic, String group, int
queueId,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
long popBufferOffset =
this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(topic,
group, queueId);
long consumerOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic,
queueId);
long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
long offset = Math.max(popBufferOffset, consumerOffset);
if (maxOffset > offset) {
- boolean notifySuccess =
popLongPollingService.notifyMessageArriving(topic, group, -1);
+ boolean notifySuccess =
popLongPollingService.notifyMessageArriving(
+ topic, -1, group, tagsCode, msgStoreTime, filterBitMap,
properties);
if (!notifySuccess) {
// notify pop queue
- notifySuccess =
popLongPollingService.notifyMessageArriving(topic, group, queueId);
+ notifySuccess = popLongPollingService.notifyMessageArriving(
+ topic, queueId, group, tagsCode, msgStoreTime,
filterBitMap, properties);
}
this.brokerController.getNotificationProcessor().notifyMessageArriving(topic,
queueId);
if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -185,12 +194,15 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
}
- public void notifyMessageArriving(final String topic, final int queueId) {
- popLongPollingService.notifyMessageArrivingWithRetryTopic(topic,
queueId);
+ public void notifyMessageArriving(final String topic, final int queueId,
+ Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
+ popLongPollingService.notifyMessageArrivingWithRetryTopic(
+ topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
}
- public boolean notifyMessageArriving(final String topic, final String cid,
final int queueId) {
- return popLongPollingService.notifyMessageArriving(topic, cid,
queueId);
+ public void notifyMessageArriving(final String topic, final int queueId,
final String cid) {
+ popLongPollingService.notifyMessageArriving(
+ topic, queueId, cid, null, 0L, null, null);
}
@Override
@@ -292,10 +304,11 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+ SubscriptionData subscriptionData = null;
ExpressionMessageFilter messageFilter = null;
- if (requestHeader.getExp() != null && requestHeader.getExp().length()
> 0) {
+ if (requestHeader.getExp() != null &&
!requestHeader.getExp().isEmpty()) {
try {
- SubscriptionData subscriptionData =
FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(),
requestHeader.getExpType());
+ subscriptionData = FilterAPI.build(requestHeader.getTopic(),
requestHeader.getExp(), requestHeader.getExpType());
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), subscriptionData);
@@ -329,7 +342,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
} else {
try {
- SubscriptionData subscriptionData =
FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
+ subscriptionData = FilterAPI.build(requestHeader.getTopic(),
"*", ExpressionType.TAG);
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), subscriptionData);
@@ -403,17 +416,20 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
final RemotingCommand finalResponse = response;
+ SubscriptionData finalSubscriptionData = subscriptionData;
getMessageFuture.thenApply(restNum -> {
if (!getMessageResult.getMessageBufferList().isEmpty()) {
finalResponse.setCode(ResponseCode.SUCCESS);
getMessageResult.setStatus(GetMessageStatus.FOUND);
if (restNum > 0) {
// all queue pop can not notify specified queue pop, and
vice versa
-
popLongPollingService.notifyMessageArriving(requestHeader.getTopic(),
requestHeader.getConsumerGroup(),
- requestHeader.getQueueId());
+ popLongPollingService.notifyMessageArriving(
+ requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getConsumerGroup(),
+ null, 0L, null, null);
}
} else {
- PollingResult pollingResult =
popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
+ PollingResult pollingResult = popLongPollingService.polling(
+ ctx, request, new PollingHeader(requestHeader),
finalSubscriptionData, finalMessageFilter);
if (PollingResult.POLLING_SUC == pollingResult) {
return null;
} else if (PollingResult.POLLING_FULL == pollingResult) {