This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3722431c25 [ISSUE #8458] Add more test coverage for ProcessQueue (#8459)
3722431c25 is described below
commit 3722431c2593a5fc568d415d860001c690c5a5ad
Author: yx9o <[email protected]>
AuthorDate: Sun Jul 28 17:11:15 2024 +0800
[ISSUE #8458] Add more test coverage for ProcessQueue (#8459)
* [ISSUE #8458] Add more test coverage for ProcessQueue
* Update
* Update
* Update
---
.../client/impl/consumer/ProcessQueueTest.java | 82 ++++++++++++++++++++--
1 file changed, 75 insertions(+), 7 deletions(-)
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
index be0bd29f79..a8afd4a233 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -16,17 +16,32 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class ProcessQueueTest {
@@ -78,7 +93,7 @@ public class ProcessQueueTest {
}
@Test
- public void testFillProcessQueueInfo() {
+ public void testFillProcessQueueInfo() throws IllegalAccessException {
ProcessQueue pq = new ProcessQueue();
pq.putMessage(createMessageList(102400));
@@ -101,6 +116,57 @@ public class ProcessQueueTest {
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
+
+ TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<>();
+ consumingMsgOrderlyTreeMap.put(0L, createMessageList(1).get(0));
+ FieldUtils.writeDeclaredField(pq, "consumingMsgOrderlyTreeMap", consumingMsgOrderlyTreeMap, true);
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertEquals(0, processQueueInfo.getTransactionMsgMinOffset());
+ assertEquals(0, processQueueInfo.getTransactionMsgMaxOffset());
+ assertEquals(1, processQueueInfo.getTransactionMsgCount());
+ }
+
+ @Test
+ public void testPopRequest() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ ProcessQueue processQueue = createProcessQueue();
+ MessageExt messageExt = createMessageList(1).get(0);
+ messageExt.getProperties().put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, System.currentTimeMillis() - 20 * 60 * 1000L + "");
+ processQueue.getMsgTreeMap().put(0L, messageExt);
+ DefaultMQPushConsumer pushConsumer = mock(DefaultMQPushConsumer.class);
+ processQueue.cleanExpiredMsg(pushConsumer);
+ verify(pushConsumer).sendMessageBack(any(MessageExt.class), eq(3));
+ }
+
+ @Test
+ public void testRollback() throws IllegalAccessException {
+ ProcessQueue processQueue = createProcessQueue();
+ processQueue.rollback();
+ Field consumingMsgOrderlyTreeMapField = FieldUtils.getDeclaredField(processQueue.getClass(), "consumingMsgOrderlyTreeMap", true);
+ TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = (TreeMap<Long, MessageExt>) consumingMsgOrderlyTreeMapField.get(processQueue);
+ assertEquals(0, consumingMsgOrderlyTreeMap.size());
+ }
+
+ @Test
+ public void testHasTempMessage() {
+ ProcessQueue processQueue = createProcessQueue();
+ assertFalse(processQueue.hasTempMessage());
+ }
+
+ @Test
+ public void testProcessQueue() {
+ ProcessQueue processQueue1 = createProcessQueue();
+ ProcessQueue processQueue2 = createProcessQueue();
+ assertEquals(processQueue1.getMsgAccCnt(), processQueue2.getMsgAccCnt());
+ assertEquals(processQueue1.getTryUnlockTimes(), processQueue2.getTryUnlockTimes());
+ assertEquals(processQueue1.getLastLockTimestamp(), processQueue2.getLastLockTimestamp());
+ assertEquals(processQueue1.getLastPullTimestamp(), processQueue2.getLastPullTimestamp());
+ }
+
+ private ProcessQueue createProcessQueue() {
+ ProcessQueue result = new ProcessQueue();
+ result.setMsgAccCnt(1);
+ result.incTryUnlockTimes();
+ return result;
}
private List<MessageExt> createMessageList() {
@@ -108,13 +174,15 @@ public class ProcessQueueTest {
}
private List<MessageExt> createMessageList(int count) {
- List<MessageExt> messageExtList = new ArrayList<>();
+ List<MessageExt> result = new ArrayList<>();
for (int i = 0; i < count; i++) {
MessageExt messageExt = new MessageExt();
messageExt.setQueueOffset(i);
messageExt.setBody(new byte[123]);
- messageExtList.add(messageExt);
+ messageExt.setKeys("keys" + i);
+ messageExt.getProperties().put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, System.currentTimeMillis() + "");
+ result.add(messageExt);
}
- return messageExtList;
+ return result;
}
}