This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8859093a1d make ctx constructed in scheduleRenewTask (#8556)
8859093a1d is described below
commit 8859093a1d345dc98a119fd2ae6fc2b14faa76ee
Author: 吴星灿 <[email protected]>
AuthorDate: Wed Aug 21 17:29:00 2024 +0800
make ctx constructed in scheduleRenewTask (#8556)
---
.../proxy/service/receipt/DefaultReceiptHandleManager.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 3948824a39..0cb519306e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -159,7 +159,8 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
if (handle.getNextVisibleTime() - current >
proxyConfig.getRenewAheadTimeMillis()) {
return;
}
- renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr));
+ renewalWorkerService.submit(() -> renewMessage(createContext("RenewMessage"), key, group,
+ msgID, handleStr));
});
}
} catch (Exception e) {
@@ -169,15 +170,15 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
log.debug("scan for renewal done. cost:{}ms",
stopwatch.elapsed().toMillis());
}
- protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
+ protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
try {
- group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle));
+ group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(context, key, messageReceiptHandle));
} catch (Exception e) {
log.error("error when renew message. msgID:{}, handleStr:{}",
msgID, handleStr, e);
}
}
- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ProxyContext context, ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
CompletableFuture<MessageReceiptHandle> resFuture = new
CompletableFuture<>();
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
long current = System.currentTimeMillis();
@@ -209,7 +210,6 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
}
});
} else {
- ProxyContext context = createContext("RenewMessage");
SubscriptionGroupConfig subscriptionGroupConfig =
metadataService.getSubscriptionGroupConfig(context,
messageReceiptHandle.getGroup());
if (subscriptionGroupConfig == null) {