This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8905f83d [ISSUE #1081] [Java] Support message filtering in interceptors with proper acknowledgment handling (#1082)
8905f83d is described below

commit 8905f83daf6f9cb26e1d283d86239ff907d2527d
Author: bcaw-ofeer <65269125+bcaw-of...@users.noreply.github.com>
AuthorDate: Thu Aug 28 16:37:30 2025 +0800

    [ISSUE #1081] [Java] Support message filtering in interceptors with proper acknowledgment handling (#1082)
    
    Co-authored-by: xiaoying.ly <xiaoying...@alibaba-inc.com>
    Co-authored-by: terrance.lzm <terrance....@alibaba-inc.com>
---
 .../client/apis/consumer/PushConsumerBuilder.java  |  9 +++
 .../java/impl/consumer/ProcessQueueImpl.java       | 64 +++++++++++++++++++---
 .../impl/consumer/PushConsumerBuilderImpl.java     | 11 +++-
 .../java/impl/consumer/PushConsumerImpl.java       | 16 +++++-
 .../java/impl/consumer/ReceiveMessageResult.java   |  5 ++
 protos                                             |  1 -
 6 files changed, 96 insertions(+), 10 deletions(-)

diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
index 73677a7d..93f4c67b 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
@@ -90,6 +90,15 @@ public interface PushConsumerBuilder {
      */
     PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean 
enableFifoConsumeAccelerator);
 
+    /**
+     * Enable or disable message interceptor filtering functionality.
+     * When enabled, it supports client-side message filtering by message 
interceptors.
+     *
+     * @param enableMessageInterceptorFiltering whether to enable message 
interceptor filtering
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean 
enableMessageInterceptorFiltering);
+
     /**
      * Finalize the build of {@link PushConsumer} and start.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 7a1fc404..6e9146db 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -37,6 +37,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -256,13 +257,62 @@ class ProcessQueueImpl implements ProcessQueue {
                             new MessageInterceptorContextImpl(context, 
MessageHookPointsStatus.OK);
                         consumer.doAfter(context0, generalMessages);
 
-                        try {
-                            onReceiveMessageResult(result);
-                        } catch (Throwable t) {
-                            // Should never reach here.
-                            log.error("[Bug] Exception raised while handling 
receive result, mq={}, endpoints={}, "
-                                + "clientId={}", mq, endpoints, clientId, t);
-                            onReceiveMessageException(t, attemptId);
+                        // Only perform message filtering when 
enableMessageInterceptorFiltering is enabled.
+                        if (consumer.isEnableMessageInterceptorFiltering()) {
+                            final List<MessageViewImpl> originalMessages =
+                                new ArrayList<>(result.getMessageViewImpls());
+
+                            final Set<MessageId> filteredMessageIds = 
generalMessages.stream()
+                                .filter(msg -> msg.getMessageId().isPresent())
+                                .map(msg -> msg.getMessageId().get())
+                                .collect(Collectors.toSet());
+
+                            final List<MessageViewImpl> filteredOutMessages = 
new ArrayList<>();
+                            final List<MessageViewImpl> remainingMessages = 
new ArrayList<>();
+
+                            for (MessageViewImpl originalMsg : 
originalMessages) {
+                                if 
(filteredMessageIds.contains(originalMsg.getMessageId())) {
+                                    remainingMessages.add(originalMsg);
+                                } else {
+                                    filteredOutMessages.add(originalMsg);
+                                }
+                            }
+
+                            // Ack filtered out messages.
+                            if (!filteredOutMessages.isEmpty()) {
+                                log.info("Acking {} filtered out messages by 
interceptor, mq={}, clientId={}",
+                                    filteredOutMessages.size(), mq, 
consumer.getClientId());
+
+                                for (MessageViewImpl filteredOutMsg : 
filteredOutMessages) {
+                                    ListenableFuture<Void> ackFuture = 
ackMessage(filteredOutMsg);
+                                    ackFuture.addListener(() -> {
+                                        log.debug("Successfully acked filtered 
out message, messageId={}, topic={}",
+                                            filteredOutMsg.getMessageId(), 
filteredOutMsg.getTopic());
+                                    }, MoreExecutors.directExecutor());
+                                }
+                            }
+
+                            try {
+                                // Create new ReceiveMessageResult with 
filtered messages.
+                                ReceiveMessageResult filteredResult =
+                                    
ReceiveMessageResult.createFilteredResult(result, remainingMessages);
+                                onReceiveMessageResult(filteredResult);
+                            } catch (Throwable t) {
+                                // Should never reach here.
+                                log.error("[Bug] Exception raised while 
handling receive result, mq={}, endpoints={}, "
+                                    + "clientId={}", mq, endpoints, clientId, 
t);
+                                onReceiveMessageException(t, attemptId);
+                            }
+                        } else {
+                            // When filtering is disabled, use original result 
directly to avoid performance overhead.
+                            try {
+                                onReceiveMessageResult(result);
+                            } catch (Throwable t) {
+                                // Should never reach here.
+                                log.error("[Bug] Exception raised while 
handling receive result, mq={}, endpoints={}, "
+                                    + "clientId={}", mq, endpoints, clientId, 
t);
+                                onReceiveMessageException(t, attemptId);
+                            }
                         }
                     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
index b9171c18..0d833bfc 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
@@ -42,6 +42,7 @@ public class PushConsumerBuilderImpl implements 
PushConsumerBuilder {
     private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
     private int consumptionThreadCount = 20;
     private boolean enableFifoConsumeAccelerator = false;
+    private boolean enableMessageInterceptorFiltering = false;
 
     /**
      * @see PushConsumerBuilder#setClientConfiguration(ClientConfiguration)
@@ -123,6 +124,14 @@ public class PushConsumerBuilderImpl implements 
PushConsumerBuilder {
         return this;
     }
 
+    /**
+     * @see PushConsumerBuilder#setEnableMessageInterceptorFiltering(boolean)
+     */
+    public PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean 
enableMessageInterceptorFiltering) {
+        this.enableMessageInterceptorFiltering = 
enableMessageInterceptorFiltering;
+        return this;
+    }
+
     /**
      * @see PushConsumerBuilder#build()
      */
@@ -134,7 +143,7 @@ public class PushConsumerBuilderImpl implements 
PushConsumerBuilder {
         checkArgument(!subscriptionExpressions.isEmpty(), 
"subscriptionExpressions have not been set yet");
         final PushConsumerImpl pushConsumer = new 
PushConsumerImpl(clientConfiguration, consumerGroup,
             subscriptionExpressions, messageListener, maxCacheMessageCount, 
maxCacheMessageSizeInBytes,
-            consumptionThreadCount, enableFifoConsumeAccelerator);
+            consumptionThreadCount, enableFifoConsumeAccelerator, 
enableMessageInterceptorFiltering);
         pushConsumer.startAsync().awaitRunning();
         return pushConsumer;
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 62c756db..c43c3d27 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -104,6 +104,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     private final int maxCacheMessageCount;
     private final int maxCacheMessageSizeInBytes;
     private final boolean enableFifoConsumeAccelerator;
+    private final boolean enableMessageInterceptorFiltering;
     private final InflightRequestCountInterceptor 
inflightRequestCountInterceptor;
 
     /**
@@ -129,6 +130,14 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         Map<String, FilterExpression> subscriptionExpressions, MessageListener 
messageListener,
         int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int 
consumptionThreadCount,
         boolean enableFifoConsumeAccelerator) {
+        this(clientConfiguration, consumerGroup, subscriptionExpressions, 
messageListener, maxCacheMessageCount,
+            maxCacheMessageSizeInBytes, consumptionThreadCount, 
enableFifoConsumeAccelerator, false);
+    }
+
+    public PushConsumerImpl(ClientConfiguration clientConfiguration, String 
consumerGroup,
+        Map<String, FilterExpression> subscriptionExpressions, MessageListener 
messageListener,
+        int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int 
consumptionThreadCount,
+        boolean enableFifoConsumeAccelerator, boolean 
enableMessageInterceptorFiltering) {
         super(clientConfiguration, consumerGroup, 
subscriptionExpressions.keySet());
         this.clientConfiguration = clientConfiguration;
         Resource groupResource = new 
Resource(clientConfiguration.getNamespace(), consumerGroup);
@@ -141,6 +150,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         this.maxCacheMessageCount = maxCacheMessageCount;
         this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
         this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
+        this.enableMessageInterceptorFiltering = 
enableMessageInterceptorFiltering;
 
         this.receptionTimes = new AtomicLong(0);
         this.receivedMessagesQuantity = new AtomicLong(0);
@@ -165,7 +175,7 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         Map<String, FilterExpression> subscriptionExpressions, MessageListener 
messageListener,
         int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int 
consumptionThreadCount) {
         this(clientConfiguration, consumerGroup, subscriptionExpressions, 
messageListener, maxCacheMessageCount,
-            maxCacheMessageSizeInBytes, consumptionThreadCount, true);
+            maxCacheMessageSizeInBytes, consumptionThreadCount, true, false);
     }
 
     @Override
@@ -626,4 +636,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     public ThreadPoolExecutor getConsumptionExecutor() {
         return consumptionExecutor;
     }
+
+    public boolean isEnableMessageInterceptorFiltering() {
+        return enableMessageInterceptorFiltering;
+    }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index ab11ff93..0b224858 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -32,6 +32,11 @@ public class ReceiveMessageResult {
         this.messages = messages;
     }
 
+    public static ReceiveMessageResult 
createFilteredResult(ReceiveMessageResult original,
+        List<MessageViewImpl> filteredMessages) {
+        return new ReceiveMessageResult(original.getEndpoints(), new 
ArrayList<>(filteredMessages));
+    }
+
     public List<MessageView> getMessageViews() {
         return new ArrayList<>(messages);
     }
diff --git a/protos b/protos
deleted file mode 160000
index 5c9f8419..00000000
--- a/protos
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 5c9f84199bffa79b2ed73beb37774ca92e749c19

Reply via email to