lizhimins opened a new issue, #1258:
URL: https://github.com/apache/rocketmq-clients/issues/1258
## Summary
Found and fixed 8 bugs in the C++ client SDK spanning correctness, thread
safety, and memory management. 4 are critical severity (can cause silent data
loss or crashes), 4 are major.
| # | Category | Issue | Severity | Files |
|---|----------|-------|----------|-------|
| 1 | Correctness | `onVerifyMessage` switch fall-through | Critical | PushConsumerImpl.cpp |
| 2 | Correctness | `onVerifyMessage` callback never invoked | Critical | PushConsumerImpl.cpp |
| 3 | Correctness | `schedule()` empty implementation | Critical | ConsumeMessageServiceImpl.cpp |
| 4 | Correctness | `std::remove_if` result discarded | Major | SimpleConsumerImpl.cpp |
| 5 | Thread Safety | Static `task_id` data race | Major | SchedulerImpl.cpp |
| 6 | Thread Safety | `state_` read without lock | Major | TelemetryBidiReactor.cpp |
| 7 | Thread Safety | Use-after-free in `asyncCallback` | Critical | RpcClientImpl.cpp |
| 8 | Memory | `const_cast` to move from const ref | Major | SendCallback.h, ProducerImpl.cpp, FifoProducerPartition.cpp/.h, SendContextTest.cpp |
## Details
### 1. `onVerifyMessage` switch fall-through
**File:** `source/rocketmq/PushConsumerImpl.cpp`
The switch statement in `onVerifyMessage` lacked `break` statements, causing
the `SUCCESS` case to fall through into `FAILURE`. Every message verification
was reported as failed to the server regardless of actual consumption result.
```cpp
// Before
switch (result) {
case ConsumeResult::SUCCESS: {
cmd.mutable_status()->set_code(rmq::Code::OK);
cmd.mutable_status()->set_message("OK");
} // fall-through
case ConsumeResult::FAILURE: {
cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE);
cmd.mutable_status()->set_message("Consume message failed");
}
}
```
**Fix:** Added `break;` to both cases.
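The corrected control flow can be sketched in isolation. This is a minimal illustration, not the SDK code: `Code`, `Status`, and `toStatus` are hypothetical stand-ins for the protobuf types and the body of `onVerifyMessage`.

```cpp
#include <string>

// Hypothetical stand-ins for ConsumeResult and the rmq::Status protobuf message.
enum class ConsumeResult { SUCCESS, FAILURE };
enum class Code { OK, FAILED_TO_CONSUME_MESSAGE };

struct Status {
  Code code = Code::FAILED_TO_CONSUME_MESSAGE;
  std::string message;
};

// Maps a consume result to a status. Each case ends with `break`, so SUCCESS
// no longer falls through into the FAILURE branch.
Status toStatus(ConsumeResult result) {
  Status status;
  switch (result) {
    case ConsumeResult::SUCCESS: {
      status.code = Code::OK;
      status.message = "OK";
      break;
    }
    case ConsumeResult::FAILURE: {
      status.code = Code::FAILED_TO_CONSUME_MESSAGE;
      status.message = "Consume message failed";
      break;
    }
  }
  return status;
}
```

Without the first `break`, `toStatus(ConsumeResult::SUCCESS)` would be overwritten by the failure branch, which is the behavior reported above.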
### 2. `onVerifyMessage` callback never invoked
**File:** `source/rocketmq/PushConsumerImpl.cpp`
The function accepts a callback `std::function<void(TelemetryCommand)> cb`
and constructs a `TelemetryCommand cmd` with the verification result, but never
calls `cb(cmd)`. The verification result was silently discarded.
**Fix:** Added `cb(cmd);` at the end of the function, after all branches
have populated the command.
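The shape of the fix, reduced to a sketch: every branch fills in the command, and the callback is invoked once at the end. `TelemetryCommand` here is a hypothetical stand-in for the protobuf message, and `verify` is an illustrative name rather than the real function.

```cpp
#include <functional>
#include <string>

// Hypothetical stand-in for rmq::TelemetryCommand.
struct TelemetryCommand {
  bool ok = false;
  std::string message;
};

// Populates the command on every branch, then hands it to the callback
// exactly once. Omitting the final call is the bug described above.
void verify(bool consumed, const std::function<void(TelemetryCommand)>& cb) {
  TelemetryCommand cmd;
  if (consumed) {
    cmd.ok = true;
    cmd.message = "OK";
  } else {
    cmd.message = "Consume message failed";
  }
  cb(cmd);
}
```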
### 3. `schedule()` empty implementation
**File:** `source/rocketmq/ConsumeMessageServiceImpl.cpp`
The `schedule()` method had an empty body:
```cpp
void ConsumeMessageServiceImpl::schedule(
std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
}
```
This method is called when a FIFO consumption attempt fails and needs to be
retried after a delay. With the empty body, failed consume tasks were silently
dropped, breaking FIFO retry semantics entirely.
**Fix:** Implemented the method to schedule a delayed resubmission via
`SchedulerImpl`:
```cpp
void ConsumeMessageServiceImpl::schedule(
std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) {
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
auto scheduler = consumer->manager()->getScheduler();
if (!scheduler) {
SPDLOG_WARN("Scheduler is not available, submitting task immediately");
submit(task);
return;
}
std::weak_ptr<ConsumeMessageServiceImpl> self(shared_from_this());
scheduler->schedule(
[self, task]() {
auto svc = self.lock();
if (!svc) {
return;
}
svc->submit(task);
},
"consume-retry", delay, std::chrono::milliseconds(0));
}
```
Key design decisions:
- Captures a `weak_ptr` to both the consumer and the service itself, so a
pending retry does not keep either object alive and block shutdown.
- Falls back to immediate `submit()` if the scheduler is unavailable.
- Single-shot schedule (`interval = 0`), not periodic.
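The `weak_ptr` capture can be illustrated with a self-contained sketch. `FakeScheduler` and `Service` are hypothetical stand-ins (the real `SchedulerImpl` API is not reproduced here); the point is only that a queued retry becomes a no-op once the service is gone.

```cpp
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a scheduler: it only stores callbacks to run later.
struct FakeScheduler {
  std::vector<std::function<void()>> pending;
  void runAll() {
    for (auto& fn : pending) fn();
    pending.clear();
  }
};

// Hypothetical service that retries work through the scheduler.
class Service : public std::enable_shared_from_this<Service> {
 public:
  explicit Service(int* counter) : counter_(counter) {}

  void submit() { ++*counter_; }

  // Captures a weak_ptr so the queued retry does not extend the lifetime
  // of the service.
  void scheduleRetry(FakeScheduler& scheduler) {
    std::weak_ptr<Service> self(shared_from_this());
    scheduler.pending.push_back([self]() {
      if (auto svc = self.lock()) {
        svc->submit();
      }
    });
  }

 private:
  int* counter_;
};
```

If the last `shared_ptr<Service>` is released before `runAll()` executes, `self.lock()` returns null and the retry is skipped instead of touching a destroyed object.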
### 4. `std::remove_if` result discarded
**File:** `source/rocketmq/SimpleConsumerImpl.cpp`
`std::remove_if` does not erase elements from a container. It shifts the
elements that do not match to the front, returns an iterator to the new logical
end, and leaves the elements past that iterator in a valid but unspecified
state. The return value must be passed to `erase()` to actually shrink the
container.
```cpp
// Before: elements moved but never erased
std::remove_if(assignments_.begin(), assignments_.end(),
[&](const rmq::Assignment& e) { return e == item; });
// After: proper erase-remove idiom
assignments_.erase(
std::remove_if(assignments_.begin(), assignments_.end(),
[&](const rmq::Assignment& e) { return e == item; }),
assignments_.end());
```
Without this fix, queue assignment changes in `SimpleConsumer` were additive
only — removed assignments persisted in the vector, potentially causing
duplicate consumption.
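The difference is easy to reproduce on a plain `std::vector<int>`. The helper names below are illustrative only and do not exist in the SDK.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Calls std::remove_if but never erases: the vector keeps its original size.
std::size_t sizeAfterRemoveOnly(std::vector<int> v, int item) {
  auto new_end = std::remove_if(v.begin(), v.end(),
                                [&](int e) { return e == item; });
  (void)new_end;  // deliberately unused to reproduce the bug
  return v.size();
}

// Erase-remove idiom: the range from the returned iterator to end() is erased.
std::vector<int> eraseRemove(std::vector<int> v, int item) {
  v.erase(std::remove_if(v.begin(), v.end(),
                         [&](int e) { return e == item; }),
          v.end());
  return v;
}
```

For the input `{1, 2, 3, 2}` and `item == 2`, the first helper still reports four elements, while the second returns `{1, 3}`.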
### 5. Static `task_id` data race
**File:** `source/scheduler/SchedulerImpl.cpp`
The `schedule()` function used a function-local `static std::uint32_t
task_id` for generating unique task IDs. While `tasks_mtx_` (an instance-level
mutex) was held during the increment, the static variable is shared across all
`SchedulerImpl` instances. Concurrent calls from different instances constitute
a data race on the non-atomic static.
```cpp
// Before: static variable protected by instance-level lock
static std::uint32_t task_id = 0;
// ...
absl::MutexLock lk(&tasks_mtx_);
id = ++task_id;
// After: atomic increment, then take lock only for the map insertion
static std::atomic<std::uint32_t> task_id{0};
std::uint32_t id = ++task_id;
{
absl::MutexLock lk(&tasks_mtx_);
tasks_.insert({id, task});
}
```
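Why the instance-level mutex was not enough can be seen in a reduced sketch: a function-local static is one object shared by every instance. `MiniScheduler` is a hypothetical class used only for illustration.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical scheduler reduced to its ID allocation.
class MiniScheduler {
 public:
  // The function-local static is shared by every instance, so it must be
  // atomic; a per-instance mutex cannot serialize access to it.
  std::uint32_t nextTaskId() {
    static std::atomic<std::uint32_t> task_id{0};
    return ++task_id;
  }
};
```

Two separate `MiniScheduler` objects draw from the same sequence, so IDs stay unique across instances, and the atomic increment keeps that true when they are called from different threads.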
### 6. `state_` read without lock
**File:** `source/client/TelemetryBidiReactor.cpp`
`tryWriteNext()` read `state_` while holding `writes_mtx_`, but `state_` is
modified under `state_mtx_` (a different mutex). Reading a variable protected
by mutex A while holding only mutex B is a data race.
The function also contained redundant logic: after checking
`writes_.empty()` and returning if true, it immediately checked
`!writes_.empty()` again, and re-checked `state_` a second time inside that
block.
**Fix:** Restructured to acquire `state_mtx_` first for the state check
(then release it), then acquire `writes_mtx_` for the write operation. Removed
the redundant second emptiness check and second state check.
```cpp
void TelemetryBidiReactor::tryWriteNext() {
{
absl::MutexLock state_lk(&state_mtx_);
if (StreamState::Ready != state_) {
return;
}
}
absl::MutexLock lk(&writes_mtx_);
if (writes_.empty()) {
return;
}
AddHold();
StartWrite(&(writes_.front()));
}
```
Note: There is a TOCTOU window between releasing `state_mtx_` and calling
`StartWrite`. This is acceptable because gRPC's `StartWrite` is safe to call on
a closing stream — it will simply fail, and the error is handled in
`OnWriteDone`.
### 7. Use-after-free in `asyncCallback`
**File:** `source/client/RpcClientImpl.cpp`
When `ClientManager` has already been destroyed (`manager` is null), the code
deleted `invocation_context` but did not return. Execution continued to the
lambda capture (capturing the now-dangling pointer) and the `manager->submit()`
call (null pointer dereference).
```cpp
// Before
if (!manager) {
SPDLOG_WARN("ClientManager has destructed. Response ignored");
delete invocation_context;
}
// continues to use invocation_context and manager...
// After
if (!manager) {
SPDLOG_WARN("ClientManager has destructed. Response ignored");
delete invocation_context;
return;
}
```
This also resolves the related `InvocationContext` dual-deletion concern:
with the early return, the deleted pointer is never captured by the lambda, so
the `onCompletion()` → `delete this` path cannot be reached for an
already-freed object.
### 8. `const_cast` to move from const reference
**Files:** `include/rocketmq/SendCallback.h`,
`source/rocketmq/ProducerImpl.cpp`,
`source/rocketmq/FifoProducerPartition.cpp`,
`source/rocketmq/include/FifoProducerPartition.h`,
`source/rocketmq/tests/SendContextTest.cpp`
`SendCallback` was defined as:
```cpp
using SendCallback = std::function<void(const std::error_code&, const SendReceipt&)>;
```
The synchronous `ProducerImpl::send()` needed to move fields out of the
receipt for efficiency, so it used `const_cast` to strip the const qualifier —
undefined behavior if the object is truly const.
**Fix:** Changed the callback signature from `const SendReceipt&` to
`SendReceipt&`. The receipt is always a non-const object created by the send
path, so there is no semantic reason for it to be const. Updated all call sites:
- `ProducerImpl::send()` — removed `const_cast`, moves directly from
`receipt`
- `FifoProducerPartition::onComplete()` — parameter changed to `SendReceipt&`
- `FifoProducerPartition::trySend()` — lambda signature updated
- `SendContextTest` — test lambda updated
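A minimal sketch of the new signature in use, assuming a simplified `SendReceipt` with a single `message_id` field (the real type has more members) and a hypothetical `sendAndTakeId` helper standing in for the synchronous send path:

```cpp
#include <functional>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical stand-in for rocketmq::SendReceipt.
struct SendReceipt {
  std::string message_id;
};

// Callback signature after the fix: the receipt is a mutable reference.
using SendCallback =
    std::function<void(const std::error_code&, SendReceipt&)>;

// Simulates a synchronous send: the callback moves the ID out of the receipt.
std::string sendAndTakeId(std::string id) {
  std::string taken;
  SendCallback callback = [&taken](const std::error_code& ec,
                                   SendReceipt& receipt) {
    if (!ec) {
      taken = std::move(receipt.message_id);  // no const_cast needed
    }
  };
  SendReceipt receipt{std::move(id)};  // named, non-const object
  std::error_code ok;                  // default-constructed: no error
  callback(ok, receipt);
  return taken;
}
```

Because the parameter is a non-const lvalue reference, callers must pass a named, non-const receipt, which matches how the send path is described above.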
## Verification
- **CMake build:** All targets compiled successfully (library, tests,
examples)
- **CMake tests:** 24/24 passed
- **Bazel build:** All library and test targets compiled successfully (3387
actions)
- **Bazel tests:** 25/25 passed
- **Runtime validation:** Producer (sync + async), PushConsumer, and
SimpleConsumer examples tested against a live RocketMQ 5.x instance — message
send, receive, ack, and invisible duration change all work correctly
Will submit a PR with all fixes.