This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new c9fc2e3  feat: add push consumer configs (#745)
c9fc2e3 is described below

commit c9fc2e3b8cd4a9ef3d27af1b5031d33964594a94
Author: wizcraft_kris <[email protected]>
AuthorDate: Wed Jan 28 14:17:55 2026 +0800

    feat: add push consumer configs (#745)
---
 .../spring/annotation/RocketMQMessageListener.java | 22 +++++++++++
 .../support/DefaultRocketMQListenerContainer.java  | 46 ++++++++++++++++++++++
 .../DefaultRocketMQListenerContainerTest.java      |  5 +++
 3 files changed, 73 insertions(+)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 0c1638c..eab700f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.spring.annotation;
 
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
@@ -173,4 +175,24 @@ public @interface RocketMQMessageListener {
      * The property of "instanceName".
      */
     String instanceName() default "DEFAULT";
+
+    /**
+     * Message pull Interval.
+     */
+    long pullInterval() default 0;
+
+    /**
+     * Batch pull size.
+     */
+    int pullBatchSize() default 32;
+
+    /**
+     * Batch consumption size.
+     */
+    int consumeMessageBatchMaxSize() default 1;
+
+    /**
+     * Consuming point on consumer booting.
+     */
+    ConsumeFromWhere consumeFromWhere() default ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 5f16fc2..86ca093 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -137,6 +138,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
     private String namespace;
     private String namespaceV2;
     private long awaitTerminationMillisWhenShutdown;
+    private long pullInterval;
+    private int pullBatchSize;
+    private int consumeMessageBatchMaxSize;
+    private ConsumeFromWhere consumeFromWhere;
 
     private String instanceName;
 
@@ -252,6 +257,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
         this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
         this.instanceName = anno.instanceName();
+        this.pullInterval = anno.pullInterval();
+        this.pullBatchSize = anno.pullBatchSize();
+        this.consumeMessageBatchMaxSize = anno.consumeMessageBatchMaxSize();
+        this.consumeFromWhere = anno.consumeFromWhere();
     }
 
     public ConsumeMode getConsumeMode() {
@@ -318,6 +327,38 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.instanceName = instanceName;
     }
 
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+    public long getPullInterval() {
+        return pullInterval;
+    }
+
+    public void setPullInterval(long pullInterval) {
+        this.pullInterval = pullInterval;
+    }
+
+    public int getPullBatchSize() {
+        return pullBatchSize;
+    }
+
+    public void setPullBatchSize(int pullBatchSize) {
+        this.pullBatchSize = pullBatchSize;
+    }
+
+    public int getConsumeMessageBatchMaxSize() {
+        return consumeMessageBatchMaxSize;
+    }
+
+    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
+        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+    }
+
     public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
         this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
         return this;
@@ -662,6 +703,11 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         consumer.setMaxReconsumeTimes(maxReconsumeTimes);
         consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
         consumer.setInstanceName(instanceName);
+        consumer.setPullInterval(pullInterval);
+        consumer.setPullBatchSize(pullBatchSize);
+        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
+        consumer.setConsumeFromWhere(consumeFromWhere);
+
         switch (messageModel) {
             case BROADCASTING:
                 consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 182d2fa..be1a0b8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -255,6 +255,11 @@ public class DefaultRocketMQListenerContainerTest {
         assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
         assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
         assertEquals(anno.instanceName(), container.getInstanceName());
+        assertEquals(anno.pullInterval(), container.getPullInterval());
+        assertEquals(anno.pullBatchSize(), container.getPullBatchSize());
+        assertEquals(anno.consumeMessageBatchMaxSize(), container.getConsumeMessageBatchMaxSize());
+        assertEquals(anno.awaitTerminationMillisWhenShutdown(), container.getAwaitTerminationMillisWhenShutdown());
+        assertEquals(anno.consumeFromWhere(), container.getConsumeFromWhere());
     }
 
     @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",

Reply via email to