This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d7c2d25 [ISSUE #2708] fix offset rollback when fetch offset from broker exception
new 5b945a9 Merge pull request #3158 from Zanglei06/fix_rmq_client_offset_dev
d7c2d25 is described below
commit d7c2d25c2a7344506ee001f7dfaf72f62c5f6c47
Author: zanglei <[email protected]>
AuthorDate: Fri Jul 16 11:01:15 2021 +0800
[ISSUE #2708] fix offset rollback when fetch offset from broker exception
---
.../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +-
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +-
.../apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java | 3 ++-
4 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e835be1..d28d23a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
- } catch (MQClientException e) {
+ } catch (Exception e) {
log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index bb0b7f1..59b8deb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L;
try {
offset =
this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
- } catch (MQClientException e) {
+ } catch (Exception e) {
this.executePullRequestLater(pullRequest,
pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 833d465..7677d8b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
- } catch (MQClientException e) {
+ } catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}",
consumerGroup, mq);
continue;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 286c684..8fe9400 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try {
result =
this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
- result = -1;
+ log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
+ throw e;
}
}
} else {