This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new c9fc2e3 feat: add push consumer configs (#745)
c9fc2e3 is described below
commit c9fc2e3b8cd4a9ef3d27af1b5031d33964594a94
Author: wizcraft_kris <[email protected]>
AuthorDate: Wed Jan 28 14:17:55 2026 +0800
feat: add push consumer configs (#745)
---
.../spring/annotation/RocketMQMessageListener.java | 22 +++++++++++
.../support/DefaultRocketMQListenerContainer.java | 46 ++++++++++++++++++++++
.../DefaultRocketMQListenerContainerTest.java | 5 +++
3 files changed, 73 insertions(+)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 0c1638c..eab700f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.spring.annotation;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -173,4 +175,24 @@ public @interface RocketMQMessageListener {
* The property of "instanceName".
*/
String instanceName() default "DEFAULT";
+
+ /**
+ * Message pull interval.
+ */
+ long pullInterval() default 0;
+
+ /**
+ * Batch pull size.
+ */
+ int pullBatchSize() default 32;
+
+ /**
+ * Batch consumption size.
+ */
+ int consumeMessageBatchMaxSize() default 1;
+
+ /**
+ * Consuming point on consumer booting.
+ */
+ ConsumeFromWhere consumeFromWhere() default ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 5f16fc2..86ca093 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -137,6 +138,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private String namespace;
private String namespaceV2;
private long awaitTerminationMillisWhenShutdown;
+ private long pullInterval;
+ private int pullBatchSize;
+ private int consumeMessageBatchMaxSize;
+ private ConsumeFromWhere consumeFromWhere;
private String instanceName;
@@ -252,6 +257,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
this.instanceName = anno.instanceName();
+ this.pullInterval = anno.pullInterval();
+ this.pullBatchSize = anno.pullBatchSize();
+ this.consumeMessageBatchMaxSize = anno.consumeMessageBatchMaxSize();
+ this.consumeFromWhere = anno.consumeFromWhere();
}
public ConsumeMode getConsumeMode() {
@@ -318,6 +327,38 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.instanceName = instanceName;
}
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ this.consumeFromWhere = consumeFromWhere;
+ }
+
+ public long getPullInterval() {
+ return pullInterval;
+ }
+
+ public void setPullInterval(long pullInterval) {
+ this.pullInterval = pullInterval;
+ }
+
+ public int getPullBatchSize() {
+ return pullBatchSize;
+ }
+
+ public void setPullBatchSize(int pullBatchSize) {
+ this.pullBatchSize = pullBatchSize;
+ }
+
+ public int getConsumeMessageBatchMaxSize() {
+ return consumeMessageBatchMaxSize;
+ }
+
+ public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
+ this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+ }
+
public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
return this;
@@ -662,6 +703,11 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
consumer.setInstanceName(instanceName);
+ consumer.setPullInterval(pullInterval);
+ consumer.setPullBatchSize(pullBatchSize);
+ consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
+ consumer.setConsumeFromWhere(consumeFromWhere);
+
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 182d2fa..be1a0b8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -255,6 +255,11 @@ public class DefaultRocketMQListenerContainerTest {
assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
assertEquals(anno.instanceName(), container.getInstanceName());
+ assertEquals(anno.pullInterval(), container.getPullInterval());
+ assertEquals(anno.pullBatchSize(), container.getPullBatchSize());
+ assertEquals(anno.consumeMessageBatchMaxSize(), container.getConsumeMessageBatchMaxSize());
+ assertEquals(anno.awaitTerminationMillisWhenShutdown(), container.getAwaitTerminationMillisWhenShutdown());
+ assertEquals(anno.consumeFromWhere(), container.getConsumeFromWhere());
}
@RocketMQMessageListener(consumerGroup = "abc1", topic = "test",