This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c6a7c0  [Test][C++] Fix flaky ReaderTest.testAsyncRead (#545)
0c6a7c0 is described below

commit 0c6a7c0a688409e6f2c4b066b48bdca24e5ec33b
Author: zhanglistar <[email protected]>
AuthorDate: Thu Mar 5 00:31:02 2026 +0800

    [Test][C++] Fix flaky ReaderTest.testAsyncRead (#545)
    
    * fix
    
    * format
    
    * revert test
    
    ---------
    
    Co-authored-by: zhangzhibiao <[email protected]>
---
 tests/ReaderTest.cc | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index ba5b2a9..e4a924d 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -21,9 +21,13 @@
 #include <pulsar/Reader.h>
 #include <time.h>
 
+#include <atomic>
+#include <functional>
 #include <future>
+#include <set>
 #include <string>
 #include <thread>
+#include <vector>
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
@@ -110,15 +114,27 @@ TEST_P(ReaderTest, testAsyncRead) {
         ASSERT_EQ(ResultOk, producer.send(msg));
     }
 
+    // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then
+    // verify set
+    std::string received[10];
+    std::atomic<int> receivedCount{0};
     for (int i = 0; i < 10; i++) {
-        reader.readNextAsync([i](Result result, const Message& msg) {
+        reader.readNextAsync([&](Result result, const Message& msg) {
             ASSERT_EQ(ResultOk, result);
-            std::string content = msg.getDataAsString();
-            std::string expected = "my-message-" + std::to_string(i);
-            ASSERT_EQ(expected, content);
+            int idx = receivedCount.fetch_add(1);
+            if (idx < 10) received[idx] = msg.getDataAsString();
         });
     }
 
+    waitUntil(
+        std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000);
+    ASSERT_EQ(10, receivedCount.load()) << "Expected 10 messages";
+
+    std::set<std::string> receivedSet(received, received + 10);
+    for (int i = 0; i < 10; i++) {
+        ASSERT_TRUE(receivedSet.count("my-message-" + std::to_string(i))) << "Missing my-message-" << i;
+    }
+
     waitUntil(
         std::chrono::seconds(5),
         [&]() {

Reply via email to