This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 1139cfbc [ISSUE #1174] [C++] Optimize the logic for generating the
uniqueId of messages (#1175)
1139cfbc is described below
commit 1139cfbc8faf71065b87526aee1180b47015b069
Author: lizhimins <[email protected]>
AuthorDate: Wed Jan 14 10:13:10 2026 +0800
[ISSUE #1174] [C++] Optimize the logic for generating the uniqueId of
messages (#1175)
---
cpp/source/base/UniqueIdGenerator.cpp | 54 +++++++++++++++++++++++------
cpp/source/base/include/InvocationContext.h | 2 +-
cpp/source/base/include/UniqueIdGenerator.h | 2 ++
cpp/source/rocketmq/ProcessQueueImpl.cpp | 13 +------
4 files changed, 48 insertions(+), 23 deletions(-)
diff --git a/cpp/source/base/UniqueIdGenerator.cpp
b/cpp/source/base/UniqueIdGenerator.cpp
index 16c30d93..0083be8f 100644
--- a/cpp/source/base/UniqueIdGenerator.cpp
+++ b/cpp/source/base/UniqueIdGenerator.cpp
@@ -17,6 +17,7 @@
#include "UniqueIdGenerator.h"
#include <cstring>
+#include <random>
#include "spdlog/spdlog.h"
#include "MixAll.h"
@@ -62,19 +63,15 @@ std::string UniqueIdGenerator::next() {
Slot slot = {};
{
absl::MutexLock lk(&mtx_);
- uint32_t delta = deltaSeconds();
- if (seconds_ != delta) {
- seconds_ = delta;
- sequence_ = 0;
- SPDLOG_DEBUG("Second: {} and sequence: {}", seconds_, sequence_);
- } else {
- sequence_++;
- }
- slot.seconds = seconds_;
- slot.sequence = sequence_;
+ seconds_ = deltaSeconds();
+ slot.seconds = absl::big_endian::FromHost32(seconds_);
+ slot.sequence = absl::big_endian::FromHost32(sequence_);
+ sequence_++;
}
std::array<uint8_t, 17> raw{};
raw[0] = VERSION;
+
+ // 9 bytes prefix: VERSION(1) + MAC(6) + PID(low2)
memcpy(raw.data() + sizeof(VERSION), prefix_.data(), prefix_.size());
memcpy(raw.data() + sizeof(VERSION) + prefix_.size(), &slot, sizeof(slot));
return MixAll::hex(raw.data(), raw.size());
@@ -90,4 +87,41 @@ uint32_t UniqueIdGenerator::deltaSeconds() {
.count();
}
+std::string UniqueIdGenerator::nextUuidV4Std() {
+ std::array<uint8_t, 16> b{};
+
+ static thread_local std::random_device rd;
+ for (size_t i = 0; i < b.size();) {
+ uint32_t v = static_cast<uint32_t>(rd());
+ for (int k = 0; k < 4 && i < b.size(); ++k, ++i) {
+ b[i] = static_cast<uint8_t>(v & 0xFF);
+ v >>= 8;
+ }
+ }
+
+ // RFC 4122
+ b[6] = static_cast<uint8_t>((b[6] & 0x0F) | 0x40);
+ b[8] = static_cast<uint8_t>((b[8] & 0x3F) | 0x80);
+
+ static constexpr char hex[] = "0123456789abcdef";
+
+ std::string out;
+ out.resize(36);
+
+ auto put_byte = [&](size_t& p, uint8_t x) {
+ out[p++] = hex[(x >> 4) & 0xF];
+ out[p++] = hex[x & 0xF];
+ };
+
+ size_t p = 0;
+ for (int i = 0; i < 16; ++i) {
+ if (i == 4 || i == 6 || i == 8 || i == 10) {
+ out[p++] = '-';
+ }
+ put_byte(p, b[i]);
+ }
+
+ return out;
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/base/include/InvocationContext.h
b/cpp/source/base/include/InvocationContext.h
index dd6864bf..fdd3243b 100644
--- a/cpp/source/base/include/InvocationContext.h
+++ b/cpp/source/base/include/InvocationContext.h
@@ -40,7 +40,7 @@ ROCKETMQ_NAMESPACE_BEGIN
* async_stream.h
*/
struct BaseInvocationContext {
- BaseInvocationContext() : request_id_(UniqueIdGenerator::instance().next()) {
+ BaseInvocationContext() : request_id_(UniqueIdGenerator::nextUuidV4Std()) {
context.AddMetadata(MetadataConstants::REQUEST_ID_KEY, request_id_);
}
diff --git a/cpp/source/base/include/UniqueIdGenerator.h
b/cpp/source/base/include/UniqueIdGenerator.h
index e44c5e64..7cd96b97 100644
--- a/cpp/source/base/include/UniqueIdGenerator.h
+++ b/cpp/source/base/include/UniqueIdGenerator.h
@@ -33,6 +33,8 @@ public:
std::string next() LOCKS_EXCLUDED(mtx_);
+ static std::string nextUuidV4Std();
+
UniqueIdGenerator(const UniqueIdGenerator&) = delete;
UniqueIdGenerator(UniqueIdGenerator&&) = delete;
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp
b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index dc6f32d1..11a7f194 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -21,7 +21,6 @@
#include <system_error>
#include <utility>
-#include "UniqueIdGenerator.h"
#include "AsyncReceiveMessageCallback.h"
#include "MetadataConstants.h"
#include "Protocol.h"
@@ -183,16 +182,6 @@ void
ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expres
}
}
-void generateAttemptId(std::string& attempt_id) {
- const std::string unique_id = UniqueIdGenerator::instance().next();
- if (unique_id.size() < 34) {
- return;
- }
- attempt_id = fmt::format(
- "{}-{}-{}-{}-{}", unique_id.substr(0, 8), unique_id.substr(8, 4),
- unique_id.substr(12, 4), unique_id.substr(16, 4), unique_id.substr(20,
12));
-}
-
void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string,
std::string>& metadata,
rmq::ReceiveMessageRequest&
request, std::string& attempt_id) {
std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
@@ -216,7 +205,7 @@ void
ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, st
request.mutable_invisible_duration()->set_nanos(nano_seconds);
if (attempt_id.empty()) {
- generateAttemptId(attempt_id);
+ attempt_id = UniqueIdGenerator::nextUuidV4Std();
}
request.set_attempt_id(attempt_id);
}