This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bb89e79 Change message count in send callback to local variable
(#8141)
bb89e79 is described below
commit bb89e79cf9dc8a07233b30bdf8de319c9a5d3b60
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Sep 27 17:09:28 2020 +0800
Change message count in send callback to local variable (#8141)
Fixes #8128
### Motivation
The send callback of `BatchMessageTest` is modifing a global variable which
is the count of successfully produced messages. So some tests may failed in
multi-thread environment.
### Modifications
- Add a `MessageCountSendCallback` class to replace the current send
callback function
---
pulsar-client-cpp/tests/BatchMessageTest.cc | 64 ++++++++++++++++-------------
1 file changed, 35 insertions(+), 29 deletions(-)
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 592136e..4b7f23f 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include <atomic>
#include <ctime>
#include <functional>
#include <gtest/gtest.h>
@@ -42,7 +43,6 @@ DECLARE_LOG_OBJECT();
using namespace pulsar;
-static int globalTestBatchMessagesCounter = 0;
static int globalCount = 0;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
@@ -55,10 +55,19 @@ static void messageListenerFunction(Consumer consumer,
const Message& msg) {
consumer.acknowledge(msg);
}
-static void sendCallBack(Result r, const MessageId& msgId) {
- ASSERT_EQ(r, ResultOk);
- globalTestBatchMessagesCounter++;
-}
+class MessageCountSendCallback {
+ public:
+ MessageCountSendCallback(std::atomic_int& numOfMessagesProduced)
+ : numOfMessagesProduced_(numOfMessagesProduced) {}
+
+ void operator()(Result result, const MessageId&) {
+ ASSERT_EQ(result, ResultOk);
+ numOfMessagesProduced_++;
+ }
+
+ private:
+ std::atomic_int& numOfMessagesProduced_;
+};
static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r,
expect_result); }
@@ -175,7 +184,6 @@ TEST(BatchMessageTest, testProducerTimeout) {
TEST(BatchMessageTest, testBatchSizeInBytes) {
std::string testName = std::to_string(epochTime) + "testBatchSizeInBytes";
- globalTestBatchMessagesCounter = 0;
Client client(lookupUrl);
std::string topicName = "persistent://public/default/" + testName;
@@ -216,12 +224,13 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
ProducerStatsImplPtr producerStatsImplPtr =
PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
+ std::atomic_int numOfMessagesProduced{0};
std::string prefix = "12345678";
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
ASSERT_EQ(producerStatsImplPtr->getNumMsgsSent(), i + 1);
ASSERT_LT(PulsarFriend::sum(producerStatsImplPtr->getSendMap()), i +
1);
ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
@@ -246,7 +255,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()),
numOfMessages);
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
@@ -254,7 +263,6 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
TEST(BatchMessageTest, testSmallReceiverQueueSize) {
std::string testName = std::to_string(epochTime) +
"testSmallReceiverQueueSize";
- globalTestBatchMessagesCounter = 0;
ClientConfiguration clientConf;
clientConf.setStatsIntervalInSeconds(20);
@@ -303,12 +311,13 @@ TEST(BatchMessageTest, testSmallReceiverQueueSize) {
ProducerStatsImplPtr producerStatsImplPtr =
PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
+ std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
ASSERT_LE(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()),
i + 1);
LOG_DEBUG("sending message " << messageContent);
@@ -343,7 +352,7 @@ TEST(BatchMessageTest, testSmallReceiverQueueSize) {
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), 0);
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
@@ -395,15 +404,15 @@ TEST(BatchMessageTest, testIndividualAck) {
ASSERT_EQ(consumer.getSubscriptionName(), subName);
// Send Asynchronously
+ std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
- globalTestBatchMessagesCounter = 0;
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
@@ -418,7 +427,7 @@ TEST(BatchMessageTest, testIndividualAck) {
}
}
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
@@ -511,8 +520,6 @@ TEST(BatchMessageTest, testCumulativeAck) {
std::string subName = "subscription-name";
Producer producer;
- globalTestBatchMessagesCounter = 0;
-
// Enable batching on producer side
int batchSize = 5;
int numOfMessages = 15;
@@ -549,12 +556,13 @@ TEST(BatchMessageTest, testCumulativeAck) {
ProducerStatsImplPtr producerStatsImplPtr =
PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
+ std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
@@ -584,7 +592,7 @@ TEST(BatchMessageTest, testCumulativeAck) {
ASSERT_EQ(t, 1);
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
@@ -635,8 +643,6 @@ TEST(BatchMessageTest, testMixedAck) {
std::string subName = "subscription-name";
Producer producer;
- globalTestBatchMessagesCounter = 0;
-
// Enable batching on producer side
int batchSize = 5;
int numOfMessages = 15;
@@ -670,12 +676,13 @@ TEST(BatchMessageTest, testMixedAck) {
ASSERT_EQ(consumer.getSubscriptionName(), subName);
// Send Asynchronously
+ std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
@@ -693,7 +700,7 @@ TEST(BatchMessageTest, testMixedAck) {
}
}
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
@@ -741,8 +748,6 @@ TEST(BatchMessageTest, testPermits) {
std::string subName = "subscription-name";
Producer producer;
- globalTestBatchMessagesCounter = 0;
-
// Enable batching on producer side
int batchSize = 10;
int numOfMessages = 75;
@@ -779,12 +784,13 @@ TEST(BatchMessageTest, testPermits) {
ASSERT_EQ(consumer.getSubscriptionName(), subName);
// Send Asynchronously
+ std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
@@ -802,7 +808,7 @@ TEST(BatchMessageTest, testPermits) {
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
@@ -813,13 +819,13 @@ TEST(BatchMessageTest, testPermits) {
client.createProducer(topicName, conf, producer);
- globalTestBatchMessagesCounter = 0;
+ numOfMessagesProduced = 0;
// Send Asynchronously
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex",
std::to_string(i)).build();
- producer.sendAsync(msg, &sendCallBack);
+ producer.sendAsync(msg,
MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
@@ -837,7 +843,7 @@ TEST(BatchMessageTest, testPermits) {
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
}
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+ ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);