This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 216fc1b537 [ISSUE #10345] fix: preserve lite topic for auto-renew 
receipt handles (#10346)
216fc1b537 is described below

commit 216fc1b5370731f74d6c40fbe14052bb2be86592
Author: wizcraft_kris <[email protected]>
AuthorDate: Mon Jun 8 13:54:45 2026 +0800

    [ISSUE #10345] fix: preserve lite topic for auto-renew receipt handles 
(#10346)
---
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |  3 +-
 .../v2/consumer/ReceiveMessageActivityTest.java    | 61 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index f5e1c7b76f..becf2c2165 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -214,7 +214,8 @@ public class ReceiveMessageActivity extends 
AbstractMessagingActivity {
                 if (receiptHandle != null) {
                     MessageReceiptHandle messageReceiptHandle =
                         new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
-                            messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
+                            messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes(),
+                            
messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC));
                     messagingProcessor.addReceiptHandle(ctx, clientChannel, 
group, messageExt.getMsgId(), messageReceiptHandle);
                 }
             }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index f7074dedd6..6478f90cb6 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.proxy.grpc.v2.consumer;
 
 import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.FilterExpression;
 import apache.rocketmq.v2.FilterType;
 import apache.rocketmq.v2.MessageQueue;
@@ -45,6 +46,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
@@ -64,6 +66,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -319,6 +322,64 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
         assertEquals(Arrays.asList(popCk1, popCk2), 
receiptHandleCaptor.getAllValues().stream().map(ReceiptHandle::encode).collect(Collectors.toList()));
     }
 
+    @Test
+    public void testReceiveLiteMessageAddReceiptHandleWithLiteTopic() {
+        ConfigurationManager.getProxyConfig().setEnableProxyAutoRenew(true);
+        StreamObserver<ReceiveMessageResponse> receiveStreamObserver = 
mock(ServerCallStreamObserver.class);
+        doNothing().when(receiveStreamObserver).onNext(any());
+        
when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder()
+            .setClientType(ClientType.LITE_PUSH_CONSUMER)
+            .build());
+
+        String msgId = "liteMsgId";
+        String liteTopic = "liteTopic";
+        String popCk = "0 0 60000 0 0 broker 0 0 0";
+        MessageExt messageExt = new MessageExt();
+        messageExt.setTopic(TOPIC);
+        messageExt.setMsgId(msgId);
+        MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, 
popCk);
+        MessageAccessor.putProperty(messageExt, 
MessageConst.PROPERTY_LITE_TOPIC, liteTopic);
+        messageExt.setBody("body".getBytes());
+
+        PopResult popResult = new PopResult(PopStatus.FOUND, 
Collections.singletonList(messageExt));
+        when(this.messagingProcessor.popLiteMessage(
+            any(),
+            any(),
+            anyString(),
+            anyString(),
+            anyInt(),
+            anyLong(),
+            anyLong(),
+            any(),
+            any(),
+            isNull(),
+            
anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
+
+        ArgumentCaptor<MessageReceiptHandle> messageReceiptHandleCaptor = 
ArgumentCaptor.forClass(MessageReceiptHandle.class);
+
+        ProxyContext ctx = createContext();
+        this.grpcChannelManager.createChannel(ctx, ctx.getClientID());
+        ReceiveMessageRequest receiveMessageRequest = 
ReceiveMessageRequest.newBuilder()
+            .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+            
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+            .setAutoRenew(true)
+            .setFilterExpression(FilterExpression.newBuilder()
+                .setType(FilterType.TAG)
+                .setExpression("*")
+                .build())
+            .build();
+
+        this.receiveMessageActivity.receiveMessage(ctx, receiveMessageRequest, 
receiveStreamObserver);
+
+        verify(this.messagingProcessor).addReceiptHandle(
+            any(),
+            any(),
+            eq(CONSUMER_GROUP),
+            eq(msgId),
+            messageReceiptHandleCaptor.capture());
+        assertEquals(liteTopic, 
messageReceiptHandleCaptor.getValue().getLiteTopic());
+    }
+
     @Test
     public void testReceiveMessage() {
         StreamObserver<ReceiveMessageResponse> receiveStreamObserver = 
mock(ServerCallStreamObserver.class);

Reply via email to