This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e0f5295fed passing the renew event type to create the correct context
(#7045)
e0f5295fed is described below
commit e0f5295fed8791d93bfa5b8420074c00b651ddfe
Author: lk <[email protected]>
AuthorDate: Wed Jul 19 15:55:11 2023 +0800
passing the renew event type to create the correct context (#7045)
---
.../java/org/apache/rocketmq/proxy/common/RenewEvent.java | 14 +++++++++++++-
.../rocketmq/proxy/processor/ReceiptHandleProcessor.java | 2 +-
.../proxy/service/receipt/DefaultReceiptHandleManager.java | 6 +++---
3 files changed, 17 insertions(+), 5 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
index fdf9833ccd..0ff65c1ccf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
@@ -23,11 +23,19 @@ import org.apache.rocketmq.client.consumer.AckResult;
public class RenewEvent {
protected MessageReceiptHandle messageReceiptHandle;
protected long renewTime;
+ protected EventType eventType;
protected CompletableFuture<AckResult> future;
- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long
renewTime, CompletableFuture<AckResult> future) {
+ public enum EventType {
+ RENEW,
+ STOP_RENEW,
+ CLEAR_GROUP
+ }
+
+ public RenewEvent(MessageReceiptHandle messageReceiptHandle, long
renewTime, EventType eventType, CompletableFuture<AckResult> future) {
this.messageReceiptHandle = messageReceiptHandle;
this.renewTime = renewTime;
+ this.eventType = eventType;
this.future = future;
}
@@ -39,6 +47,10 @@ public class RenewEvent {
return renewTime;
}
+ public EventType getEventType() {
+ return eventType;
+ }
+
public CompletableFuture<AckResult> getFuture() {
return future;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index fc49e76229..460842a86e 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -38,7 +38,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor
{
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor,
ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
StateEventListener<RenewEvent> eventListener = event -> {
- ProxyContext context = createContext("RenewMessage");
+ ProxyContext context = createContext(event.getEventType().name());
MessageReceiptHandle messageReceiptHandle =
event.getMessageReceiptHandle();
ReceiptHandle handle =
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
messagingProcessor.changeInvisibleTime(context, handle,
messageReceiptHandle.getMessageId(),
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index c7633d658a..9f35435f0d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
}
if (current - messageReceiptHandle.getConsumeTimestamp() <
proxyConfig.getRenewMaxTimeMillis()) {
CompletableFuture<AckResult> future = new
CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle,
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle,
RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()),
RenewEvent.EventType.RENEW, future));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when renew. handle:{}",
messageReceiptHandle, throwable);
@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
}
RetryPolicy retryPolicy =
subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
CompletableFuture<AckResult> future = new
CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle,
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()),
future));
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle,
retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()),
RenewEvent.EventType.STOP_RENEW, future));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when nack in renew. handle:{}",
messageReceiptHandle, throwable);
@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
try {
handleGroup.computeIfPresent(msgID, handle,
messageReceiptHandle -> {
CompletableFuture<AckResult> future = new
CompletableFuture<>();
- eventListener.fireEvent(new
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(),
future));
+ eventListener.fireEvent(new
RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(),
RenewEvent.EventType.CLEAR_GROUP, future));
return CompletableFuture.completedFuture(null);
});
} catch (Exception e) {