This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 216fc1b537 [ISSUE #10345] fix: preserve lite topic for auto-renew receipt handles (#10346)
216fc1b537 is described below
commit 216fc1b5370731f74d6c40fbe14052bb2be86592
Author: wizcraft_kris <[email protected]>
AuthorDate: Mon Jun 8 13:54:45 2026 +0800
[ISSUE #10345] fix: preserve lite topic for auto-renew receipt handles (#10346)
---
.../grpc/v2/consumer/ReceiveMessageActivity.java | 3 +-
.../v2/consumer/ReceiveMessageActivityTest.java | 61 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index f5e1c7b76f..becf2c2165 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -214,7 +214,8 @@ public class ReceiveMessageActivity extends AbstractMessagingActivity {
if (receiptHandle != null) {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
- messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
+ messageExt.getQueueOffset(), messageExt.getReconsumeTimes(),
+ messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC));
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index f7074dedd6..6478f90cb6 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.proxy.grpc.v2.consumer;
import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.FilterExpression;
import apache.rocketmq.v2.FilterType;
import apache.rocketmq.v2.MessageQueue;
@@ -45,6 +46,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
@@ -64,6 +66,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -319,6 +322,64 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
assertEquals(Arrays.asList(popCk1, popCk2), receiptHandleCaptor.getAllValues().stream().map(ReceiptHandle::encode).collect(Collectors.toList()));
}
+ @Test
+ public void testReceiveLiteMessageAddReceiptHandleWithLiteTopic() {
+ ConfigurationManager.getProxyConfig().setEnableProxyAutoRenew(true);
+ StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
+ doNothing().when(receiveStreamObserver).onNext(any());
+ when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder()
+ .setClientType(ClientType.LITE_PUSH_CONSUMER)
+ .build());
+
+ String msgId = "liteMsgId";
+ String liteTopic = "liteTopic";
+ String popCk = "0 0 60000 0 0 broker 0 0 0";
+ MessageExt messageExt = new MessageExt();
+ messageExt.setTopic(TOPIC);
+ messageExt.setMsgId(msgId);
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, popCk);
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_LITE_TOPIC, liteTopic);
+ messageExt.setBody("body".getBytes());
+
+ PopResult popResult = new PopResult(PopStatus.FOUND, Collections.singletonList(messageExt));
+ when(this.messagingProcessor.popLiteMessage(
+ any(),
+ any(),
+ anyString(),
+ anyString(),
+ anyInt(),
+ anyLong(),
+ anyLong(),
+ any(),
+ any(),
+ isNull(),
+ anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
+
+ ArgumentCaptor<MessageReceiptHandle> messageReceiptHandleCaptor = ArgumentCaptor.forClass(MessageReceiptHandle.class);
+
+ ProxyContext ctx = createContext();
+ this.grpcChannelManager.createChannel(ctx, ctx.getClientID());
+ ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+ .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .setAutoRenew(true)
+ .setFilterExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("*")
+ .build())
+ .build();
+
+ this.receiveMessageActivity.receiveMessage(ctx, receiveMessageRequest, receiveStreamObserver);
+
+ verify(this.messagingProcessor).addReceiptHandle(
+ any(),
+ any(),
+ eq(CONSUMER_GROUP),
+ eq(msgId),
+ messageReceiptHandleCaptor.capture());
+ assertEquals(liteTopic, messageReceiptHandleCaptor.getValue().getLiteTopic());
+ }
+
@Test
public void testReceiveMessage() {
StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);