This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-e2e.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ea2d2f  [ISSUE #15] Fixed a bug with timing messages (#16)
0ea2d2f is described below

commit 0ea2d2f261bdb8d8ff6c3df77df83c7827df17fd
Author: yueya <[email protected]>
AuthorDate: Wed Apr 12 16:00:18 2023 +0800

    [ISSUE #15] Fixed a bug with timing messages (#16)
    
    * Update README.md
    
    * Update README.md
    
    * Update pom
    
    * End-to-end (e2e) test cases for RocketMQ Spring client support.
    
    * Fixed a bug with timing messages
    
    ---------
    
    Co-authored-by: 月伢 <[email protected]>
---
 .../java/org/apache/rocketmq/util/VerifyUtils.java    | 19 +++++++++----------
 .../rocketmq/broker/server/DelayMessageTest.java      |  4 ++--
 2 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java 
b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
index 93d5b01..84c65a7 100644
--- a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
+++ b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java
@@ -40,7 +40,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -192,7 +191,7 @@ public class VerifyUtils {
             Assertions.fail(String.format("The following %s messages are not 
consumed: %s", unConsumedMessages.size(), unConsumedMessages));
         }
         //Check for consumption latency
-        HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 5);
+        HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 
delayTime);
         StringBuilder sb = new StringBuilder();
         sb.append("The following messages do not meet the delay requirements 
\n");
         for (String msg : delayUnExcept.keySet()) {
@@ -216,7 +215,7 @@ public class VerifyUtils {
             Assertions.fail(String.format("The following %s messages are not 
consumed: %s", unConsumedMessages.size(), unConsumedMessages));
         }
         //Check for consumption latency
-        HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 
TIMEOUT + 5);
+        HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 
delayTime);
         StringBuilder sb = new StringBuilder();
         sb.append("The following message does not meet the delay requirement 
\n");
         //Time stamp formatting
@@ -253,7 +252,7 @@ public class VerifyUtils {
             Assertions.fail(String.format("The following %s messages are not 
consumed: %s", unConsumedMessages.size(), unConsumedMessages));
         }
         //Check for consumption latency
-        HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 5 + 
flexibleTime - 30);
+        HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 
delayTime);
         StringBuilder sb = new StringBuilder();
         sb.append("The following message does not meet the delay requirement 
\n");
         for (String msg : delayUnExcept.keySet()) {
@@ -311,18 +310,18 @@ public class VerifyUtils {
         }
     }
 
-    private static HashMap<String, Long> checkDelay(DataCollector<Object> 
dequeueMessages, int offset) {
+    private static HashMap<String, Long> checkDelay(DataCollector<Object> 
dequeueMessages, int delayTimeSec) {
         HashMap<String, Long> map = new HashMap<>();
         Collection<Object> receivedMessages = dequeueMessages.getAllData();
+        long consumeTime = System.currentTimeMillis();
         for (Object receivedMessage : receivedMessages) {
             MessageView messageView = (MessageView) receivedMessage;
-            Optional<Long> startDeliverTime = 
messageView.getDeliveryTimestamp();
+            
Assertions.assertTrue(messageView.getDeliveryTimestamp().isPresent());
             //Check the current time and the distribution time. If the 
difference is within 5s, the requirements are met
             long bornTimestamp = messageView.getBornTimestamp();
-            //if ()
-//            if (Math.abs(startDeliverTime.get() - bornTimestamp) / 1000 > 
DelayConf.DELAY_LEVEL[messageView.getDeliveryAttempt() - 1] + offset) {
-//                map.put(messageView.getMessageId().toString(), 
(startDeliverTime.get() - bornTimestamp) / 1000);
-//            }
+            if (Math.abs((consumeTime - bornTimestamp) / 1000 - delayTimeSec) 
> 5) {
+                map.put(messageView.getMessageId().toString(), (consumeTime - 
bornTimestamp) / 1000);
+            }
         }
         return map;
     }
diff --git 
a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
 
b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
index 8fb34d6..d88d3ae 100644
--- 
a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
+++ 
b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/DelayMessageTest.java
@@ -91,7 +91,7 @@ public class DelayMessageTest extends BaseOperate {
             producer.send(message);
         }
         Assertions.assertEquals(SEND_NUM, 
producer.getEnqueueMessages().getDataSize(), "send message failed");
-        VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(), 
pushConsumer.getListener().getDequeueMessages(), 15);
+        VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(), 
pushConsumer.getListener().getDequeueMessages(), 10);
     }
 
     @Test
@@ -117,7 +117,7 @@ public class DelayMessageTest extends BaseOperate {
             }
         });
         Assertions.assertEquals(SEND_NUM, 
producer.getEnqueueMessages().getDataSize(), "send message failed");
-        VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(), 
pushConsumer.getListener().getDequeueMessages(), 15);
+        VerifyUtils.verifyDelayMessage(producer.getEnqueueMessages(), 
pushConsumer.getListener().getDequeueMessages(), 10);
     }
 
     @Test

Reply via email to