This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new e9f2ae7d feat(LitePushConsumer): support concurrent consumption for LitePushConsumer (#1248)
e9f2ae7d is described below
commit e9f2ae7d0e6307f3dd9dccb4afdf87fd5f7fbf4b
Author: Quan <[email protected]>
AuthorDate: Fri May 22 15:21:52 2026 +0800
feat(LitePushConsumer): support concurrent consumption for LitePushConsumer (#1248)
Introduce LiteStandardConsumeService to enable concurrent message
consumption for LiteConsumer, with local retry on failure via
eraseFifoMessage.
---
.../java/impl/consumer/LitePushConsumerImpl.java | 17 ++++++
.../impl/consumer/LiteStandardConsumeService.java | 70 ++++++++++++++++++++++
.../java/impl/consumer/PushConsumerImpl.java | 20 +++----
3 files changed, 95 insertions(+), 12 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
index dbc94235..24fc45d9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LitePushConsumerImpl.java
@@ -20,14 +20,18 @@ package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
import apache.rocketmq.v2.Settings;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
import org.apache.rocketmq.client.apis.consumer.OffsetOption;
import org.apache.rocketmq.client.java.impl.ClientType;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LitePushConsumerImpl extends PushConsumerImpl implements LitePushConsumer {
+ private static final Logger log = LoggerFactory.getLogger(LitePushConsumerImpl.class);
private final LiteSubscriptionManager liteSubscriptionManager;
@@ -83,4 +87,17 @@ public class LitePushConsumerImpl extends PushConsumerImpl implements LitePushCo
return ClientType.LITE_PUSH_CONSUMER;
}
+ @Override
+ protected ConsumeService createConsumeService() {
+ final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
+ if (getSettings().isFifo()) {
+ log.info("Create Lite FIFO consume service, consumerGroup={}, clientId={}, enableFifoConsumeAccelerator={}",
+ getConsumerGroup(), clientId, enableFifoConsumeAccelerator);
+ return new LiteFifoConsumeService(clientId, messageListener, consumptionExecutor, this,
+ scheduler, enableFifoConsumeAccelerator);
+ }
+ log.info("Create Lite standard consume service, consumerGroup={}, clientId={}", getConsumerGroup(), clientId);
+ return new LiteStandardConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);
+ }
+
}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteStandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteStandardConsumeService.java
new file mode 100644
index 00000000..efbe1ab6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteStandardConsumeService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LiteStandardConsumeService extends ConsumeService {
+ private static final Logger log = LoggerFactory.getLogger(LiteStandardConsumeService.class);
+
+ public LiteStandardConsumeService(ClientId clientId, MessageListener messageListener,
+ ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
+ ScheduledExecutorService scheduler) {
+ super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
+ }
+
+ @Override
+ public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
+ for (MessageViewImpl messageView : messageViews) {
+ // Discard corrupted message.
+ if (messageView.isCorrupted()) {
+ log.error("Message is corrupted for lite standard consumption, prepare to discard it, mq={}, "
+ + "messageId={}, clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
+ pq.discardMessage(messageView);
+ continue;
+ }
+ final ListenableFuture<ConsumeResult> future = consume(messageView);
+ Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
+ @Override
+ public void onSuccess(ConsumeResult consumeResult) {
+ // Use eraseFifoMessage for local retry on failure, avoiding frequent server requests.
+ pq.eraseFifoMessage(messageView, consumeResult);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Should never reach here.
+ log.error("[Bug] Exception raised in lite standard consumption callback, clientId={}", clientId, t);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 01ddd30e..002a14f4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -89,16 +89,18 @@ import org.slf4j.LoggerFactory;
class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
private static final Logger log = LoggerFactory.getLogger(PushConsumerImpl.class);
+ protected final MessageListener messageListener;
+ protected final ThreadPoolExecutor consumptionExecutor;
+ protected final boolean enableFifoConsumeAccelerator;
+
final AtomicLong consumptionOkQuantity;
final AtomicLong consumptionErrorQuantity;
private final PushSubscriptionSettings pushSubscriptionSettings;
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
private final ConcurrentMap<String /* topic */, Assignments> cacheAssignments;
- private final MessageListener messageListener;
private final int maxCacheMessageCount;
private final int maxCacheMessageSizeInBytes;
- private final boolean enableFifoConsumeAccelerator;
private final boolean enableMessageInterceptorFiltering;
private final InflightRequestCountInterceptor inflightRequestCountInterceptor;
@@ -111,7 +113,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
*/
private final AtomicLong receivedMessagesQuantity;
- private final ThreadPoolExecutor consumptionExecutor;
private final ConcurrentMap<MessageQueueImpl, ProcessQueue> processQueueTable;
private ConsumeService consumeService;
@@ -247,15 +248,10 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
protected ConsumeService createConsumeService() {
final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
if (getSettings().isFifo()) {
- log.info("Create {}FIFO consume service, consumerGroup={}, clientId={}, enableFifoConsumeAccelerator={}",
- isLiteConsumer() ? "Lite " : "", getConsumerGroup(), clientId, enableFifoConsumeAccelerator);
- if (isLiteConsumer()) {
- return new LiteFifoConsumeService(clientId, messageListener, consumptionExecutor, this,
- scheduler, enableFifoConsumeAccelerator);
- } else {
- return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this,
- scheduler, enableFifoConsumeAccelerator);
- }
+ log.info("Create FIFO consume service, consumerGroup={}, clientId={}, enableFifoConsumeAccelerator={}",
+ getConsumerGroup(), clientId, enableFifoConsumeAccelerator);
+ return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this,
+ scheduler, enableFifoConsumeAccelerator);
}
log.info("Create standard consume service, consumerGroup={}, clientId={}", getConsumerGroup(), clientId);
return new StandardConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);