This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0f846309d5ff615de4ce7a10f656bc7b82b70d8d Author: Yunze Xu <[email protected]> AuthorDate: Mon Jan 25 10:42:49 2021 +0800 [Python] Return MessageId in producer's synchronous send method (#9287) Fixes #9176 ### Motivation Currently Python producer's send method returns nothing. However, it should returns a `MessageId` at least. ### Modifications - Add a new `Producer#send` API with an extra argument as the output argument of `MessageId`. - Improve the `Producer#send` related documents to ensure these methods can link to each other in Doxygen-generated websites. - Let Python client's `Producer#send` return a `MessageId`. - Add related tests. It should be noted that the current C++ `Producer::send`'s API design is strange and weird that it returns no `MessageId` explicitly. However it will set the input `Message`'s internal `MessageId` field and there're no document about it, as well as the tests. This API design is extremely terrible. I think without looking into the source code, no one could guess that the input **const** `Message` argument will **modify** its internal field after `send` is completed. So I add a new `send` method as a substitute and mark the old `send` method deprecated. (cherry picked from commit 0e6bbc8ef41fc38048cb95ef3b594c070268916b) --- pulsar-client-cpp/include/pulsar/Producer.h | 23 +++++++++--- .../include/pulsar/ProducerConfiguration.h | 42 ++++++++++++++++++++++ pulsar-client-cpp/lib/Producer.cc | 11 ++++++ pulsar-client-cpp/python/pulsar/__init__.py | 6 ++-- pulsar-client-cpp/python/pulsar_test.py | 12 +++++++ pulsar-client-cpp/python/src/producer.cc | 8 +++-- pulsar-client-cpp/tests/ProducerTest.cc | 28 +++++++++++++-- 7 files changed, 120 insertions(+), 10 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h index ae55093..6241fa2 100644 --- a/pulsar-client-cpp/include/pulsar/Producer.h +++ b/pulsar-client-cpp/include/pulsar/Producer.h @@ -50,7 +50,14 @@ class PULSAR_PUBLIC Producer { const std::string& getProducerName() const; /** - * Publish a message on the topic associated with this Producer. + * @deprecated + * It's the same with send(const Message& msg, MessageId& messageId) except that MessageId will be stored + * in `msg` though `msg` is `const`. + */ + Result send(const Message& msg); + + /** + * Publish a message on the topic associated with this Producer and get the associated MessageId. * * This method will block until the message will be accepted and persisted * by the broker. In case of errors, the client library will try to @@ -61,11 +68,19 @@ class PULSAR_PUBLIC Producer { * * This method is equivalent to asyncSend() and wait until the callback is triggered. * - * @param msg message to publish + * @param [in] msg message to publish + * @param [out] messageId the message id assigned to the published message * @return ResultOk if the message was published successfully - * @return ResultWriteError if it wasn't possible to publish the message + * @return ResultTimeout if message was not sent successfully in ProducerConfiguration#getSendTimeout + * @return ResultProducerQueueIsFull if the outgoing messsage queue is full when + * ProducerConfiguration::getBlockIfQueueFull was false + * @return ResultMessageTooBig if message size is bigger than the maximum message size + * @return ResultAlreadyClosed if Producer was already closed when message was sent + * @return ResultCryptoError if ProducerConfiguration::isEncryptionEnabled returns true but the message + * was failed to encrypt + * @return ResultInvalidMessage if message's invalid, it's usually caused by resending the same Message */ - Result send(const Message& msg); + Result send(const Message& msg, MessageId& messageId); /** * Asynchronously publish a message on the topic associated with this Producer. diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 5666551..e260ca9 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -107,7 +107,21 @@ class PULSAR_PUBLIC ProducerConfiguration { */ const SchemaInfo& getSchema() const; + /** + * The getter associated with getSendTimeout() + */ ProducerConfiguration& setSendTimeout(int sendTimeoutMs); + + /** + * Get the send timeout is milliseconds. + * + * If a message is not acknowledged by the server before the sendTimeout expires, an error will be + * reported. + * + * If the timeout is zero, there will be no timeout. + * + * @return the send timeout in milliseconds (Default: 30000) + */ int getSendTimeout() const; ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId); @@ -159,7 +173,15 @@ class PULSAR_PUBLIC ProducerConfiguration { ProducerConfiguration& setHashingScheme(const HashingScheme& scheme); HashingScheme getHashingScheme() const; + /** + * The setter associated with getBlockIfQueueFull() + */ ProducerConfiguration& setBlockIfQueueFull(bool); + + /** + * @return whether Producer::send or Producer::sendAsync operations should block when the outgoing message + * queue is full. (Default: false) + */ bool getBlockIfQueueFull() const; // Zero queue size feature will not be supported on consumer end if batching is enabled @@ -188,8 +210,28 @@ class PULSAR_PUBLIC ProducerConfiguration { ProducerCryptoFailureAction getCryptoFailureAction() const; ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action); + /** + * @return all the encryption keys added + */ const std::set<std::string>& getEncryptionKeys() const; + + /** + * @return true if encryption keys are added + */ bool isEncryptionEnabled() const; + + /** + * Add public encryption key, used by producer to encrypt the data key. + * + * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If + * keys are found, a callback getKey(String keyName) is invoked against each key to load the values of the + * key. Application should implement this callback to return the key in pkcs8 format. If compression is + * enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is + * encrypted. + * + * @key the encryption key to add + * @return the ProducerConfiguration self + */ ProducerConfiguration& addEncryptionKey(std::string key); /** diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 7c7e53c..729577c 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -49,6 +49,17 @@ Result Producer::send(const Message& msg) { return result; } +Result Producer::send(const Message& msg, MessageId& messageId) { + Promise<Result, MessageId> promise; + sendAsync(msg, WaitForCallbackValue<MessageId>(promise)); + + if (!promise.isComplete()) { + impl_->triggerFlush(); + } + + return promise.getFuture().get(messageId); +} + void Producer::sendAsync(const Message& msg, SendCallback callback) { if (!impl_) { callback(ResultProducerNotInitialized, msg.getMessageId()); diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index c3c610a..cfae7e0 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -853,6 +853,8 @@ class Producer: """ Publish a message on the topic. Blocks until the message is acknowledged + Returns a `MessageId` object that represents where the message is persisted. + **Args** * `content`: @@ -887,7 +889,7 @@ class Producer: msg = self._build_msg(content, properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp, deliver_at, deliver_after) - return self._producer.send(msg) + return MessageId.deserialize(self._producer.send(msg)) def send_async(self, content, callback, properties=None, @@ -1000,7 +1002,7 @@ class Producer: mb.deliver_at(deliver_at) if deliver_after: mb.deliver_after(deliver_after) - + return mb.build() diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 2ea940e..f056832 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -122,6 +122,18 @@ class PulsarTest(TestCase): self.assertEqual(len(sent_messages), 3) client.close() + def test_producer_send(self): + client = Client(self.serviceUrl) + topic = 'test_producer_send' + producer = client.create_producer(topic) + consumer = client.subscribe(topic, 'sub-name') + msg_id = producer.send(b'hello') + print('send to {}'.format(msg_id)) + msg = consumer.receive(TM) + consumer.acknowledge(msg) + print('receive from {}'.format(msg.message_id())) + self.assertEqual(msg_id, msg.message_id()) + def test_producer_consumer(self): client = Client(self.serviceUrl) consumer = client.subscribe('my-python-topic-producer-consumer', diff --git a/pulsar-client-cpp/python/src/producer.cc b/pulsar-client-cpp/python/src/producer.cc index 3be5417..c50eac1 100644 --- a/pulsar-client-cpp/python/src/producer.cc +++ b/pulsar-client-cpp/python/src/producer.cc @@ -20,13 +20,17 @@ #include <functional> -void Producer_send(Producer& producer, const Message& message) { +extern boost::python::object MessageId_serialize(const MessageId& msgId); + +boost::python::object Producer_send(Producer& producer, const Message& message) { Result res; + MessageId messageId; Py_BEGIN_ALLOW_THREADS - res = producer.send(message); + res = producer.send(message, messageId); Py_END_ALLOW_THREADS CHECK_RESULT(res); + return MessageId_serialize(messageId); } void Producer_sendAsyncCallback(PyObject* callback, Result res, const MessageId& msgId) { diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 6769353..38f8d32 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -19,8 +19,10 @@ #include <pulsar/Client.h> #include <gtest/gtest.h> -#include "../lib/Future.h" -#include "../lib/Utils.h" +#include "lib/Future.h" +#include "lib/Utils.h" +#include "lib/LogUtils.h" +DECLARE_LOG_OBJECT() using namespace pulsar; @@ -71,3 +73,25 @@ TEST(ProducerTest, exactlyOnceWithProducerNameSpecified) { Result result = client.createProducer(topicName, producerConfiguration2, producer3); ASSERT_EQ(ResultProducerBusy, result); } + +TEST(ProducerTest, testSynchronouslySend) { + Client client(serviceUrl); + const std::string topic = "ProducerTestSynchronouslySend"; + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-name", consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + MessageId messageId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build(), messageId)); + LOG_INFO("Send message to " << messageId); + + Message receivedMessage; + ASSERT_EQ(ResultOk, consumer.receive(receivedMessage, 3000)); + LOG_INFO("Received message from " << receivedMessage.getMessageId()); + ASSERT_EQ(receivedMessage.getMessageId(), messageId); + ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMessage)); + + client.close(); +}
