This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-e2e.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea2d2f [ISSUE #15] Fixed a bug with timing messages (#16)
0ea2d2f is described below
commit 0ea2d2f261bdb8d8ff6c3df77df83c7827df17fd
Author: yueya <[email protected]>
AuthorDate: Wed Apr 12 16:00:18 2023 +0800
[ISSUE #15] Fixed a bug with timing messages (#16)
* Update README.md
* Update README.md
* Update pom
* End-to-end (e2e) test cases for RocketMQ Spring client support.
* Fixed a bug with timing messages
---------
Co-authored-by: 月伢 <[email protected]>
---
.../java/org/apache/rocketmq/util/VerifyUtils.java | 19 +++++++++----------
.../rocketmq/broker/server/DelayMessageTest.java | 4 ++--
2 files changed, 11 insertions(+), 12 deletions(-)
diff --git a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
index 93d5b01..84c65a7 100644
--- a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
+++ b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
@@ -40,7 +40,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -192,7 +191,7 @@ public class VerifyUtils {
Assertions.fail(String.format("The following %s messages are not
consumed: %s", unConsumedMessages.size(), unConsumedMessages));
}
//Check for consumption latency
- HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 5);
+ HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages,
delayTime);
StringBuilder sb = new StringBuilder();
sb.append("The following messages do not meet the delay requirements
\n");
for (String msg : delayUnExcept.keySet()) {
@@ -216,7 +215,7 @@ public class VerifyUtils {
Assertions.fail(String.format("The following %s messages are not
consumed: %s", unConsumedMessages.size(), unConsumedMessages));
}
//Check for consumption latency
- HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages,
TIMEOUT + 5);
+ HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages,
delayTime);
StringBuilder sb = new StringBuilder();
sb.append("The following message does not meet the delay requirement
\n");
//Time stamp formatting
@@ -253,7 +252,7 @@ public class VerifyUtils {
Assertions.fail(String.format("The following %s messages are not
consumed: %s", unConsumedMessages.size(), unConsumedMessages));
}
//Check for consumption latency
- HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 5 +
flexibleTime - 30);
+ HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages,
delayTime);
StringBuilder sb = new StringBuilder();
sb.append("The following message does not meet the delay requirement
\n");
for (String msg : delayUnExcept.keySet()) {
@@ -311,18 +310,18 @@ public class VerifyUtils {
}
}
- private static HashMap<String, Long> checkDelay(DataCollector<Object>
dequeueMessages, int offset) {
+ private static HashMap<String, Long> checkDelay(DataCollector<Object>
dequeueMessages, int delayTimeSec) {
HashMap<String, Long> map = new HashMap<>();
Collection<Object> receivedMessages = dequeueMessages.getAllData();
+ long consumeTime = System.currentTimeMillis();
for (Object receivedMessage : receivedMessages) {
MessageView messageView = (MessageView) receivedMessage;
- Optional<Long> startDeliverTime =
messageView.getDeliveryTimestamp();
+
Assertions.assertTrue(messageView.getDeliveryTimestamp().isPresent());
//Check the current time and the distribution time. If the
difference is within 5s, the requirements are met
long bornTimestamp = messageView.getBornTimestamp();
- //if ()
-// if (Math.abs(startDeliverTime.get() - bornTimestamp) / 1000 >
DelayConf.DELAY_LEVEL[messageView.getDeliveryAttempt() - 1] + offset) {
-// map.put(messageView.getMessageId().toString(),
(startDeliverTime.get() - bornTimestamp) / 1000);
-// }
+ if (Math.abs((consumeTime - bornTimestamp) / 1000 - delayTimeSec)
> 5) {
+ map.put(messageView.getMessageId().toString(), (consumeTime -
bornTimestamp) / 1000);
+ }
}
return map;
}
diff --git
a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
index 8fb34d6..d88d3ae 100644
---
a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
+++
b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
@@ -91,7 +91,7 @@ public class DelayMessageTest extends BaseOperate {
producer.send(message);
}
Assertions.assertEquals(SEND_NUM,
producer.getEnqueueMessages().getDataSize(), "send message failed");
- VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(),
pushConsumer.getListener().getDequeueMessages(), 15);
+ VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(),
pushConsumer.getListener().getDequeueMessages(), 10);
}
@Test
@@ -117,7 +117,7 @@ public class DelayMessageTest extends BaseOperate {
}
});
Assertions.assertEquals(SEND_NUM,
producer.getEnqueueMessages().getDataSize(), "send message failed");
- VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(),
pushConsumer.getListener().getDequeueMessages(), 15);
+ VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(),
pushConsumer.getListener().getDequeueMessages(), 10);
}
@Test