This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 424034b add triggerFlush to immediately flush batch messages in
producer::send (#2980)
424034b is described below
commit 424034b5393bbf6440ac1c301febb98b64299fe6
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Nov 14 13:24:22 2018 +0800
add triggerFlush to immediately flush batch messages in producer::send
(#2980)
---
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 7 +
pulsar-client-cpp/lib/PartitionedProducerImpl.h | 2 +
pulsar-client-cpp/lib/Producer.cc | 5 +
pulsar-client-cpp/lib/ProducerImpl.cc | 7 +
pulsar-client-cpp/lib/ProducerImpl.h | 4 +-
pulsar-client-cpp/lib/ProducerImplBase.h | 1 +
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 202 +++++++++++++++++++++++
pulsar-client-cpp/tests/BatchMessageTest.cc | 5 +-
pulsar-client-cpp/tests/ConsumerStatsTest.cc | 1 +
9 files changed, 232 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 6808aad..4e75e0e 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -234,4 +234,11 @@ Future<Result, ProducerImplBaseWeakPtr>
PartitionedProducerImpl::getProducerCrea
// override
bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
+
+void PartitionedProducerImpl::triggerFlush() {  // forwards the flush request to every entry in producers_
+ for (ProducerList::const_iterator prod = producers_.begin(); prod !=
producers_.end(); prod++) {
+ (*prod)->triggerFlush();  // the element's own triggerFlush() does the actual work
+ }
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index be73ddc..cf6ddd9 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -69,6 +69,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture();
+ virtual void triggerFlush();
+
void handleSinglePartitionProducerCreated(Result result,
ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int
partitionIndex);
diff --git a/pulsar-client-cpp/lib/Producer.cc
b/pulsar-client-cpp/lib/Producer.cc
index 822f41e..49adbfc 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -24,6 +24,7 @@
#include "ProducerImpl.h"
namespace pulsar {
+DECLARE_LOG_OBJECT()
static const std::string EMPTY_STRING;
@@ -37,6 +38,10 @@ Result Producer::send(const Message& msg) {
Promise<Result, Message> promise;
sendAsync(msg, WaitForCallbackValue<Message>(promise));
+ if (!promise.isComplete()) {
+ impl_->triggerFlush();
+ }
+
Message m;
Result result = promise.getFuture().get(m);
return result;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 881b189..ddcf671 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -285,6 +285,13 @@ void ProducerImpl::statsCallBackHandler(Result res, const
Message& msg, SendCall
}
}
+void ProducerImpl::triggerFlush() {  // asks the batch container, when there is one, to send its messages now
+ if (batchMessageContainer) {  // NOTE(review): checked before mutex_ is taken; presumably never reassigned after construction - confirm
+ Lock lock(mutex_);  // mutex_ stays held for the whole sendMessage() call
+ batchMessageContainer->sendMessage();
+ }
+}
+
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);
SendCallback cb = boost::bind(&ProducerImpl::statsCallBackHandler, this,
_1, _2, callback,
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h
b/pulsar-client-cpp/lib/ProducerImpl.h
index 2907d40..615eafb 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -86,7 +86,9 @@ class ProducerImpl : public HandlerBase,
virtual void shutdown();
- bool isClosed();
+ virtual bool isClosed();
+
+ virtual void triggerFlush();
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h
b/pulsar-client-cpp/lib/ProducerImplBase.h
index ec19163..92f886f 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -42,6 +42,7 @@ class ProducerImplBase {
virtual bool isClosed() = 0;
virtual const std::string& getTopic() const = 0;
virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture()
= 0;
+ virtual void triggerFlush() = 0;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 73430fc..122a643 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1956,4 +1956,206 @@ TEST(BasicEndToEndTest,
testPatternMultiTopicsConsumerAutoDiscovery) {
ASSERT_EQ(ResultOk, consumer.unsubscribe());
client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {  // checks that async sends wait for a full batch while sync send() gets delivered right away
+ ClientConfiguration config;  // NOTE(review): never used below; the client is built from lookupUrl only
+ Client client(lookupUrl);
+ std::string topicName =
"persistent://property/cluster/namespace/test-flush-batch-messages";
+ std::string subName = "subscription-name";
+ Producer producer;
+
+ int numOfMessages = 10;
+
+ ProducerConfiguration conf;
+
+ conf.setBatchingEnabled(true);
+ // set the batch max message count to numOfMessages, and the max publish delay to 60s
+ conf.setBatchingMaxMessages(numOfMessages);
+ conf.setBatchingMaxPublishDelayMs(60000);
+
+ conf.setBlockIfQueueFull(true);
+ conf.setProperty("producer-name", "test-producer-name");
+ conf.setProperty("producer-id", "test-producer-id");
+
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, conf,
WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setProperty("consumer-name", "test-consumer-name");
+ consumerConfig.setProperty("consumer-id", "test-consumer-id");
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, consumerConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ // Asynchronously send the first half of the messages
+ std::string prefix = "msg-batch-async";
+ for (int i = 0; i < numOfMessages / 2; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ LOG_DEBUG("async sending message " << messageContent);
+ }
+ LOG_INFO("sending first half messages in async, should timeout to
receive");
+
+ // The batch has not reached the max message count yet, so nothing should be received.
+ Message receivedMsg;
+ ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 5000));
+
+ // Asynchronously send the other half of the messages
+ for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ LOG_DEBUG("async sending message " << messageContent);
+ }
+ LOG_INFO("sending the other half messages in async, should able to
receive");
+ // The batch has now reached the max message count (numOfMessages), so the messages should be received.
+ ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000));
+
+ // receive all the remaining messages.
+ int i = 1;  // index 0 was already taken by the receive() just above
+ while (consumer.receive(receivedMsg, 5000) == ResultOk) {
+ std::string expectedMessageContent = prefix +
boost::lexical_cast<std::string>(i);
+ LOG_DEBUG("Received Message with [ content - " <<
receivedMsg.getDataAsString() << "] [ messageID = "
+ <<
receivedMsg.getMessageId() << "]");
+ ASSERT_EQ(receivedMsg.getProperty("msgIndex"),
boost::lexical_cast<std::string>(i++));
+ ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ }
+
+ // Synchronously send half the messages; each send() will triggerFlush, so we can get
the messages.
+ prefix = "msg-batch-sync";
+ for (int i = 0; i < numOfMessages / 2; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.send(msg);  // NOTE(review): the returned Result is not checked
+ LOG_DEBUG("sync sending message " << messageContent);
+ }
+ // The batch is below the max message count, but the messages should still be received, and
not timeout
+ ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000));
+
+ producer.close();
+ client.shutdown();
+}
+
+// Send callback that only logs the result code and the message; with a partitioned topic it may be hard to verify the message id.
+static void simpleCallback(Result code, const Message& msg) {
+ LOG_INFO("Received code: " << code << " -- Msg: " << msg);
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {  // same batching and sync-flush check, against a partitioned topic
+ Client client(lookupUrl);
+ std::string topicName =
"persistent://prop/unit/ns/partition-testSyncFlushBatchMessages";
+ // call admin api to make it partitioned
+ std::string url =
+ adminUrl +
"admin/persistent/prop/unit/ns/partition-testSyncFlushBatchMessages/partitions";
+ int res = makePutRequest(url, "5");  // presumably the request body is the partition count; keep in sync with numberOfPartitions
+ int numberOfPartitions = 5;
+
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);  // only 204 or 409 is accepted (presumably created / already exists)
+
+ Producer producer;
+ int numOfMessages = 20;
+ ProducerConfiguration tempProducerConfiguration;
+
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ ProducerConfiguration producerConfiguration = tempProducerConfiguration;
+ producerConfiguration.setBatchingEnabled(true);
+ // set the batch max message count to numOfMessages / numberOfPartitions, and the max publish delay to 60s
+ producerConfiguration.setBatchingMaxMessages(numOfMessages /
numberOfPartitions);
+ producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+ Result result = client.createProducer(topicName, producerConfiguration,
producer);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(producer.getTopic(), topicName);
+
+ // Topic is partitioned into 5 partitions so each partition will receive
two messages
+ LOG_INFO("Creating Subscriber");
+ std::string consumerId = "CONSUMER";
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerExclusive);
+ consConfig.setReceiverQueueSize(2);
+ ASSERT_FALSE(consConfig.hasMessageListener());
+ Consumer consumer[numberOfPartitions];  // NOTE(review): numberOfPartitions is a non-const int, so this is a variable-length array (compiler extension, not standard C++)
+ Result subscribeResult;
+ for (int i = 0; i < numberOfPartitions; i++) {
+ std::stringstream partitionedTopicName;
+ partitionedTopicName << topicName << "-partition-" << i;
+
+ std::stringstream partitionedConsumerId;
+ partitionedConsumerId << consumerId << i;
+ subscribeResult = client.subscribe(partitionedTopicName.str(),
partitionedConsumerId.str(),
+ consConfig, consumer[i]);
+
+ ASSERT_EQ(ResultOk, subscribeResult);
+ ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str());
+ }
+
+ // Asynchronously send the first part of the messages
+ std::string prefix = "msg-batch-async";
+ for (int i = 0; i < numOfMessages / numberOfPartitions / 2; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, simpleCallback);
+ LOG_DEBUG("async sending message " << messageContent);
+ }
+ LOG_INFO("sending first part messages in async, should timeout to
receive");
+
+ Message m;
+ ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 5000));  // no batch is full yet, so partition 0 is expected to stay empty
+
+ for (int i = numOfMessages / numberOfPartitions / 2; i < numOfMessages;
i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, simpleCallback);
+ LOG_DEBUG("async sending message " << messageContent);
+ }
+ LOG_INFO("sending second part messages in async, should be able to
receive");
+
+ for (int i = 0; i < numOfMessages / numberOfPartitions; i++) {
+ for (int partitionIndex = 0; partitionIndex < numberOfPartitions;
partitionIndex++) {
+ ASSERT_EQ(ResultOk, consumer[partitionIndex].receive(m));
+ ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m));
+ }
+ }
+
+ // Sync send of first part of the messages, this will triggerFlush, and
could get the messages.
+ prefix = "msg-batch-sync";
+ for (int i = 0; i < numOfMessages / numberOfPartitions / 2; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.send(msg);  // NOTE(review): the returned Result is not checked
+ LOG_DEBUG("sync sending message " << messageContent);
+ }
+ LOG_INFO("sending first part messages in sync, should not timeout to
receive");
+ ASSERT_EQ(ResultOk, consumer[0].receive(m, 5000));
+
+ producer.close();
+ client.shutdown();
}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 34c38db..894a582 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -139,7 +139,10 @@ TEST(BatchMessageTest, testProducerTimeout) {
/* Start the timer */
start = time(NULL);
LOG_DEBUG("start = " << start);
- producer.send(msg);
+ Promise<Result, Message> promise;
+ producer.sendAsync(msg, WaitForCallbackValue<Message>(promise));
+ Message m;
+ promise.getFuture().get(m);
/* End the timer */
end = time(NULL);
LOG_DEBUG("end = " << end);
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index 09c4b75..6843027 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -266,6 +266,7 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
int numOfMessages = 7 * 5; // 5 message per partition
Promise<Result, Producer> producerPromise;
ProducerConfiguration config;
+ config.setBatchingEnabled(false);
config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
client.createProducerAsync(topicName, config,
WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();