This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c7efdcc7cbb27769eb42ec5c75080d9caf5b87d4 Author: Matteo Merli <[email protected]> AuthorDate: Wed Dec 2 21:30:04 2020 -0800 [C++] Implement batch aware producer router (#8395) * [C++] Implement batch aware producer router * Fixed tests that relied on old rr distribution * Fixed compilation with older compilers * Fixed test (cherry picked from commit 74803dbf1f9ef5b90558cce90f67fa4c668444a3) --- pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 5 +- pulsar-client-cpp/lib/RoundRobinMessageRouter.cc | 63 +++++++- pulsar-client-cpp/lib/RoundRobinMessageRouter.h | 26 +++- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 4 +- pulsar-client-cpp/tests/CustomRoutingPolicy.h | 13 ++ pulsar-client-cpp/tests/PartitionsUpdateTest.cc | 3 +- .../tests/RoundRobinMessageRouterTest.cc | 164 ++++++++++++++++----- 7 files changed, 223 insertions(+), 55 deletions(-) diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index aa5e176..1590780 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -63,7 +63,10 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { switch (conf_.getPartitionsRoutingMode()) { case ProducerConfiguration::RoundRobinDistribution: - return std::make_shared<RoundRobinMessageRouter>(conf_.getHashingScheme()); + return std::make_shared<RoundRobinMessageRouter>( + conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(), + conf_.getBatchingMaxAllowedSizeInBytes(), + boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); case ProducerConfiguration::CustomPartition: return conf_.getMessageRouterPtr(); case ProducerConfiguration::UseSinglePartition: diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc index c47fb23..feb3ab0 100644 --- 
a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc @@ -18,22 +18,73 @@ */ #include "RoundRobinMessageRouter.h" +#include "TimeUtils.h" + +#include <boost/random/mersenne_twister.hpp> +#include <boost/random/uniform_int_distribution.hpp> + namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme) - : MessageRouterBase(hashingScheme), prevPartition_(0) {} +RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, + bool batchingEnabled, uint32_t maxBatchingMessages, + uint32_t maxBatchingSize, + boost::posix_time::time_duration maxBatchingDelay) + : MessageRouterBase(hashingScheme), + batchingEnabled_(batchingEnabled), + lastPartitionChange_(TimeUtils::currentTimeMillis()), + msgCounter_(0), + cumulativeBatchSize_(0), + maxBatchingMessages_(maxBatchingMessages), + maxBatchingSize_(maxBatchingSize), + maxBatchingDelay_(maxBatchingDelay) { + boost::random::mt19937 rng(time(nullptr)); + boost::random::uniform_int_distribution<int> dist; + currentPartitionCursor_ = dist(rng); +} RoundRobinMessageRouter::~RoundRobinMessageRouter() {} // override int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { + if (topicMetadata.getNumPartitions() == 1) { + // When there is only one partition, don't even bother + return 0; + } + // if message has a key, hash the key and return the partition if (msg.hasPartitionKey()) { return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); - } else { - Lock lock(mutex_); - // else pick the next partition - return prevPartition_++ % topicMetadata.getNumPartitions(); } + + if (!batchingEnabled_) { + // If there's no batching, do the round-robin at the message scope + // as there is no gain otherwise. 
+ return currentPartitionCursor_++ % topicMetadata.getNumPartitions(); + } + + // If there's no key, we do round-robin across partitions, sticking with a given + // partition for a certain amount of messages or volume buffered or the max delay to batch is reached so + // that we ensure having a decent amount of batching of the messages. Note that it is possible that we + // skip more than one partition if multiple threads increment currentPartitionCursor_ at the same time. + // If that happens it shouldn't be a problem because we only want to spread the data on different + // partitions but not necessarily in a specific sequence. NOTE(review): maxBatchingSize_ - batchSize below is uint32_t arithmetic and wraps around if batchSize exceeds maxBatchingSize_ (possible after a single message larger than the limit) - confirm this is intended. + uint32_t messageSize = msg.getLength(); + uint32_t messageCount = msgCounter_; + uint32_t batchSize = cumulativeBatchSize_; + int64_t lastPartitionChange = lastPartitionChange_; + int64_t now = TimeUtils::currentTimeMillis(); + + if (messageCount >= maxBatchingMessages_ || (messageSize >= maxBatchingSize_ - batchSize) || + (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) { + uint32_t currentPartitionCursor = ++currentPartitionCursor_; + lastPartitionChange_ = now; + cumulativeBatchSize_ = messageSize; + msgCounter_ = 1; + return currentPartitionCursor % topicMetadata.getNumPartitions(); + } + + ++msgCounter_; + cumulativeBatchSize_ += messageSize; + return currentPartitionCursor_ % topicMetadata.getNumPartitions(); } } // namespace pulsar diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h index 5460e13..be172a0 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h +++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h @@ -16,28 +16,38 @@ * specific language governing permissions and limitations * under the License. 
*/ -#ifndef PULSAR_RR_MESSAGE_ROUTER_HEADER_ -#define PULSAR_RR_MESSAGE_ROUTER_HEADER_ + +#pragma once #include <pulsar/defines.h> #include <pulsar/MessageRoutingPolicy.h> #include <pulsar/ProducerConfiguration.h> #include <pulsar/TopicMetadata.h> -#include <mutex> #include "Hash.h" #include "MessageRouterBase.h" +#include <atomic> +#include <boost/date_time/local_time/local_time.hpp> + namespace pulsar { class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase { public: - RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme); + RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled, + uint32_t maxBatchingMessages, uint32_t maxBatchingSize, + boost::posix_time::time_duration maxBatchingDelay); virtual ~RoundRobinMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); private: - std::mutex mutex_; - unsigned int prevPartition_; + const bool batchingEnabled_; + const uint32_t maxBatchingMessages_; + const uint32_t maxBatchingSize_; + const boost::posix_time::time_duration maxBatchingDelay_; + + std::atomic<uint32_t> currentPartitionCursor_; + std::atomic<int64_t> lastPartitionChange_; + std::atomic<uint32_t> msgCounter_; + std::atomic<uint32_t> cumulativeBatchSize_; }; -typedef std::unique_lock<std::mutex> Lock; + } // namespace pulsar -#endif // PULSAR_RR_MESSAGE_ROUTER_HEADER_ diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 60fc9b5..a812657 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -861,6 +861,7 @@ TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy) { Producer producer; ProducerConfiguration tempProducerConfiguration; tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>()); 
ProducerConfiguration producerConfiguration = tempProducerConfiguration; Result result = client.createProducer(topicName, producerConfiguration, producer); ASSERT_EQ(ResultOk, result); @@ -2465,7 +2466,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) { Producer producer; int numOfMessages = 20; ProducerConfiguration tempProducerConfiguration; - tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>()); ProducerConfiguration producerConfiguration = tempProducerConfiguration; producerConfiguration.setBatchingEnabled(true); // set batch message number numOfMessages, and max delay 60s @@ -2687,6 +2688,7 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { // set batch message number numOfMessages, and max delay 60s producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions); producerConfiguration.setBatchingMaxPublishDelayMs(60000); + producerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>()); Result result = client.createProducer(topicName, producerConfiguration, producer); ASSERT_EQ(ResultOk, result); diff --git a/pulsar-client-cpp/tests/CustomRoutingPolicy.h b/pulsar-client-cpp/tests/CustomRoutingPolicy.h index ca82e34..ed10c5b 100644 --- a/pulsar-client-cpp/tests/CustomRoutingPolicy.h +++ b/pulsar-client-cpp/tests/CustomRoutingPolicy.h @@ -32,6 +32,19 @@ class CustomRoutingPolicy : public MessageRoutingPolicy { int getPartition(const Message& msg, const TopicMetadata& topicMetadata) { return 0; } }; + +class SimpleRoundRobinRoutingPolicy : public MessageRoutingPolicy { + public: + SimpleRoundRobinRoutingPolicy() : counter_(0) {} + + int getPartition(const Message& msg, const TopicMetadata& topicMetadata) { + return counter_++ % topicMetadata.getNumPartitions(); + } + + private: + uint32_t counter_; +}; + } // namespace pulsar #endif // 
CUSTOM_ROUTER_POLICY_HEADER_ diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc index e1bf68c..af473a2 100644 --- a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc +++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc @@ -25,6 +25,7 @@ #include <memory> #include "HttpHelper.h" +#include "CustomRoutingPolicy.h" using namespace pulsar; @@ -57,7 +58,7 @@ class PartitionsSet { Result initProducer(bool enablePartitionsUpdate) { clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate))); const auto producerConfig = - ProducerConfiguration().setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + ProducerConfiguration().setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>()); return clientForProducer_->createProducer(topicName, producerConfig, producer_); } diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc index 431dd6b..ce5ad17 100644 --- a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc +++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc @@ -19,59 +19,147 @@ #include <pulsar/Client.h> #include <pulsar/ProducerConfiguration.h> #include <gtest/gtest.h> -#include <gmock/gmock.h> -#include <boost/functional/hash.hpp> - -#include "tests/mocks/GMockMessage.h" +#include <thread> #include "../lib/RoundRobinMessageRouter.h" #include "../lib/TopicMetadataImpl.h" -using ::testing::AtLeast; -using ::testing::Return; -using ::testing::ReturnRef; - using namespace pulsar; -// TODO: Edit Message class to suit Google Mock and enable these tests when 2.0.0 release. 
+TEST(RoundRobinMessageRouterTest, onePartition) { + const int numPartitions = 1; + + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, + boost::posix_time::milliseconds(0)); + + Message msg1 = MessageBuilder().setPartitionKey("my-key-1").setContent("one").build(); + Message msg2 = MessageBuilder().setPartitionKey("my-key-2").setContent("two").build(); + Message msg3 = MessageBuilder().setContent("three").build(); + + int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions)); + int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions)); + int p3 = router.getPartition(msg3, TopicMetadataImpl(numPartitions)); + ASSERT_EQ(p1, 0); + ASSERT_EQ(p2, 0); + ASSERT_EQ(p3, 0); +} + +TEST(RoundRobinMessageRouterTest, sameKey) { + const int numPartitions = 13; + + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, + boost::posix_time::milliseconds(0)); + + Message msg1 = MessageBuilder().setPartitionKey("my-key").setContent("one").build(); + Message msg2 = MessageBuilder().setPartitionKey("my-key").setContent("two").build(); + + int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions)); + int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions)); + ASSERT_EQ(p2, p1); +} + +TEST(RoundRobinMessageRouterTest, batchingDisabled) { + const int numPartitions = 13; + + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, + boost::posix_time::milliseconds(0)); + + Message msg1 = MessageBuilder().setContent("one").build(); + Message msg2 = MessageBuilder().setContent("two").build(); + + int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions)); + int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions)); + ASSERT_EQ(p2, (p1 + 1) % numPartitions); +} + +TEST(RoundRobinMessageRouterTest, batchingEnabled) { + const int numPartitions = 13; + + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000, + 
boost::posix_time::seconds(1)); -TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { - const int numPartitions1 = 5; - const int numPartitions2 = 3; + int p = -1; + for (int i = 0; i < 100; i++) { + Message msg = MessageBuilder().setContent("0123456789").build(); - RoundRobinMessageRouter router1(ProducerConfiguration::BoostHash); - RoundRobinMessageRouter router2(ProducerConfiguration::BoostHash); + int p1 = router.getPartition(msg, TopicMetadataImpl(numPartitions)); + if (p != -1) { + ASSERT_EQ(p1, p); + } - GMockMessage message; - EXPECT_CALL(message, hasPartitionKey()).Times(20).WillRepeatedly(Return(false)); - EXPECT_CALL(message, getPartitionKey()).Times(0); - for (int i = 0; i < 10; i++) { - ASSERT_EQ(i % numPartitions1, router1.getPartition(message, TopicMetadataImpl(numPartitions1))); - ASSERT_EQ(i % numPartitions2, router2.getPartition(message, TopicMetadataImpl(numPartitions2))); + p = p1; } } -TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) { - const int numPartitons = 1234; +TEST(RoundRobinMessageRouterTest, maxDelay) { + const int numPartitions = 13; - RoundRobinMessageRouter router(ProducerConfiguration::BoostHash); + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000, + boost::posix_time::seconds(1)); - std::string partitionKey1 = "key1"; - std::string partitionKey2 = "key2"; + int p1 = -1; + for (int i = 0; i < 100; i++) { + Message msg = MessageBuilder().setContent("0123456789").build(); - GMockMessage message1; - EXPECT_CALL(message1, hasPartitionKey()).Times(1).WillOnce(Return(true)); - EXPECT_CALL(message1, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey1)); + int p = router.getPartition(msg, TopicMetadataImpl(numPartitions)); + if (p1 != -1) { + ASSERT_EQ(p1, p); + } - GMockMessage message2; - EXPECT_CALL(message2, hasPartitionKey()).Times(1).WillOnce(Return(true)); - EXPECT_CALL(message2, 
getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey2)); + p1 = p; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // Second set of messages will go in separate partition + + int p2 = -1; + for (int i = 0; i < 100; i++) { + Message msg = MessageBuilder().setContent("0123456789").build(); + + int p = router.getPartition(msg, TopicMetadataImpl(numPartitions)); + if (p2 != -1) { + ASSERT_EQ(p2, p); + } + + p2 = p; + } + + ASSERT_EQ(p2, (p1 + 1) % numPartitions); +} - auto expectedParrtition1 = - static_cast<const int>(boost::hash<std::string>()(partitionKey1) % numPartitons); - auto expectedParrtition2 = - static_cast<const int>(boost::hash<std::string>()(partitionKey2) % numPartitons); +TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) { + const int numPartitions = 13; - ASSERT_EQ(expectedParrtition1, router.getPartition(message1, TopicMetadataImpl(numPartitons))); - ASSERT_EQ(expectedParrtition2, router.getPartition(message2, TopicMetadataImpl(numPartitons))); -} \ No newline at end of file + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000, + boost::posix_time::seconds(1)); + + Message msg1 = MessageBuilder().setContent("one").build(); + Message msg2 = MessageBuilder().setContent("two").build(); + Message msg3 = MessageBuilder().setContent("tree").build(); + + TopicMetadataImpl tm = TopicMetadataImpl(numPartitions); + int p1 = router.getPartition(msg1, tm); + int p2 = router.getPartition(msg2, tm); + int p3 = router.getPartition(msg3, tm); + ASSERT_EQ(p1, p2); + ASSERT_EQ(p3, (p2 + 1) % numPartitions); +} + +TEST(RoundRobinMessageRouterTest, maxBatchSize) { + const int numPartitions = 13; + + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8, + boost::posix_time::seconds(1)); + + Message msg1 = MessageBuilder().setContent("one").build(); + Message msg2 = MessageBuilder().setContent("two").build(); + Message msg3 = MessageBuilder().setContent("tree").build(); + + TopicMetadataImpl 
tm = TopicMetadataImpl(numPartitions); + int p1 = router.getPartition(msg1, tm); + int p2 = router.getPartition(msg2, tm); + int p3 = router.getPartition(msg3, tm); + ASSERT_EQ(p1, p2); + ASSERT_EQ(p3, (p2 + 1) % numPartitions); +}
