This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 0c6a7c0 [Test][C++] Fix flaky ReaderTest.testAsyncRead (#545)
0c6a7c0 is described below
commit 0c6a7c0a688409e6f2c4b066b48bdca24e5ec33b
Author: zhanglistar <[email protected]>
AuthorDate: Thu Mar 5 00:31:02 2026 +0800
[Test][C++] Fix flaky ReaderTest.testAsyncRead (#545)
* fix
* format
* revert test
---------
Co-authored-by: zhangzhibiao <[email protected]>
---
tests/ReaderTest.cc | 24 ++++++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index ba5b2a9..e4a924d 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -21,9 +21,13 @@
#include <pulsar/Reader.h>
#include <time.h>
+#include <atomic>
+#include <functional>
#include <future>
+#include <set>
#include <string>
#include <thread>
+#include <vector>
#include "HttpHelper.h"
#include "PulsarFriend.h"
@@ -110,15 +114,27 @@ TEST_P(ReaderTest, testAsyncRead) {
ASSERT_EQ(ResultOk, producer.send(msg));
}
+ // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then
+ // verify set
+ std::string received[10];
+ std::atomic<int> receivedCount{0};
for (int i = 0; i < 10; i++) {
- reader.readNextAsync([i](Result result, const Message& msg) {
+ reader.readNextAsync([&](Result result, const Message& msg) {
ASSERT_EQ(ResultOk, result);
- std::string content = msg.getDataAsString();
- std::string expected = "my-message-" + std::to_string(i);
- ASSERT_EQ(expected, content);
+ int idx = receivedCount.fetch_add(1);
+ if (idx < 10) received[idx] = msg.getDataAsString();
});
}
+ waitUntil(
+ std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000);
+ ASSERT_EQ(10, receivedCount.load()) << "Expected 10 messages";
+
+ std::set<std::string> receivedSet(received, received + 10);
+ for (int i = 0; i < 10; i++) {
+ ASSERT_TRUE(receivedSet.count("my-message-" + std::to_string(i))) << "Missing my-message-" << i;
+ }
+
waitUntil(
std::chrono::seconds(5),
[&]() {