This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bb89e79  Change message count in send callback to local variable 
(#8141)
bb89e79 is described below

commit bb89e79cf9dc8a07233b30bdf8de319c9a5d3b60
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Sep 27 17:09:28 2020 +0800

    Change message count in send callback to local variable (#8141)
    
    Fixes #8128
    
    ### Motivation
    
    The send callback of `BatchMessageTest` is modifing a global variable which 
is the count of successfully produced messages. So some tests may failed in 
multi-thread environment.
    
    ### Modifications
    
    - Add a `MessageCountSendCallback` class to replace the current send 
callback function
---
 pulsar-client-cpp/tests/BatchMessageTest.cc | 64 ++++++++++++++++-------------
 1 file changed, 35 insertions(+), 29 deletions(-)

diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc 
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 592136e..4b7f23f 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <atomic>
 #include <ctime>
 #include <functional>
 #include <gtest/gtest.h>
@@ -42,7 +43,6 @@ DECLARE_LOG_OBJECT();
 
 using namespace pulsar;
 
-static int globalTestBatchMessagesCounter = 0;
 static int globalCount = 0;
 static std::string lookupUrl = "pulsar://localhost:6650";
 static std::string adminUrl = "http://localhost:8080/";;
@@ -55,10 +55,19 @@ static void messageListenerFunction(Consumer consumer, 
const Message& msg) {
     consumer.acknowledge(msg);
 }
 
-static void sendCallBack(Result r, const MessageId& msgId) {
-    ASSERT_EQ(r, ResultOk);
-    globalTestBatchMessagesCounter++;
-}
+class MessageCountSendCallback {
+   public:
+    MessageCountSendCallback(std::atomic_int& numOfMessagesProduced)
+        : numOfMessagesProduced_(numOfMessagesProduced) {}
+
+    void operator()(Result result, const MessageId&) {
+        ASSERT_EQ(result, ResultOk);
+        numOfMessagesProduced_++;
+    }
+
+   private:
+    std::atomic_int& numOfMessagesProduced_;
+};
 
 static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, 
expect_result); }
 
@@ -175,7 +184,6 @@ TEST(BatchMessageTest, testProducerTimeout) {
 
 TEST(BatchMessageTest, testBatchSizeInBytes) {
     std::string testName = std::to_string(epochTime) + "testBatchSizeInBytes";
-    globalTestBatchMessagesCounter = 0;
 
     Client client(lookupUrl);
     std::string topicName = "persistent://public/default/" + testName;
@@ -216,12 +224,13 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
 
     ProducerStatsImplPtr producerStatsImplPtr = 
PulsarFriend::getProducerStatsPtr(producer);
     // Send Asynchronously
+    std::atomic_int numOfMessagesProduced{0};
     std::string prefix = "12345678";
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         ASSERT_EQ(producerStatsImplPtr->getNumMsgsSent(), i + 1);
         ASSERT_LT(PulsarFriend::sum(producerStatsImplPtr->getSendMap()), i + 
1);
         ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
@@ -246,7 +255,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
     ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), 
numOfMessages);
 
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
@@ -254,7 +263,6 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
 
 TEST(BatchMessageTest, testSmallReceiverQueueSize) {
     std::string testName = std::to_string(epochTime) + 
"testSmallReceiverQueueSize";
-    globalTestBatchMessagesCounter = 0;
 
     ClientConfiguration clientConf;
     clientConf.setStatsIntervalInSeconds(20);
@@ -303,12 +311,13 @@ TEST(BatchMessageTest, testSmallReceiverQueueSize) {
 
     ProducerStatsImplPtr producerStatsImplPtr = 
PulsarFriend::getProducerStatsPtr(producer);
     // Send Asynchronously
+    std::atomic_int numOfMessagesProduced{0};
     std::string prefix = testName;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
         ASSERT_LE(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), 
i + 1);
         LOG_DEBUG("sending message " << messageContent);
@@ -343,7 +352,7 @@ TEST(BatchMessageTest, testSmallReceiverQueueSize) {
     ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), 0);
 
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
@@ -395,15 +404,15 @@ TEST(BatchMessageTest, testIndividualAck) {
     ASSERT_EQ(consumer.getSubscriptionName(), subName);
 
     // Send Asynchronously
+    std::atomic_int numOfMessagesProduced{0};
     std::string prefix = testName;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         LOG_DEBUG("sending message " << messageContent);
     }
-    globalTestBatchMessagesCounter = 0;
     Message receivedMsg;
     int i = 0;
     while (consumer.receive(receivedMsg, 5000) == ResultOk) {
@@ -418,7 +427,7 @@ TEST(BatchMessageTest, testIndividualAck) {
         }
     }
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
@@ -511,8 +520,6 @@ TEST(BatchMessageTest, testCumulativeAck) {
     std::string subName = "subscription-name";
     Producer producer;
 
-    globalTestBatchMessagesCounter = 0;
-
     // Enable batching on producer side
     int batchSize = 5;
     int numOfMessages = 15;
@@ -549,12 +556,13 @@ TEST(BatchMessageTest, testCumulativeAck) {
     ProducerStatsImplPtr producerStatsImplPtr = 
PulsarFriend::getProducerStatsPtr(producer);
 
     // Send Asynchronously
+    std::atomic_int numOfMessagesProduced{0};
     std::string prefix = testName;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -584,7 +592,7 @@ TEST(BatchMessageTest, testCumulativeAck) {
     ASSERT_EQ(t, 1);
 
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
@@ -635,8 +643,6 @@ TEST(BatchMessageTest, testMixedAck) {
     std::string subName = "subscription-name";
     Producer producer;
 
-    globalTestBatchMessagesCounter = 0;
-
     // Enable batching on producer side
     int batchSize = 5;
     int numOfMessages = 15;
@@ -670,12 +676,13 @@ TEST(BatchMessageTest, testMixedAck) {
     ASSERT_EQ(consumer.getSubscriptionName(), subName);
 
     // Send Asynchronously
+    std::atomic_int numOfMessagesProduced{0};
     std::string prefix = testName;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -693,7 +700,7 @@ TEST(BatchMessageTest, testMixedAck) {
         }
     }
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
@@ -741,8 +748,6 @@ TEST(BatchMessageTest, testPermits) {
     std::string subName = "subscription-name";
     Producer producer;
 
-    globalTestBatchMessagesCounter = 0;
-
     // Enable batching on producer side
     int batchSize = 10;
     int numOfMessages = 75;
@@ -779,12 +784,13 @@ TEST(BatchMessageTest, testPermits) {
     ASSERT_EQ(consumer.getSubscriptionName(), subName);
 
     // Send Asynchronously
+    std::atomic_int numOfMessagesProduced{0};
     std::string prefix = testName;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -802,7 +808,7 @@ TEST(BatchMessageTest, testPermits) {
         ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
     }
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
@@ -813,13 +819,13 @@ TEST(BatchMessageTest, testPermits) {
 
     client.createProducer(topicName, conf, producer);
 
-    globalTestBatchMessagesCounter = 0;
+    numOfMessagesProduced = 0;
     // Send Asynchronously
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             
MessageBuilder().setContent(messageContent).setProperty("msgIndex", 
std::to_string(i)).build();
-        producer.sendAsync(msg, &sendCallBack);
+        producer.sendAsync(msg, 
MessageCountSendCallback(numOfMessagesProduced));
         LOG_DEBUG("sending message " << messageContent);
     }
     std::this_thread::sleep_for(std::chrono::seconds(5));
@@ -837,7 +843,7 @@ TEST(BatchMessageTest, testPermits) {
         ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
     }
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
+    ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);

Reply via email to