This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 65017010 [ISSUE #934] C++ trans producer should return a SendReceipt object in the send call (#936)
65017010 is described below
commit 650170106805f8d7cb4b7265cf88e917567b1d8d
Author: lizhimins <[email protected]>
AuthorDate: Tue Feb 11 11:26:55 2025 +0800
[ISSUE #934] C++ trans producer should return a SendReceipt object in the send call (#936)
---
.../ExampleProducerWithTransactionalMessage.cpp | 4 +++-
cpp/include/rocketmq/Producer.h | 2 +-
cpp/source/rocketmq/Producer.cpp | 4 ++--
cpp/source/rocketmq/ProducerImpl.cpp | 12 ++++++++---
cpp/source/rocketmq/include/ProducerImpl.h | 25 ++++++++++------------
5 files changed, 26 insertions(+), 21 deletions(-)
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index f595c6ef..d2c194d5 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -109,9 +109,11 @@ int main(int argc, char* argv[]) {
auto transaction = producer.beginTransaction();
std::error_code ec;
- producer.send(std::move(message), ec, *transaction);
+ SendReceipt send_receipt = producer.send(std::move(message), ec, *transaction);
if (!ec) {
+ std::cout << "Send transactional message to " << FLAGS_topic << " OK. "
+ << "Message-ID: " << send_receipt.message_id << std::endl;
if (!transaction->commit()) {
std::cerr << "Failed to commit message" << std::endl;
}
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 6b42843d..d3de06e9 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -65,7 +65,7 @@ public:
std::unique_ptr<Transaction> beginTransaction();
- void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
+ SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
private:
explicit Producer(std::shared_ptr<ProducerImpl> impl) : impl_(std::move(impl)) {
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 907d0a28..4f29383f 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -64,8 +64,8 @@ std::unique_ptr<Transaction> Producer::beginTransaction() {
return impl_->beginTransaction();
}
-void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
- impl_->send(std::move(message), ec, transaction);
+SendReceipt Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
+ return impl_->send(std::move(message), ec, transaction);
}
ProducerBuilder Producer::newBuilder() {
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 9b664d59..3de3d37d 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -474,7 +474,7 @@ void ProducerImpl::isolateEndpoint(const std::string& target) {
isolated_endpoints_.insert(target);
}
-void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
+SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
MiniTransaction mini = {};
mini.topic = message->topic();
mini.trace_context = message->traceContext();
@@ -482,13 +482,17 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio
if (!message->group().empty()) {
ec = ErrorCode::MessagePropertyConflictWithType;
SPDLOG_WARN("FIFO message may not be transactional");
- return;
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
if (message->deliveryTimestamp().time_since_epoch().count()) {
ec = ErrorCode::MessagePropertyConflictWithType;
SPDLOG_WARN("Timed message may not be transactional");
- return;
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
Message* msg = const_cast<Message*>(message.get());
@@ -501,6 +505,8 @@ void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transactio
mini.target = send_receipt.target;
auto& impl = dynamic_cast<TransactionImpl&>(transaction);
impl.appendMiniTransaction(mini);
+
+ return send_receipt;
}
void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) {
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index 2c284172..2cb9c44e 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -49,20 +49,20 @@ public:
void shutdown() override;
/**
- * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
- * sent.
+ * Note we require application to transfer ownership of the message
+ * to send to avoid concurrent modification during sent.
*
- * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
- * application to conduct customized retry policy.
+ * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
+ * facilitating application to conduct customized retry policy.
*/
SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
/**
- * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
- * sent.
+ * Note we require application to transfer ownership of the message
+ * to send to avoid concurrent modification during sent.
*
- * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilliating
- * application to conduct customized retry policy.
+ * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>,
+ * facilitating application to conduct customized retry policy.
*/
void send(MessageConstPtr message, SendCallback callback);
@@ -74,13 +74,10 @@ public:
}
/**
- * Note we requrie application to transfer ownership of the message to send to avoid concurrent modification during
- * sent.
- *
- * TODO: Refine this API. Current API is not good enough as it cannot handle the message back to its caller on publish
- * failure.
+ * Note we require application to transfer ownership of the message
+ * to send to avoid concurrent modification during sent.
*/
- void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
+ SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
/**
* Check if the RPC client for the target host is isolated or not