This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new df8759f9 [C++] Fix critical correctness, thread safety, and memory management bugs (#1259)
df8759f9 is described below

commit df8759f93c4d05ae3bc0c430a4e58a2e27dfb49a
Author: lizhimins <[email protected]>
AuthorDate: Mon Jun 8 16:48:31 2026 +0800

    [C++] Fix critical correctness, thread safety, and memory management bugs (#1259)
---
 cpp/docs/bugfix-correctness-threadsafety-memory.md | 498 +++++++++++++++++++++
 cpp/source/client/RpcClientImpl.cpp                |   4 +-
 cpp/source/client/TelemetryBidiReactor.cpp         |  27 +-
 cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp  |  22 +
 cpp/source/rocketmq/FifoProducerPartition.cpp      |   6 +-
 cpp/source/rocketmq/ProducerImpl.cpp               |  14 +-
 cpp/source/rocketmq/PushConsumerImpl.cpp           |   3 +
 cpp/source/rocketmq/SimpleConsumerImpl.cpp         |   5 +-
 cpp/source/scheduler/SchedulerImpl.cpp             |   5 +-
 9 files changed, 553 insertions(+), 31 deletions(-)

diff --git a/cpp/docs/bugfix-correctness-threadsafety-memory.md b/cpp/docs/bugfix-correctness-threadsafety-memory.md
new file mode 100644
index 00000000..3247b54d
--- /dev/null
+++ b/cpp/docs/bugfix-correctness-threadsafety-memory.md
@@ -0,0 +1,498 @@
+# C++ Client Bug Fixes: Correctness, Thread Safety & Memory Management
+
+> Date: 2026-06-08
+
+This document describes a set of bug fixes addressing correctness, thread safety,
+and memory management issues found in the C++ client SDK.
+
+## Summary
+
+| # | Category | Issue | Severity | Files |
+|---|----------|-------|----------|-------|
+| 1 | Correctness | `onVerifyMessage` switch fall-through | Critical | PushConsumerImpl.cpp |
+| 2 | Correctness | `onVerifyMessage` callback never invoked | Critical | PushConsumerImpl.cpp |
+| 3 | Correctness | `schedule()` empty implementation | Critical | ConsumeMessageServiceImpl.cpp |
+| 4 | Correctness | `std::remove_if` result discarded | Major | SimpleConsumerImpl.cpp |
+| 5 | Thread Safety | Static `task_id` data race | Major | SchedulerImpl.cpp |
+| 6 | Thread Safety | `state_` read without lock | Major | TelemetryBidiReactor.cpp |
+| 7 | Thread Safety | Use-after-free in `asyncCallback` | Critical | RpcClientImpl.cpp |
+| 8 | Memory | `const_cast` to move from const ref — add safety comments | Major | ProducerImpl.cpp, FifoProducerPartition.cpp |
+
+## 1. `onVerifyMessage` switch fall-through
+
+**File:** `source/rocketmq/PushConsumerImpl.cpp`
+
+The switch statement in `onVerifyMessage` lacked `break` statements, causing the
+`SUCCESS` case to fall through into `FAILURE`. Every message verification was
+reported as failed to the server regardless of actual consumption result.
+
+```cpp
+// Before
+switch (result) {
+  case ConsumeResult::SUCCESS: {
+    cmd.mutable_status()->set_code(rmq::Code::OK);
+    cmd.mutable_status()->set_message("OK");
+  }  // fall-through
+  case ConsumeResult::FAILURE: {
+    cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE);
+    cmd.mutable_status()->set_message("Consume message failed");
+  }
+}
+```
+
+**Fix:** Added `break;` to both cases.
+
+## 2. `onVerifyMessage` callback never invoked
+
+**File:** `source/rocketmq/PushConsumerImpl.cpp`
+
+The function accepts a callback `std::function<void(TelemetryCommand)> cb` and
+constructs a `TelemetryCommand cmd` with the verification result, but never
+calls `cb(cmd)`. The verification result was silently discarded.
+
+**Fix:** Added `cb(cmd);` at the end of the function, after all branches have
+populated the command.
+
+## 3. `schedule()` empty implementation
+
+**File:** `source/rocketmq/ConsumeMessageServiceImpl.cpp`
+
+The `schedule()` method had an empty body:
+
+```cpp
+void ConsumeMessageServiceImpl::schedule(
+    std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
+}
+```
+
+This method is called when a FIFO consumption attempt fails and needs to be
+retried after a delay. With the empty body, failed consume tasks were silently
+dropped, breaking FIFO retry semantics.
+
+**Fix:** Implemented the method to schedule a delayed resubmission via
+`SchedulerImpl`:
+
+```cpp
+void ConsumeMessageServiceImpl::schedule(
+    std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  auto scheduler = consumer->manager()->getScheduler();
+  if (!scheduler) {
+    SPDLOG_WARN("Scheduler is not available, submitting task immediately");
+    submit(task);
+    return;
+  }
+
+  std::weak_ptr<ConsumeMessageServiceImpl> self(shared_from_this());
+  scheduler->schedule(
+      [self, task]() {
+        auto svc = self.lock();
+        if (!svc) {
+          return;
+        }
+        svc->submit(task);
+      },
+      "consume-retry", delay, std::chrono::milliseconds(0));
+}
+```
+
+Key design decisions:
+- Uses `weak_ptr` for both the consumer and the service itself so that a pending
+  retry does not keep either object alive and block shutdown.
+- Falls back to immediate `submit()` if the scheduler is unavailable.
+- Single-shot schedule (`interval = 0`), not periodic.
+
+## 4. `std::remove_if` result discarded
+
+**File:** `source/rocketmq/SimpleConsumerImpl.cpp`
+
+`std::remove_if` does not erase elements from a container. It shifts the
+elements that are kept toward the front and returns an iterator to the new
+logical end; pass that iterator to `erase()` to actually shrink the container.
+
+```cpp
+// Before: elements moved but never erased
+std::remove_if(assignments_.begin(), assignments_.end(),
+    [&](const rmq::Assignment& e) { return e == item; });
+
+// After: proper erase-remove idiom
+assignments_.erase(
+    std::remove_if(assignments_.begin(), assignments_.end(),
+                   [&](const rmq::Assignment& e) { return e == item; }),
+    assignments_.end());
+```
+
+Without this fix, queue assignment changes in `SimpleConsumer` were additive
+only: `assignments_` never shrank, so stale entries stayed in the vector,
+potentially causing duplicate consumption.
+
+## 5. Static `task_id` data race
+
+**File:** `source/scheduler/SchedulerImpl.cpp`
+
+The `schedule()` function used a function-local `static std::uint32_t task_id`
+for generating unique task IDs. While `tasks_mtx_` (an instance-level mutex)
+was held during the increment, the static variable is shared across all
+`SchedulerImpl` instances. Concurrent calls from different instances constitute
+a data race on the non-atomic static.
+
+```cpp
+// Before: static variable protected by instance-level lock
+static std::uint32_t task_id = 0;
+// ...
+absl::MutexLock lk(&tasks_mtx_);
+id = ++task_id;
+
+// After: atomic increment, then take lock only for the map insertion
+static std::atomic<std::uint32_t> task_id{0};
+std::uint32_t id = ++task_id;
+{
+  absl::MutexLock lk(&tasks_mtx_);
+  tasks_.insert({id, task});
+}
+```
+
+## 6. `state_` read without lock
+
+**File:** `source/client/TelemetryBidiReactor.cpp`
+
+`tryWriteNext()` read `state_` while holding `writes_mtx_`, but `state_` is
+modified under `state_mtx_` (a different mutex). Reading a variable protected by
+mutex A while holding only mutex B is a data race.
+
+The function also contained redundant logic: after checking `writes_.empty()`
+and returning if true, it immediately checked `!writes_.empty()` again, and
+re-checked `state_` a second time inside that block.
+
+**Fix:** Restructured to acquire `state_mtx_` first for the state check (then
+release it), then acquire `writes_mtx_` for the write operation. Removed the
+redundant second emptiness check and second state check.
+
+```cpp
+void TelemetryBidiReactor::tryWriteNext() {
+  {
+    absl::MutexLock state_lk(&state_mtx_);
+    if (StreamState::Ready != state_) {
+      return;
+    }
+  }
+
+  absl::MutexLock lk(&writes_mtx_);
+  if (writes_.empty()) {
+    return;
+  }
+
+  AddHold();
+  StartWrite(&(writes_.front()));
+}
+```
+
+Note: There is a TOCTOU window between releasing `state_mtx_` and calling
+`StartWrite`. This is acceptable because gRPC's `StartWrite` is safe to call
+on a closing stream — it will simply fail, and the error is handled in
+`OnWriteDone`.
+
+## 7. Use-after-free in `asyncCallback`
+
+**File:** `source/client/RpcClientImpl.cpp`
+
+When `ClientManager` has already destructed (`manager` is null), the code
+deleted `invocation_context` but did not return. Execution continued to the
+lambda capture (capturing the now-dangling pointer) and the `manager->submit()`
+call (null pointer dereference).
+
+```cpp
+// Before
+if (!manager) {
+  SPDLOG_WARN("ClientManager has destructed. Response ignored");
+  delete invocation_context;
+}
+// continues to use invocation_context and manager...
+
+// After
+if (!manager) {
+  SPDLOG_WARN("ClientManager has destructed. Response ignored");
+  delete invocation_context;
+  return;
+}
+```
+
+This also resolves the related `InvocationContext` dual-deletion concern: with
+the early return, the deleted pointer is never captured by the lambda, so the
+`onCompletion()` → `delete this` path cannot be reached for an already-freed
+object.
+
+## 8. `const_cast` to move from const reference — add safety comments
+
+**Files:** `source/rocketmq/ProducerImpl.cpp`, `source/rocketmq/FifoProducerPartition.cpp`
+
+The synchronous `ProducerImpl::send()` uses `const_cast` to move fields out of
+the `const SendReceipt&` callback parameter. The root cause is that `SendReceipt`
+contains a `MessageConstPtr` (`unique_ptr<const Message>`), making it non-copyable.
+Meanwhile, `SendCallback` is a public API type (`include/rocketmq/SendCallback.h`)
+and its signature must remain `const SendReceipt&` — users implementing async
+callbacks should not be exposed to internal move semantics.
+
+The `const_cast` is safe in practice because the receipt is always a local
+temporary created in `SendContext::onSuccess`/`onFailure`, and is destroyed
+immediately after the callback returns. No other code holds a reference to it.
+
+**Fix:** Kept the public `SendCallback` signature as `const SendReceipt&`
+unchanged. Added comments at the two internal `const_cast` sites
+(`ProducerImpl::send` and `FifoProducerPartition::onComplete`) explaining why
+the cast is safe, so future maintainers do not remove it or introduce a second
+reference to the receipt.
+
+## Verification
+
+- **CMake build:** All targets compiled successfully (library, tests, examples)
+- **CMake tests:** 24/24 passed
+- **Bazel build:** All library and test targets compiled successfully (3387 actions)
+- **Bazel tests:** 25/25 passed
+- **Runtime validation:** Producer (sync + async), PushConsumer, and SimpleConsumer
+  examples tested against a live RocketMQ 5.x instance — message send, receive,
+  ack, and invisible duration change all work correctly
+
+---
+
+# C++ 客户端缺陷修复:正确性、线程安全与内存管理
+
+> 日期:2026-06-08
+
+本文档记录了 C++ 客户端 SDK 中一组正确性、线程安全和内存管理缺陷的修复内容。
+
+## 概览
+
+| # | 分类 | 问题 | 严重程度 | 涉及文件 |
+|---|------|------|----------|----------|
+| 1 | 正确性 | `onVerifyMessage` switch 穿透 | 严重 | PushConsumerImpl.cpp |
+| 2 | 正确性 | `onVerifyMessage` 回调未调用 | 严重 | PushConsumerImpl.cpp |
+| 3 | 正确性 | `schedule()` 方法体为空 | 严重 | ConsumeMessageServiceImpl.cpp |
+| 4 | 正确性 | `std::remove_if` 返回值被丢弃 | 重要 | SimpleConsumerImpl.cpp |
+| 5 | 线程安全 | 静态 `task_id` 数据竞争 | 重要 | SchedulerImpl.cpp |
+| 6 | 线程安全 | `state_` 无锁读取 | 重要 | TelemetryBidiReactor.cpp |
+| 7 | 线程安全 | `asyncCallback` 中 use-after-free | 严重 | RpcClientImpl.cpp |
+| 8 | 内存管理 | 通过 `const_cast` 从 const 引用 move —— 添加安全性注释 | 重要 | ProducerImpl.cpp、FifoProducerPartition.cpp |
+
+## 1. `onVerifyMessage` switch 穿透
+
+**文件:** `source/rocketmq/PushConsumerImpl.cpp`
+
+`onVerifyMessage` 中的 switch 语句缺少 `break`,导致 `SUCCESS` 分支穿透到
+`FAILURE` 分支。无论消费结果如何,所有消息验证均被报告为失败。
+
+```cpp
+// 修复前
+switch (result) {
+  case ConsumeResult::SUCCESS: {
+    cmd.mutable_status()->set_code(rmq::Code::OK);
+    cmd.mutable_status()->set_message("OK");
+  }  // 穿透到 FAILURE
+  case ConsumeResult::FAILURE: {
+    cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE);
+    cmd.mutable_status()->set_message("Consume message failed");
+  }
+}
+```
+
+**修复:** 为两个 case 添加 `break;`。
+
+## 2. `onVerifyMessage` 回调未调用
+
+**文件:** `source/rocketmq/PushConsumerImpl.cpp`
+
+该函数接受回调参数 `std::function<void(TelemetryCommand)> cb`,内部构造了
+`TelemetryCommand cmd` 并填充验证结果,但从未调用 `cb(cmd)`。验证结果被静默丢弃,
+Server 端的消息验证机制完全失效。
+
+**修复:** 在函数末尾所有分支完成 cmd 填充后,添加 `cb(cmd);`。
+
+## 3. `schedule()` 方法体为空
+
+**文件:** `source/rocketmq/ConsumeMessageServiceImpl.cpp`
+
+`schedule()` 方法体为空:
+
+```cpp
+void ConsumeMessageServiceImpl::schedule(
+    std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
+}
+```
+
+该方法在 FIFO 消费失败后被调用,用于延迟重试。方法体为空意味着失败的消费任务被静默
+丢弃,FIFO 重试语义完全失效。
+
+**修复:** 实现延迟重新提交逻辑,通过 `SchedulerImpl` 调度延迟回调:
+
+```cpp
+void ConsumeMessageServiceImpl::schedule(
+    std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  auto scheduler = consumer->manager()->getScheduler();
+  if (!scheduler) {
+    SPDLOG_WARN("Scheduler is not available, submitting task immediately");
+    submit(task);
+    return;
+  }
+
+  std::weak_ptr<ConsumeMessageServiceImpl> self(shared_from_this());
+  scheduler->schedule(
+      [self, task]() {
+        auto svc = self.lock();
+        if (!svc) {
+          return;
+        }
+        svc->submit(task);
+      },
+      "consume-retry", delay, std::chrono::milliseconds(0));
+}
+```
+
+关键设计考量:
+- 对 consumer 和 service 自身均使用 `weak_ptr`,避免在待重试期间阻止正常关闭。
+- 调度器不可用时降级为立即 `submit()`。
+- 一次性调度(`interval = 0`),非周期性。
+
+## 4. `std::remove_if` 返回值被丢弃
+
+**文件:** `source/rocketmq/SimpleConsumerImpl.cpp`
+
+`std::remove_if` 不会从容器中删除元素。它将匹配元素移至末尾并返回新的逻辑结尾
+迭代器,必须将该返回值传给 `erase()` 才能真正删除。
+
+```cpp
+// 修复前:元素被移动但未删除
+std::remove_if(assignments_.begin(), assignments_.end(),
+    [&](const rmq::Assignment& e) { return e == item; });
+
+// 修复后:标准 erase-remove 惯用法
+assignments_.erase(
+    std::remove_if(assignments_.begin(), assignments_.end(),
+                   [&](const rmq::Assignment& e) { return e == item; }),
+    assignments_.end());
+```
+
+修复前,`SimpleConsumer` 的队列分配变更只增不减——被移除的 assignment 仍留在
+vector 中,可能导致重复消费。
+
+## 5. 静态 `task_id` 数据竞争
+
+**文件:** `source/scheduler/SchedulerImpl.cpp`
+
+`schedule()` 使用函数级 `static std::uint32_t task_id` 生成唯一任务 ID。虽然
+递增时持有 `tasks_mtx_`(实例级互斥锁),但该静态变量跨所有 `SchedulerImpl` 实例
+共享。不同实例的并发调用构成对非原子静态变量的数据竞争。
+
+```cpp
+// 修复前:静态变量由实例级锁保护
+static std::uint32_t task_id = 0;
+// ...
+absl::MutexLock lk(&tasks_mtx_);
+id = ++task_id;
+
+// 修复后:原子递增,仅在 map 插入时加锁
+static std::atomic<std::uint32_t> task_id{0};
+std::uint32_t id = ++task_id;
+{
+  absl::MutexLock lk(&tasks_mtx_);
+  tasks_.insert({id, task});
+}
+```
+
+## 6. `state_` 无锁读取
+
+**文件:** `source/client/TelemetryBidiReactor.cpp`
+
+`tryWriteNext()` 在持有 `writes_mtx_` 时读取 `state_`,但 `state_` 由
+`state_mtx_`(另一把锁)保护。持有互斥锁 A 读取由互斥锁 B 保护的变量构成数据竞争。
+
+原函数还包含冗余逻辑:检查 `writes_.empty()` 并在为空时返回后,紧接着又检查
+`!writes_.empty()`,并在该分支内再次检查 `state_`。
+
+**修复:** 重构为先获取 `state_mtx_` 检查状态(然后释放),再获取 `writes_mtx_`
+执行写操作。移除冗余的二次空检查和二次状态检查。
+
+```cpp
+void TelemetryBidiReactor::tryWriteNext() {
+  {
+    absl::MutexLock state_lk(&state_mtx_);
+    if (StreamState::Ready != state_) {
+      return;
+    }
+  }
+
+  absl::MutexLock lk(&writes_mtx_);
+  if (writes_.empty()) {
+    return;
+  }
+
+  AddHold();
+  StartWrite(&(writes_.front()));
+}
+```
+
+说明:释放 `state_mtx_` 到调用 `StartWrite` 之间存在 TOCTOU 窗口。这是可接受的,
+因为 gRPC 的 `StartWrite` 在流关闭时调用是安全的——只是会失败,错误在 `OnWriteDone`
+中处理。
+
+## 7. `asyncCallback` 中 use-after-free
+
+**文件:** `source/client/RpcClientImpl.cpp`
+
+当 `ClientManager` 已析构(`manager` 为 null)时,代码删除了 `invocation_context`
+但未返回。执行继续到 lambda 捕获(捕获了已悬空的指针)和 `manager->submit()` 调用
+(空指针解引用)。
+
+```cpp
+// 修复前
+if (!manager) {
+  SPDLOG_WARN("ClientManager has destructed. Response ignored");
+  delete invocation_context;
+}
+// 继续使用 invocation_context 和 manager...
+
+// 修复后
+if (!manager) {
+  SPDLOG_WARN("ClientManager has destructed. Response ignored");
+  delete invocation_context;
+  return;
+}
+```
+
+此修复同时解决了 `InvocationContext` 双重删除的关联问题:有了提前返回,已删除的指针
+不会被 lambda 捕获,因此 `onCompletion()` → `delete this` 路径不可能在已释放的
+对象上触发。
+
+## 8. 通过 `const_cast` 从 const 引用 move —— 添加安全性注释
+
+**文件:** `source/rocketmq/ProducerImpl.cpp`、`source/rocketmq/FifoProducerPartition.cpp`
+
+同步 `ProducerImpl::send()` 使用 `const_cast` 从 `const SendReceipt&` 回调参数中
+move 出字段。根本原因是 `SendReceipt` 包含 `MessageConstPtr`
+(`unique_ptr<const Message>`),不可拷贝。而 `SendCallback` 是公共 API 类型
+(`include/rocketmq/SendCallback.h`),其签名必须保持 `const SendReceipt&` ——
+用户实现异步回调时不应被暴露内部的 move 语义。
+
+`const_cast` 在实际中是安全的,因为 receipt 始终是 `SendContext::onSuccess`/
+`onFailure` 中创建的局部临时对象,回调返回后立即销毁,没有其他代码持有对它的引用。
+
+**修复:** 保持公共 `SendCallback` 签名 `const SendReceipt&` 不变。在内部两处
+`const_cast` 调用点(`ProducerImpl::send` 和 `FifoProducerPartition::onComplete`)
+添加注释说明为何该 cast 是安全的,防止未来维护者误删或引入对 receipt 的第二引用。
+
+## 验证
+
+- **CMake 构建:** 所有目标编译成功(库、测试、示例)
+- **CMake 测试:** 24/24 通过
+- **Bazel 构建:** 所有库和测试目标编译成功(3387 个 action)
+- **Bazel 测试:** 25/25 通过
+- **运行时验证:** Producer(同步 + 异步)、PushConsumer、SimpleConsumer 示例在
+  RocketMQ 5.x 实例上测试通过——消息发送、接收、ACK、不可见时间变更均工作正常
diff --git a/cpp/source/client/RpcClientImpl.cpp b/cpp/source/client/RpcClientImpl.cpp
index 2e183f2a..94e2c3ad 100644
--- a/cpp/source/client/RpcClientImpl.cpp
+++ b/cpp/source/client/RpcClientImpl.cpp
@@ -48,10 +48,8 @@ void RpcClientImpl::asyncCallback(std::weak_ptr<RpcClient> client, BaseInvocatio
   std::shared_ptr<ClientManager> manager = client_manager.lock();
   if (!manager) {
     SPDLOG_WARN("ClientManager has destructed. Response ignored");
-    // TODO: execute orphan callback in event-loop thread?
-    // invocation_context->onCompletion(false);
-    // or
     delete invocation_context;
+    return;
   }
 
   auto task = [invocation_context, client] {
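
Note on the hunk above: the essential change is the early `return` after `delete invocation_context`. The following is a minimal sketch of that control flow, not the project's code. `Manager`, `Context`, and `dispatch` are hypothetical stand-ins for `ClientManager`, the invocation context, and `asyncCallback`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for ClientManager: it only queues tasks.
struct Manager {
  std::vector<std::function<void()>> queue;
  void submit(std::function<void()> task) { queue.push_back(std::move(task)); }
};

// Hypothetical stand-in for the invocation context: it frees itself once
// its completion handler has run.
struct Context {
  bool* completed;
  void onCompletion() {
    *completed = true;
    delete this;
  }
};

// Returns true if ctx was handed to the manager, false if the manager was
// already gone and ctx was released here.
bool dispatch(const std::weak_ptr<Manager>& weak_manager, Context* ctx) {
  std::shared_ptr<Manager> manager = weak_manager.lock();
  if (!manager) {
    delete ctx;
    return false;  // stop here: ctx is dangling and manager is null
  }
  manager->submit([ctx] { ctx->onCompletion(); });
  return true;
}
```

Without the `return false;` line, the lambda would capture a freed pointer and `manager->submit()` would dereference a null `shared_ptr`, which is the failure the commit describes.
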
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index 7cd0c218..a73efadd 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -288,28 +288,25 @@ void TelemetryBidiReactor::write(TelemetryCommand command) {
 
 void TelemetryBidiReactor::tryWriteNext() {
   SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
-  absl::MutexLock lk(&writes_mtx_);
-  if (StreamState::Ready != state_) {
-    SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}", peer_address_,
-                static_cast<std::uint8_t>(state_));
-    return;
+
+  {
+    absl::MutexLock state_lk(&state_mtx_);
+    if (StreamState::Ready != state_) {
+      SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}", peer_address_,
+                  static_cast<std::uint8_t>(state_));
+      return;
+    }
   }
 
+  absl::MutexLock lk(&writes_mtx_);
   if (writes_.empty()) {
     SPDLOG_DEBUG("No pending TelemetryCommand to write. Peer={}", peer_address_);
     return;
   }
 
-  if (!writes_.empty()) {
-    SPDLOG_DEBUG("Writing TelemetryCommand to {}: {}", peer_address_, writes_.front().ShortDebugString());
-    if (StreamState::Ready == state_) {
-      AddHold();
-      StartWrite(&(writes_.front()));
-    } else {
-      SPDLOG_WARN("Writing TelemetryCommand error due to unexpected state. State={}, Peer={}",
-                  static_cast<uint8_t>(state_), peer_address_);
-    }
-  }
+  SPDLOG_DEBUG("Writing TelemetryCommand to {}: {}", peer_address_, writes_.front().ShortDebugString());
+  AddHold();
+  StartWrite(&(writes_.front()));
 }
 
 void TelemetryBidiReactor::signalClose() {
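
Note on the `tryWriteNext()` hunk above: each piece of shared data is now read under the mutex that guards it. A rough sketch of the same locking order follows, using `std::mutex` in place of `absl::Mutex` so that it is self-contained. `Writer`, `enqueue`, and `lastStarted` are hypothetical names, and the assignment to `last_started_` merely stands in for the gRPC `StartWrite` call.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

enum class StreamState { Ready, Closing };

class Writer {
public:
  void setState(StreamState s) {
    std::lock_guard<std::mutex> lk(state_mtx_);
    state_ = s;
  }

  void enqueue(std::string msg) {
    std::lock_guard<std::mutex> lk(writes_mtx_);
    writes_.push_back(std::move(msg));
  }

  // Returns true if a write was started for the front of the queue.
  bool tryWriteNext() {
    {
      std::lock_guard<std::mutex> lk(state_mtx_);  // the lock that guards state_
      if (state_ != StreamState::Ready) {
        return false;
      }
    }  // state_mtx_ is released here, so state_ may change before the write

    std::lock_guard<std::mutex> lk(writes_mtx_);
    if (writes_.empty()) {
      return false;
    }
    last_started_ = writes_.front();  // stand-in for StartWrite(&writes_.front())
    return true;
  }

  std::string lastStarted() {
    std::lock_guard<std::mutex> lk(writes_mtx_);
    return last_started_;
  }

private:
  std::mutex state_mtx_;
  StreamState state_ = StreamState::Ready;
  std::mutex writes_mtx_;
  std::deque<std::string> writes_;
  std::string last_started_;
};
```

The two locks are never held at the same time, so the sketch cannot deadlock on lock ordering; the price is the check-then-act window that the added document acknowledges.
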
diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index c607760c..82d13bd8 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -163,6 +163,28 @@ void ConsumeMessageServiceImpl::forward(const Message& message, std::function<vo
 }
 
 void ConsumeMessageServiceImpl::schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
+  auto consumer = consumer_.lock();
+  if (!consumer) {
+    return;
+  }
+
+  auto scheduler = consumer->manager()->getScheduler();
+  if (!scheduler) {
+    SPDLOG_WARN("Scheduler is not available, submitting task immediately");
+    submit(task);
+    return;
+  }
+
+  std::weak_ptr<ConsumeMessageServiceImpl> self(shared_from_this());
+  scheduler->schedule(
+      [self, task]() {
+        auto svc = self.lock();
+        if (!svc) {
+          return;
+        }
+        svc->submit(task);
+      },
+      "consume-retry", delay, std::chrono::milliseconds(0));
 }
 
 std::size_t ConsumeMessageServiceImpl::maxDeliveryAttempt() {
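
Note on the `schedule()` hunk above: the deferred callback captures a `weak_ptr` to the service rather than `this` or a `shared_ptr`. A small sketch of why that matters is given here, with a hypothetical `FakeScheduler` that fires callbacks on demand instead of after a delay; `Service` and `retryLater` are likewise illustrative names only.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical scheduler: stores callbacks and fires them once on run().
struct FakeScheduler {
  std::vector<std::function<void()>> pending;
  void schedule(std::function<void()> fn) { pending.push_back(std::move(fn)); }
  void run() {
    for (auto& fn : pending) {
      fn();
    }
    pending.clear();
  }
};

class Service : public std::enable_shared_from_this<Service> {
public:
  explicit Service(int* submitted) : submitted_(submitted) {}

  void submit() { ++*submitted_; }

  // Defers submit(); the callback holds only a weak reference to this object.
  void retryLater(FakeScheduler& scheduler) {
    std::weak_ptr<Service> self(shared_from_this());
    scheduler.schedule([self] {
      if (auto svc = self.lock()) {
        svc->submit();  // the service is still alive
      }
      // otherwise the service was destroyed first and the retry is skipped
    });
  }

private:
  int* submitted_;
};
```

Had the lambda captured a `shared_ptr`, the pending retry would keep the service alive until the timer fired; had it captured `this`, the second scenario exercised in a test (service destroyed before the timer fires) would touch freed memory.
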
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
index 8a2f06ff..67693f0d 100644
--- a/cpp/source/rocketmq/FifoProducerPartition.cpp
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -87,9 +87,9 @@ void FifoProducerPartition::onComplete(const std::error_code& ec, const SendRece
     return;
   }
 
-  // Put the message back to the front of the list
-  SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
-  FifoContext retry_context(std::move(receipt_mut.message), callback);
+  // Put the message back to the front of the list.
+  // receipt is a local temporary in SendContext — const_cast + move is safe here.
+  FifoContext retry_context(std::move(const_cast<SendReceipt&>(receipt).message), callback);
   {
     absl::MutexLock lk(&messages_mtx_);
     messages_.emplace_front(std::move(retry_context));
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 184f542f..3450faf2 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -235,12 +235,14 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe
   auto callback =
       [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable {
     ec = code;
-    auto& receipt_mut = const_cast<SendReceipt&>(receipt);
-    send_receipt.target = std::move(receipt_mut.target);
-    send_receipt.message_id = std::move(receipt_mut.message_id);
-    send_receipt.message = std::move(receipt_mut.message);
-    send_receipt.transaction_id = std::move(receipt_mut.transaction_id);
-    send_receipt.recall_handle = std::move(receipt_mut.recall_handle);
+    // SendReceipt contains a unique_ptr (MessageConstPtr) and is non-copyable.
+    // The receipt here is always a local temporary in SendContext, so const_cast + move is safe.
+    auto& mutable_receipt = const_cast<SendReceipt&>(receipt);
+    send_receipt.target = std::move(mutable_receipt.target);
+    send_receipt.message_id = std::move(mutable_receipt.message_id);
+    send_receipt.message = std::move(mutable_receipt.message);
+    send_receipt.transaction_id = std::move(mutable_receipt.transaction_id);
+    send_receipt.recall_handle = std::move(mutable_receipt.recall_handle);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
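
Note on the `const_cast` comments above: in standard C++, writing through a reference obtained by `const_cast` is well-defined only when the referenced object was not itself declared `const`. The sketch below illustrates the pattern under that assumption. `Receipt`, `complete`, and `sendSync` are hypothetical simplifications, not the SDK's `SendReceipt`, `SendContext`, or `ProducerImpl::send`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical move-only receipt, loosely modelled on SendReceipt.
struct Receipt {
  std::string message_id;
  std::unique_ptr<const std::string> message;
};

using Callback = std::function<void(const Receipt&)>;

// Stand-in for the internal completion path: the receipt is a non-const
// local that nobody else references, and it dies right after the callback.
void complete(const Callback& cb) {
  Receipt receipt{"id-1", std::unique_ptr<const std::string>(new std::string("payload"))};
  cb(receipt);
}

// Synchronous wrapper that steals the receipt's fields inside the callback.
Receipt sendSync() {
  Receipt out;
  complete([&out](const Receipt& receipt) {
    // Well-defined only because the referenced object is not declared const.
    auto& mutable_receipt = const_cast<Receipt&>(receipt);
    out.message_id = std::move(mutable_receipt.message_id);
    out.message = std::move(mutable_receipt.message);
  });
  return out;
}
```

If `complete` ever declared its local as `const Receipt`, or kept using the receipt after the callback returned, the same cast would become undefined behaviour or leave the caller with emptied fields, which is presumably why the commit documents the invariant at both cast sites.
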
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 90632035..bd2a9977 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -581,10 +581,12 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
           case ConsumeResult::SUCCESS: {
             cmd.mutable_status()->set_code(rmq::Code::OK);
             cmd.mutable_status()->set_message("OK");
+            break;
           }
           case ConsumeResult::FAILURE: {
             cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE);
             cmd.mutable_status()->set_message("Consume message failed");
+            break;
           }
         }
       } catch (const std::exception& e) {
@@ -602,6 +604,7 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
     cmd.mutable_status()->set_code(rmq::Code::MESSAGE_CORRUPTED);
     cmd.mutable_status()->set_message("Checksum Mismatch");
   }
+  cb(cmd);
 }
 
 void PushConsumerImpl::collectCacheStats() {
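
Note on the two `onVerifyMessage` hunks above: together they make each `case` terminate with `break` and make the function report its result through the callback. A compact sketch of both fixes follows. `Status`, `verify`, and the numeric codes 0 and 1 are placeholders chosen for illustration; the real code sets `rmq::Code` values on a `TelemetryCommand`.

```cpp
#include <cassert>
#include <functional>
#include <string>

enum class ConsumeResult { SUCCESS, FAILURE };

// Placeholder for the status carried by the telemetry command.
struct Status {
  int code;
  std::string message;
};

// Maps the consume result to a status and always reports it via cb.
void verify(ConsumeResult result, const std::function<void(const Status&)>& cb) {
  Status status{};
  switch (result) {
    case ConsumeResult::SUCCESS:
      status = {0, "OK"};
      break;  // without this, control falls into FAILURE and overwrites status
    case ConsumeResult::FAILURE:
      status = {1, "Consume message failed"};
      break;
  }
  cb(status);  // without this call, the computed status is discarded
}
```

Compilers can flag the original mistake: GCC and Clang offer `-Wimplicit-fallthrough`, and C++17 adds the `[[fallthrough]]` attribute for the cases where falling through is intended.
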
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 5bb3eaf9..d1508d8b 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -199,7 +199,10 @@ void SimpleConsumerImpl::updateAssignments(const std::string& topic, const std::
         changed = true;
         absl::MutexLock lk(&assignments_mtx_);
         for (const auto& item : to_remove) {
-          std::remove_if(assignments_.begin(), assignments_.end(), [&](const rmq::Assignment& e) { return e == item; });
+          assignments_.erase(
+              std::remove_if(assignments_.begin(), assignments_.end(),
+                             [&](const rmq::Assignment& e) { return e == item; }),
+              assignments_.end());
         }
 
         for (const auto& item : to_add) {
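
Note on the `updateAssignments` hunk above: the erase-remove idiom can be checked in isolation with a plain `std::vector<int>`. The helper name `dropMatching` is made up for this sketch; the real code operates on `rmq::Assignment` elements under `assignments_mtx_`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Removes every element equal to target and returns how many were removed.
std::size_t dropMatching(std::vector<int>& values, int target) {
  const std::size_t before = values.size();
  // remove_if compacts the kept elements to the front and returns the new
  // logical end; erase then shrinks the vector to that point.
  values.erase(std::remove_if(values.begin(), values.end(),
                              [target](int v) { return v == target; }),
               values.end());
  return before - values.size();
}
```

Calling `std::remove_if` alone would leave `values.size()` unchanged, with the elements past the returned iterator in a valid but unspecified state. Code bases that can require C++20 may prefer `std::erase_if`, which performs both steps in one call.
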
diff --git a/cpp/source/scheduler/SchedulerImpl.cpp b/cpp/source/scheduler/SchedulerImpl.cpp
index cb05718b..50e31fbd 100644
--- a/cpp/source/scheduler/SchedulerImpl.cpp
+++ b/cpp/source/scheduler/SchedulerImpl.cpp
@@ -119,17 +119,16 @@ void SchedulerImpl::shutdown0() {
 
 std::uint32_t SchedulerImpl::schedule(const std::function<void(void)>& functor, const std::string& task_name,
                                       std::chrono::milliseconds delay, std::chrono::milliseconds interval) {
-  static std::uint32_t task_id = 0;
+  static std::atomic<std::uint32_t> task_id{0};
 
   auto task = std::make_shared<TimerTask>();
   task->task_name = task_name;
   task->callback = functor;
   task->interval = interval;
 
-  std::uint32_t id;
+  std::uint32_t id = ++task_id;
   {
     absl::MutexLock lk(&tasks_mtx_);
-    id = ++task_id;
     tasks_.insert({id, task});
   }
   task->task_id = id;
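
Note on the `SchedulerImpl::schedule` hunk above: a function-local `static std::atomic` counter hands out distinct ids no matter which instance or thread asks, without any mutex. The sketch below tries to demonstrate that property; `nextTaskId` and `countDistinctIds` are hypothetical helpers, and the build is assumed to link a threading library (for example `-pthread` on older toolchains).

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <thread>
#include <vector>

// Process-wide id source; the atomic pre-increment is a single
// read-modify-write, so concurrent callers never observe the same value.
std::uint32_t nextTaskId() {
  static std::atomic<std::uint32_t> task_id{0};
  return ++task_id;
}

// Spawns `threads` workers that each draw `per_thread` ids and returns the
// number of distinct ids seen overall.
std::size_t countDistinctIds(int threads, int per_thread) {
  std::vector<std::vector<std::uint32_t>> drawn(threads);
  std::vector<std::thread> workers;
  for (int t = 0; t < threads; ++t) {
    // Each worker writes only to its own inner vector, so no lock is needed.
    workers.emplace_back([&drawn, t, per_thread] {
      for (int i = 0; i < per_thread; ++i) {
        drawn[t].push_back(nextTaskId());
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
  std::set<std::uint32_t> unique;
  for (const auto& ids : drawn) {
    unique.insert(ids.begin(), ids.end());
  }
  return unique.size();
}
```

A 32-bit counter still wraps after 2^32 increments, which neither this sketch nor the patch addresses; whether that matters depends on how long task ids remain registered in `tasks_`.
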
