This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2930994a3 [ISSUE #4584] Add new persist method to update consume 
offset to remote server.
2930994a3 is described below

commit 2930994a37e065b946977d74f4ad3966f8963267
Author: dinglei <[email protected]>
AuthorDate: Tue Aug 23 13:57:35 2022 +0800

    [ISSUE #4584] Add new persist method to update consume offset to remote 
server.
---
 .../client/consumer/DefaultMQPullConsumer.java       | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 48e7a3ace..2747fabbb 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -36,10 +36,8 @@ import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
 /**
- * @deprecated
- * Default pulling consumer.
- * This class will be removed in 2022, and a better implementation {@link 
DefaultLitePullConsumer} is recommend to use
- * in the scenario of actively pulling messages.
+ * @deprecated Default pulling consumer. This class will be removed in 2022, 
and a better implementation {@link
+ * DefaultLitePullConsumer} is recommend to use in the scenario of actively 
pulling messages.
  */
 @Deprecated
 public class DefaultMQPullConsumer extends ClientConfig implements 
MQPullConsumer {
@@ -109,6 +107,7 @@ public class DefaultMQPullConsumer extends ClientConfig 
implements MQPullConsume
     public DefaultMQPullConsumer(final String namespace, final String 
consumerGroup) {
         this(namespace, consumerGroup, null);
     }
+
     /**
      * Constructor specifying namespace, consumer group and RPC hook.
      *
@@ -127,7 +126,8 @@ public class DefaultMQPullConsumer extends ClientConfig 
implements MQPullConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, 
Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum,
+        Map<String, String> attributes) throws MQClientException {
         createTopic(key, withNamespace(newTopic), queueNum, 0, null);
     }
 
@@ -136,7 +136,8 @@ public class DefaultMQPullConsumer extends ClientConfig 
implements MQPullConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag, Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag,
+        Map<String, String> attributes) throws MQClientException {
         this.defaultMQPullConsumerImpl.createTopic(key, 
withNamespace(newTopic), queueNum, topicSysFlag);
     }
 
@@ -352,7 +353,8 @@ public class DefaultMQPullConsumer extends ClientConfig 
implements MQPullConsume
     }
 
     @Override
-    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, int maxSize, PullCallback pullCallback,
+    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, int maxSize,
+        PullCallback pullCallback,
         long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, 
maxNums, maxSize, pullCallback, timeout);
@@ -460,4 +462,8 @@ public class DefaultMQPullConsumer extends ClientConfig 
implements MQPullConsume
     public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
         this.maxReconsumeTimes = maxReconsumeTimes;
     }
+
+    public void persist(MessageQueue mq) {
+        this.getOffsetStore().persist(queueWithNamespace(mq));
+    }
 }

Reply via email to