This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 8905f83d [ISSUE #1081] [Java] Support message filtering in interceptors with proper acknowledgment handling (#1082) 8905f83d is described below commit 8905f83daf6f9cb26e1d283d86239ff907d2527d Author: bcaw-ofeer <65269125+bcaw-of...@users.noreply.github.com> AuthorDate: Thu Aug 28 16:37:30 2025 +0800 [ISSUE #1081] [Java] Support message filtering in interceptors with proper acknowledgment handling (#1082) Co-authored-by: xiaoying.ly <xiaoying...@alibaba-inc.com> Co-authored-by: terrance.lzm <terrance....@alibaba-inc.com> --- .../client/apis/consumer/PushConsumerBuilder.java | 9 +++ .../java/impl/consumer/ProcessQueueImpl.java | 64 +++++++++++++++++++--- .../impl/consumer/PushConsumerBuilderImpl.java | 11 +++- .../java/impl/consumer/PushConsumerImpl.java | 16 +++++- .../java/impl/consumer/ReceiveMessageResult.java | 5 ++ protos | 1 - 6 files changed, 96 insertions(+), 10 deletions(-) diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java index 73677a7d..93f4c67b 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java @@ -90,6 +90,15 @@ public interface PushConsumerBuilder { */ PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean enableFifoConsumeAccelerator); + /** + * Enable or disable message interceptor filtering functionality. + * When enabled, it supports client-side message filtering by message interceptors. + * + * @param enableMessageInterceptorFiltering whether to enable message interceptor filtering + * @return the consumer builder instance. + */ + PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean enableMessageInterceptorFiltering); + /** * Finalize the build of {@link PushConsumer} and start. * diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java index 7a1fc404..6e9146db 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java @@ -37,6 +37,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -256,13 +257,62 @@ class ProcessQueueImpl implements ProcessQueue { new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK); consumer.doAfter(context0, generalMessages); - try { - onReceiveMessageResult(result); - } catch (Throwable t) { - // Should never reach here. - log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, " - + "clientId={}", mq, endpoints, clientId, t); - onReceiveMessageException(t, attemptId); + // Only perform message filtering when enableMessageInterceptorFiltering is enabled. + if (consumer.isEnableMessageInterceptorFiltering()) { + final List<MessageViewImpl> originalMessages = + new ArrayList<>(result.getMessageViewImpls()); + + final Set<MessageId> filteredMessageIds = generalMessages.stream() + .filter(msg -> msg.getMessageId().isPresent()) + .map(msg -> msg.getMessageId().get()) + .collect(Collectors.toSet()); + + final List<MessageViewImpl> filteredOutMessages = new ArrayList<>(); + final List<MessageViewImpl> remainingMessages = new ArrayList<>(); + + for (MessageViewImpl originalMsg : originalMessages) { + if (filteredMessageIds.contains(originalMsg.getMessageId())) { + remainingMessages.add(originalMsg); + } else { + filteredOutMessages.add(originalMsg); + } + } + + // Ack filtered out messages. + if (!filteredOutMessages.isEmpty()) { + log.info("Acking {} filtered out messages by interceptor, mq={}, clientId={}", + filteredOutMessages.size(), mq, consumer.getClientId()); + + for (MessageViewImpl filteredOutMsg : filteredOutMessages) { + ListenableFuture<Void> ackFuture = ackMessage(filteredOutMsg); + ackFuture.addListener(() -> { + log.debug("Successfully acked filtered out message, messageId={}, topic={}", + filteredOutMsg.getMessageId(), filteredOutMsg.getTopic()); + }, MoreExecutors.directExecutor()); + } + } + + try { + // Create new ReceiveMessageResult with filtered messages. + ReceiveMessageResult filteredResult = + ReceiveMessageResult.createFilteredResult(result, remainingMessages); + onReceiveMessageResult(filteredResult); + } catch (Throwable t) { + // Should never reach here. + log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, " + + "clientId={}", mq, endpoints, clientId, t); + onReceiveMessageException(t, attemptId); + } + } else { + // When filtering is disabled, use original result directly to avoid performance overhead. + try { + onReceiveMessageResult(result); + } catch (Throwable t) { + // Should never reach here. + log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, " + + "clientId={}", mq, endpoints, clientId, t); + onReceiveMessageException(t, attemptId); + } } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java index b9171c18..0d833bfc 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java @@ -42,6 +42,7 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder { private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024; private int consumptionThreadCount = 20; private boolean enableFifoConsumeAccelerator = false; + private boolean enableMessageInterceptorFiltering = false; /** * @see PushConsumerBuilder#setClientConfiguration(ClientConfiguration) @@ -123,6 +124,14 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder { return this; } + /** + * @see PushConsumerBuilder#setEnableMessageInterceptorFiltering(boolean) + */ + public PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean enableMessageInterceptorFiltering) { + this.enableMessageInterceptorFiltering = enableMessageInterceptorFiltering; + return this; + } + /** * @see PushConsumerBuilder#build() */ @@ -134,7 +143,7 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder { checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet"); final PushConsumerImpl pushConsumer = new PushConsumerImpl(clientConfiguration, consumerGroup, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, - consumptionThreadCount, enableFifoConsumeAccelerator); + consumptionThreadCount, enableFifoConsumeAccelerator, enableMessageInterceptorFiltering); pushConsumer.startAsync().awaitRunning(); return pushConsumer; } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 62c756db..c43c3d27 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -104,6 +104,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer { private final int maxCacheMessageCount; private final int maxCacheMessageSizeInBytes; private final boolean enableFifoConsumeAccelerator; + private final boolean enableMessageInterceptorFiltering; private final InflightRequestCountInterceptor inflightRequestCountInterceptor; /** @@ -129,6 +130,14 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer { Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount, boolean enableFifoConsumeAccelerator) { + this(clientConfiguration, consumerGroup, subscriptionExpressions, messageListener, maxCacheMessageCount, + maxCacheMessageSizeInBytes, consumptionThreadCount, enableFifoConsumeAccelerator, false); + } + + public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, + Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener, + int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount, + boolean enableFifoConsumeAccelerator, boolean enableMessageInterceptorFiltering) { super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet()); this.clientConfiguration = clientConfiguration; Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup); @@ -141,6 +150,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer { this.maxCacheMessageCount = maxCacheMessageCount; this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator; + this.enableMessageInterceptorFiltering = enableMessageInterceptorFiltering; this.receptionTimes = new AtomicLong(0); this.receivedMessagesQuantity = new AtomicLong(0); @@ -165,7 +175,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer { Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) { this(clientConfiguration, consumerGroup, subscriptionExpressions, messageListener, maxCacheMessageCount, - maxCacheMessageSizeInBytes, consumptionThreadCount, true); + maxCacheMessageSizeInBytes, consumptionThreadCount, true, false); } @Override @@ -626,4 +636,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer { public ThreadPoolExecutor getConsumptionExecutor() { return consumptionExecutor; } + + public boolean isEnableMessageInterceptorFiltering() { + return enableMessageInterceptorFiltering; + } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java index ab11ff93..0b224858 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java @@ -32,6 +32,11 @@ public class ReceiveMessageResult { this.messages = messages; } + public static ReceiveMessageResult createFilteredResult(ReceiveMessageResult original, + List<MessageViewImpl> filteredMessages) { + return new ReceiveMessageResult(original.getEndpoints(), new ArrayList<>(filteredMessages)); + } + public List<MessageView> getMessageViews() { return new ArrayList<>(messages); } diff --git a/protos b/protos deleted file mode 160000 index 5c9f8419..00000000 --- a/protos +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 5c9f84199bffa79b2ed73beb37774ca92e749c19