This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c7efdcc7cbb27769eb42ec5c75080d9caf5b87d4
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Dec 2 21:30:04 2020 -0800

    [C++] Implement batch aware producer router (#8395)
    
    * [C++] Implement batch aware producer router
    
    * Fixed tests that relied on old rr distribution
    
    * Fixed compilation with older compilers
    
    * Fixed test
    
    (cherry picked from commit 74803dbf1f9ef5b90558cce90f67fa4c668444a3)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |   5 +-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.cc   |  63 +++++++-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.h    |  26 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |   4 +-
 pulsar-client-cpp/tests/CustomRoutingPolicy.h      |  13 ++
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    |   3 +-
 .../tests/RoundRobinMessageRouterTest.cc           | 164 ++++++++++++++++-----
 7 files changed, 223 insertions(+), 55 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index aa5e176..1590780 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -63,7 +63,10 @@ 
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
     switch (conf_.getPartitionsRoutingMode()) {
         case ProducerConfiguration::RoundRobinDistribution:
-            return 
std::make_shared<RoundRobinMessageRouter>(conf_.getHashingScheme());
+            return std::make_shared<RoundRobinMessageRouter>(
+                conf_.getHashingScheme(), conf_.getBatchingEnabled(), 
conf_.getBatchingMaxMessages(),
+                conf_.getBatchingMaxAllowedSizeInBytes(),
+                
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
         case ProducerConfiguration::CustomPartition:
             return conf_.getMessageRouterPtr();
         case ProducerConfiguration::UseSinglePartition:
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc 
b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
index c47fb23..feb3ab0 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
@@ -18,22 +18,73 @@
  */
 #include "RoundRobinMessageRouter.h"
 
+#include "TimeUtils.h"
+
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int_distribution.hpp>
+
 namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme
 hashingScheme)
-    : MessageRouterBase(hashingScheme), prevPartition_(0) {}
+RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme
 hashingScheme,
+                                                 bool batchingEnabled, 
uint32_t maxBatchingMessages,
+                                                 uint32_t maxBatchingSize,
+                                                 
boost::posix_time::time_duration maxBatchingDelay)
+    : MessageRouterBase(hashingScheme),
+      batchingEnabled_(batchingEnabled),
+      lastPartitionChange_(TimeUtils::currentTimeMillis()),
+      msgCounter_(0),
+      cumulativeBatchSize_(0),
+      maxBatchingMessages_(maxBatchingMessages),
+      maxBatchingSize_(maxBatchingSize),
+      maxBatchingDelay_(maxBatchingDelay) {
+    boost::random::mt19937 rng(time(nullptr));
+    boost::random::uniform_int_distribution<int> dist;
+    currentPartitionCursor_ = dist(rng);
+}
 
 RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
 
 // override
 int RoundRobinMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
+    if (topicMetadata.getNumPartitions() == 1) {
+        // When the topic has a single partition, there is nothing to route
+        return 0;
+    }
+
     // if message has a key, hash the key and return the partition
     if (msg.hasPartitionKey()) {
         return hash->makeHash(msg.getPartitionKey()) % 
topicMetadata.getNumPartitions();
-    } else {
-        Lock lock(mutex_);
-        // else pick the next partition
-        return prevPartition_++ % topicMetadata.getNumPartitions();
     }
+
+    if (!batchingEnabled_) {
+        // If there's no batching, do the round-robin at the message scope
+        // as there is no gain otherwise.
+        return currentPartitionCursor_++ % topicMetadata.getNumPartitions();
+    }
+
+    // If there's no key, we do round-robin across partition, sticking with a 
given
+    // partition for a certain amount of messages or volume buffered or the 
max delay to batch is reached so
+    // that we ensure having a decent amount of batching of the messages. Note 
that it is possible that we
+    // skip more than one partition if multiple threads increment 
currentPartitionCursor at the same time.
+    // If that happens it shouldn't be a problem because we only want to 
spread the data on different
+    // partitions but not necessarily in a specific sequence.
+    uint32_t messageSize = msg.getLength();
+    uint32_t messageCount = msgCounter_;
+    uint32_t batchSize = cumulativeBatchSize_;
+    int64_t lastPartitionChange = lastPartitionChange_;
+    int64_t now = TimeUtils::currentTimeMillis();
+
+    if (messageCount >= maxBatchingMessages_ || (messageSize >= 
maxBatchingSize_ - batchSize) ||
+        (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) 
{
+        uint32_t currentPartitionCursor = ++currentPartitionCursor_;
+        lastPartitionChange_ = now;
+        cumulativeBatchSize_ = messageSize;
+        msgCounter_ = 1;
+        return currentPartitionCursor % topicMetadata.getNumPartitions();
+    }
+
+    ++msgCounter_;
+    cumulativeBatchSize_ += messageSize;
+    return currentPartitionCursor_ % topicMetadata.getNumPartitions();
 }
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h 
b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
index 5460e13..be172a0 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
@@ -16,28 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_RR_MESSAGE_ROUTER_HEADER_
-#define PULSAR_RR_MESSAGE_ROUTER_HEADER_
+
+#pragma once
 
 #include <pulsar/defines.h>
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <pulsar/TopicMetadata.h>
-#include <mutex>
 #include "Hash.h"
 #include "MessageRouterBase.h"
 
+#include <atomic>
+#include <boost/date_time/local_time/local_time.hpp>
+
 namespace pulsar {
 class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase {
    public:
-    RoundRobinMessageRouter(ProducerConfiguration::HashingScheme 
hashingScheme);
+    RoundRobinMessageRouter(ProducerConfiguration::HashingScheme 
hashingScheme, bool batchingEnabled,
+                            uint32_t maxBatchingMessages, uint32_t 
maxBatchingSize,
+                            boost::posix_time::time_duration maxBatchingDelay);
     virtual ~RoundRobinMessageRouter();
     virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
 
    private:
-    std::mutex mutex_;
-    unsigned int prevPartition_;
+    const bool batchingEnabled_;
+    const uint32_t maxBatchingMessages_;
+    const uint32_t maxBatchingSize_;
+    const boost::posix_time::time_duration maxBatchingDelay_;
+
+    std::atomic<uint32_t> currentPartitionCursor_;
+    std::atomic<int64_t> lastPartitionChange_;
+    std::atomic<uint32_t> msgCounter_;
+    std::atomic<uint32_t> cumulativeBatchSize_;
 };
-typedef std::unique_lock<std::mutex> Lock;
+
 }  // namespace pulsar
-#endif  // PULSAR_RR_MESSAGE_ROUTER_HEADER_
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 60fc9b5..a812657 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -861,6 +861,7 @@ TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy) {
     Producer producer;
     ProducerConfiguration tempProducerConfiguration;
     
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    
tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
     ProducerConfiguration producerConfiguration = tempProducerConfiguration;
     Result result = client.createProducer(topicName, producerConfiguration, 
producer);
     ASSERT_EQ(ResultOk, result);
@@ -2465,7 +2466,7 @@ TEST(BasicEndToEndTest, 
testSyncFlushBatchMessagesPartitionedTopic) {
     Producer producer;
     int numOfMessages = 20;
     ProducerConfiguration tempProducerConfiguration;
-    
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    
tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
     ProducerConfiguration producerConfiguration = tempProducerConfiguration;
     producerConfiguration.setBatchingEnabled(true);
     // set batch message number numOfMessages, and max delay 60s
@@ -2687,6 +2688,7 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
     // set batch message number numOfMessages, and max delay 60s
     producerConfiguration.setBatchingMaxMessages(numOfMessages / 
numberOfPartitions);
     producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+    
producerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
 
     Result result = client.createProducer(topicName, producerConfiguration, 
producer);
     ASSERT_EQ(ResultOk, result);
diff --git a/pulsar-client-cpp/tests/CustomRoutingPolicy.h 
b/pulsar-client-cpp/tests/CustomRoutingPolicy.h
index ca82e34..ed10c5b 100644
--- a/pulsar-client-cpp/tests/CustomRoutingPolicy.h
+++ b/pulsar-client-cpp/tests/CustomRoutingPolicy.h
@@ -32,6 +32,19 @@ class CustomRoutingPolicy : public MessageRoutingPolicy {
 
     int getPartition(const Message& msg, const TopicMetadata& topicMetadata) { 
return 0; }
 };
+
+class SimpleRoundRobinRoutingPolicy : public MessageRoutingPolicy {
+   public:
+    SimpleRoundRobinRoutingPolicy() : counter_(0) {}
+
+    int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
+        return counter_++ % topicMetadata.getNumPartitions();
+    }
+
+   private:
+    uint32_t counter_;
+};
+
 }  // namespace pulsar
 
 #endif  // CUSTOM_ROUTER_POLICY_HEADER_
diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc 
b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
index e1bf68c..af473a2 100644
--- a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
+++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
@@ -25,6 +25,7 @@
 #include <memory>
 
 #include "HttpHelper.h"
+#include "CustomRoutingPolicy.h"
 
 using namespace pulsar;
 
@@ -57,7 +58,7 @@ class PartitionsSet {
     Result initProducer(bool enablePartitionsUpdate) {
         clientForProducer_.reset(new Client(serviceUrl, 
newClientConfig(enablePartitionsUpdate)));
         const auto producerConfig =
-            
ProducerConfiguration().setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+            
ProducerConfiguration().setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
         return clientForProducer_->createProducer(topicName, producerConfig, 
producer_);
     }
 
diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc 
b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
index 431dd6b..ce5ad17 100644
--- a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
+++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
@@ -19,59 +19,147 @@
 #include <pulsar/Client.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <gtest/gtest.h>
-#include <gmock/gmock.h>
-#include <boost/functional/hash.hpp>
-
-#include "tests/mocks/GMockMessage.h"
+#include <thread>
 
 #include "../lib/RoundRobinMessageRouter.h"
 #include "../lib/TopicMetadataImpl.h"
 
-using ::testing::AtLeast;
-using ::testing::Return;
-using ::testing::ReturnRef;
-
 using namespace pulsar;
 
-// TODO: Edit Message class to suit Google Mock and enable these tests when 
2.0.0 release.
+TEST(RoundRobinMessageRouterTest, onePartition) {
+    const int numPartitions = 1;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 
1,
+                                   boost::posix_time::milliseconds(0));
+
+    Message msg1 = 
MessageBuilder().setPartitionKey("my-key-1").setContent("one").build();
+    Message msg2 = 
MessageBuilder().setPartitionKey("my-key-2").setContent("two").build();
+    Message msg3 = MessageBuilder().setContent("three").build();
+
+    int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions));
+    int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions));
+    int p3 = router.getPartition(msg3, TopicMetadataImpl(numPartitions));
+    ASSERT_EQ(p1, 0);
+    ASSERT_EQ(p2, 0);
+    ASSERT_EQ(p3, 0);
+}
+
+TEST(RoundRobinMessageRouterTest, sameKey) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 
1,
+                                   boost::posix_time::milliseconds(0));
+
+    Message msg1 = 
MessageBuilder().setPartitionKey("my-key").setContent("one").build();
+    Message msg2 = 
MessageBuilder().setPartitionKey("my-key").setContent("two").build();
+
+    int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions));
+    int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions));
+    ASSERT_EQ(p2, p1);
+}
+
+TEST(RoundRobinMessageRouterTest, batchingDisabled) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 
1,
+                                   boost::posix_time::milliseconds(0));
+
+    Message msg1 = MessageBuilder().setContent("one").build();
+    Message msg2 = MessageBuilder().setContent("two").build();
+
+    int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions));
+    int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions));
+    ASSERT_EQ(p2, (p1 + 1) % numPartitions);
+}
+
+TEST(RoundRobinMessageRouterTest, batchingEnabled) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 
1000, 100000,
+                                   boost::posix_time::seconds(1));
 
-TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) {
-    const int numPartitions1 = 5;
-    const int numPartitions2 = 3;
+    int p = -1;
+    for (int i = 0; i < 100; i++) {
+        Message msg = MessageBuilder().setContent("0123456789").build();
 
-    RoundRobinMessageRouter router1(ProducerConfiguration::BoostHash);
-    RoundRobinMessageRouter router2(ProducerConfiguration::BoostHash);
+        int p1 = router.getPartition(msg, TopicMetadataImpl(numPartitions));
+        if (p != -1) {
+            ASSERT_EQ(p1, p);
+        }
 
-    GMockMessage message;
-    EXPECT_CALL(message, 
hasPartitionKey()).Times(20).WillRepeatedly(Return(false));
-    EXPECT_CALL(message, getPartitionKey()).Times(0);
-    for (int i = 0; i < 10; i++) {
-        ASSERT_EQ(i % numPartitions1, router1.getPartition(message, 
TopicMetadataImpl(numPartitions1)));
-        ASSERT_EQ(i % numPartitions2, router2.getPartition(message, 
TopicMetadataImpl(numPartitions2)));
+        p = p1;
     }
 }
 
-TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
-    const int numPartitons = 1234;
+TEST(RoundRobinMessageRouterTest, maxDelay) {
+    const int numPartitions = 13;
 
-    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash);
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 
1000, 100000,
+                                   boost::posix_time::seconds(1));
 
