This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new e9f2ae7d feat(LitePushConsumer): support concurrent consumption for LitePushConsumer (#1248)
e9f2ae7d is described below

commit e9f2ae7d0e6307f3dd9dccb4afdf87fd5f7fbf4b
Author: Quan <[email protected]>
AuthorDate: Fri May 22 15:21:52 2026 +0800

    feat(LitePushConsumer): support concurrent consumption for LitePushConsumer (#1248)
    
    Introduce LiteStandardConsumeService to enable concurrent message
    consumption for LiteConsumer, with local retry on failure via
    eraseFifoMessage.
---
 .../java/impl/consumer/LitePushConsumerImpl.java   | 17 ++++++
 .../impl/consumer/LiteStandardConsumeService.java  | 70 ++++++++++++++++++++++
 .../java/impl/consumer/PushConsumerImpl.java       | 20 +++----
 3 files changed, 95 insertions(+), 12 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
index dbc94235..24fc45d9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
@@ -20,14 +20,18 @@ package org.apache.rocketmq.client.java.impl.consumer;
 import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
 import apache.rocketmq.v2.Settings;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
 import org.apache.rocketmq.client.apis.consumer.OffsetOption;
 import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LitePushConsumerImpl extends PushConsumerImpl implements 
LitePushConsumer {
+    private static final Logger log = 
LoggerFactory.getLogger(LitePushConsumerImpl.class);
 
     private final LiteSubscriptionManager liteSubscriptionManager;
 
@@ -83,4 +87,17 @@ public class LitePushConsumerImpl extends PushConsumerImpl implements LitePushCo
         return ClientType.LITE_PUSH_CONSUMER;
     }
 
+    @Override
+    protected ConsumeService createConsumeService() {
+        final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
+        if (getSettings().isFifo()) {
+            log.info("Create Lite FIFO consume service, consumerGroup={}, 
clientId={}, enableFifoConsumeAccelerator={}",
+                getConsumerGroup(), clientId, enableFifoConsumeAccelerator);
+            return new LiteFifoConsumeService(clientId, messageListener, 
consumptionExecutor, this,
+                scheduler, enableFifoConsumeAccelerator);
+        }
+        log.info("Create Lite standard consume service, consumerGroup={}, 
clientId={}", getConsumerGroup(), clientId);
+        return new LiteStandardConsumeService(clientId, messageListener, 
consumptionExecutor, this, scheduler);
+    }
+
 }
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteStandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteStandardConsumeService.java
new file mode 100644
index 00000000..efbe1ab6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteStandardConsumeService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LiteStandardConsumeService extends ConsumeService {
+    private static final Logger log = 
LoggerFactory.getLogger(LiteStandardConsumeService.class);
+
+    public LiteStandardConsumeService(ClientId clientId, MessageListener 
messageListener,
+        ThreadPoolExecutor consumptionExecutor, MessageInterceptor 
messageInterceptor,
+        ScheduledExecutorService scheduler) {
+        super(clientId, messageListener, consumptionExecutor, 
messageInterceptor, scheduler);
+    }
+
+    @Override
+    public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
+        for (MessageViewImpl messageView : messageViews) {
+            // Discard corrupted message.
+            if (messageView.isCorrupted()) {
+                log.error("Message is corrupted for lite standard consumption, 
prepare to discard it, mq={}, "
+                    + "messageId={}, clientId={}", pq.getMessageQueue(), 
messageView.getMessageId(), clientId);
+                pq.discardMessage(messageView);
+                continue;
+            }
+            final ListenableFuture<ConsumeResult> future = 
consume(messageView);
+            Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
+                @Override
+                public void onSuccess(ConsumeResult consumeResult) {
+                    // Use eraseFifoMessage for local retry on failure, 
avoiding frequent server requests.
+                    pq.eraseFifoMessage(messageView, consumeResult);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    // Should never reach here.
+                    log.error("[Bug] Exception raised in lite standard 
consumption callback, clientId={}", clientId, t);
+                }
+            }, MoreExecutors.directExecutor());
+        }
+    }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 01ddd30e..002a14f4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -89,16 +89,18 @@ import org.slf4j.LoggerFactory;
 class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
     private static final Logger log = 
LoggerFactory.getLogger(PushConsumerImpl.class);
 
+    protected final MessageListener messageListener;
+    protected final ThreadPoolExecutor consumptionExecutor;
+    protected final boolean enableFifoConsumeAccelerator;
+
     final AtomicLong consumptionOkQuantity;
     final AtomicLong consumptionErrorQuantity;
 
     private final PushSubscriptionSettings pushSubscriptionSettings;
     private final Map<String /* topic */, FilterExpression> 
subscriptionExpressions;
     private final ConcurrentMap<String /* topic */, Assignments> 
cacheAssignments;
-    private final MessageListener messageListener;
     private final int maxCacheMessageCount;
     private final int maxCacheMessageSizeInBytes;
-    private final boolean enableFifoConsumeAccelerator;
     private final boolean enableMessageInterceptorFiltering;
     private final InflightRequestCountInterceptor 
inflightRequestCountInterceptor;
 
@@ -111,7 +113,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
      */
     private final AtomicLong receivedMessagesQuantity;
 
-    private final ThreadPoolExecutor consumptionExecutor;
     private final ConcurrentMap<MessageQueueImpl, ProcessQueue> 
processQueueTable;
     private ConsumeService consumeService;
 
@@ -247,15 +248,10 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
     protected ConsumeService createConsumeService() {
         final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
         if (getSettings().isFifo()) {
-            log.info("Create {}FIFO consume service, consumerGroup={}, 
clientId={}, enableFifoConsumeAccelerator={}",
-                isLiteConsumer() ? "Lite " : "", getConsumerGroup(), clientId, 
enableFifoConsumeAccelerator);
-            if (isLiteConsumer()) {
-                return new LiteFifoConsumeService(clientId, messageListener, 
consumptionExecutor, this,
-                    scheduler, enableFifoConsumeAccelerator);
-            } else {
-                return new FifoConsumeService(clientId, messageListener, 
consumptionExecutor, this,
-                    scheduler, enableFifoConsumeAccelerator);
-            }
+            log.info("Create FIFO consume service, consumerGroup={}, 
clientId={}, enableFifoConsumeAccelerator={}",
+                getConsumerGroup(), clientId, enableFifoConsumeAccelerator);
+            return new FifoConsumeService(clientId, messageListener, 
consumptionExecutor, this,
+                scheduler, enableFifoConsumeAccelerator);
         }
         log.info("Create standard consume service, consumerGroup={}, 
clientId={}", getConsumerGroup(), clientId);
         return new StandardConsumeService(clientId, messageListener, 
consumptionExecutor, this, scheduler);

Reply via email to