This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6cccff82f4 [ISSUE #9187] The request should be rejected if the
queueOffset equals maxOffset when changing the invisible time (#9186)
6cccff82f4 is described below
commit 6cccff82f42d7c8326774c88334b4616b0a46e5e
Author: rongtong <[email protected]>
AuthorDate: Wed Feb 19 17:24:50 2025 +0800
[ISSUE #9187] The request should be rejected if the queueOffset equals
maxOffset when changing the invisible time (#9186)
* When changing the invisible time, the request should be rejected if the
queueOffset equals maxOffset
* Add UTs
* Make UTs to pass
* Remove unused imports
---
.../processor/ChangeInvisibleTimeProcessor.java | 2 +-
.../ChangeInvisibleTimeProcessorTest.java | 35 ++++++++++++++++++++--
2 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index a7180f6654..de72ee7baf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -130,7 +130,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
} catch (ConsumeQueueException e) {
throw new RemotingCommandException("Failed to get max consume offset", e);
}
- if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) {
+ if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() >= maxOffset) {
response.setCode(ResponseCode.NO_MESSAGE);
return CompletableFuture.completedFuture(response);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index a7aae7ee3d..e15d51b4a8 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -29,8 +29,6 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -46,6 +44,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +55,8 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -108,7 +109,8 @@ public class ChangeInvisibleTimeProcessorTest {
}
@Test
- public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
+ public void testProcessRequest_Success() throws RemotingCommandException, ConsumeQueueException {
+ when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
int queueId = 0;
long queueOffset = 0;
@@ -133,4 +135,31 @@ public class ChangeInvisibleTimeProcessorTest {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
}
+
+ @Test
+ public void testProcessRequest_NoMessage() throws RemotingCommandException, ConsumeQueueException {
+ when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
+ int queueId = 0;
+ long queueOffset = 2;
+ long popTime = System.currentTimeMillis() - 1_000;
+ long invisibleTime = 30_000;
+ int reviveQid = 0;
+ String brokerName = "test_broker";
+ String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, invisibleTime, reviveQid,
+ topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + queueOffset;
+
+ ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(queueOffset);
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setInvisibleTime(invisibleTime);
+
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand responseToReturn = changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+ assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+ assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+ }
}