-    std::string partitionKey1 = "key1";
-    std::string partitionKey2 = "key2";
+    int p1 = -1;
+    for (int i = 0; i < 100; i++) {
+        Message msg = MessageBuilder().setContent("0123456789").build();
 
-    GMockMessage message1;
-    EXPECT_CALL(message1, hasPartitionKey()).Times(1).WillOnce(Return(true));
-    EXPECT_CALL(message1, 
getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey1));
+        int p = router.getPartition(msg, TopicMetadataImpl(numPartitions));
+        if (p1 != -1) {
+            ASSERT_EQ(p1, p);
+        }
 
-    GMockMessage message2;
-    EXPECT_CALL(message2, hasPartitionKey()).Times(1).WillOnce(Return(true));
-    EXPECT_CALL(message2, 
getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey2));
+        p1 = p;
+    }
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    // Second set of messages will go in separate partition
+
+    int p2 = -1;
+    for (int i = 0; i < 100; i++) {
+        Message msg = MessageBuilder().setContent("0123456789").build();
+
+        int p = router.getPartition(msg, TopicMetadataImpl(numPartitions));
+        if (p2 != -1) {
+            ASSERT_EQ(p2, p);
+        }
+
+        p2 = p;
+    }
+
+    ASSERT_EQ(p2, (p1 + 1) % numPartitions);
+}
 
-    auto expectedParrtition1 =
-        static_cast<const int>(boost::hash<std::string>()(partitionKey1) % 
numPartitons);
-    auto expectedParrtition2 =
-        static_cast<const int>(boost::hash<std::string>()(partitionKey2) % 
numPartitons);
+TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) {
+    const int numPartitions = 13;
 
-    ASSERT_EQ(expectedParrtition1, router.getPartition(message1, 
TopicMetadataImpl(numPartitons)));
-    ASSERT_EQ(expectedParrtition2, router.getPartition(message2, 
TopicMetadataImpl(numPartitons)));
-}
\ No newline at end of file
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 
1000,
+                                   boost::posix_time::seconds(1));
+
+    Message msg1 = MessageBuilder().setContent("one").build();
+    Message msg2 = MessageBuilder().setContent("two").build();
+    Message msg3 = MessageBuilder().setContent("tree").build();
+
+    TopicMetadataImpl tm = TopicMetadataImpl(numPartitions);
+    int p1 = router.getPartition(msg1, tm);
+    int p2 = router.getPartition(msg2, tm);
+    int p3 = router.getPartition(msg3, tm);
+    ASSERT_EQ(p1, p2);
+    ASSERT_EQ(p3, (p2 + 1) % numPartitions);
+}
+
+TEST(RoundRobinMessageRouterTest, maxBatchSize) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 
8,
+                                   boost::posix_time::seconds(1));
+
+    Message msg1 = MessageBuilder().setContent("one").build();
+    Message msg2 = MessageBuilder().setContent("two").build();
+    Message msg3 = MessageBuilder().setContent("tree").build();
+
+    TopicMetadataImpl tm = TopicMetadataImpl(numPartitions);
+    int p1 = router.getPartition(msg1, tm);
+    int p2 = router.getPartition(msg2, tm);
+    int p3 = router.getPartition(msg3, tm);
+    ASSERT_EQ(p1, p2);
+    ASSERT_EQ(p3, (p2 + 1) % numPartitions);
+}

Reply via email to