This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2930994a3 [ISSUE #4584] Add new persist method to update consume offset to remote server.
2930994a3 is described below
commit 2930994a37e065b946977d74f4ad3966f8963267
Author: dinglei <[email protected]>
AuthorDate: Tue Aug 23 13:57:35 2022 +0800
[ISSUE #4584] Add new persist method to update consume offset to remote server.
---
.../client/consumer/DefaultMQPullConsumer.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 48e7a3ace..2747fabbb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -36,10 +36,8 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
- * @deprecated
- * Default pulling consumer.
- * This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use
- * in the scenario of actively pulling messages.
+ * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link
+ * DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
@@ -109,6 +107,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public DefaultMQPullConsumer(final String namespace, final String
consumerGroup) {
this(namespace, consumerGroup, null);
}
+
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
@@ -127,7 +126,8 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum,
Map<String, String> attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum,
+ Map<String, String> attributes) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
@@ -136,7 +136,8 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag, Map<String, String> attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag,
+ Map<String, String> attributes) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key,
withNamespace(newTopic), queueNum, topicSysFlag);
}
@@ -352,7 +353,8 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
}
@Override
- public void pull(MessageQueue mq, String subExpression, long offset, int
maxNums, int maxSize, PullCallback pullCallback,
+ public void pull(MessageQueue mq, String subExpression, long offset, int
maxNums, int maxSize,
+ PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset,
maxNums, maxSize, pullCallback, timeout);
@@ -460,4 +462,8 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
+
+ public void persist(MessageQueue mq) {
+ this.getOffsetStore().persist(queueWithNamespace(mq));
+ }
}