zhanglistar commented on code in PR #549:
URL: https://github.com/apache/pulsar-client-cpp/pull/549#discussion_r2943871818


##########
lib/ConsumerImpl.cc:
##########
@@ -1819,7 +1819,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
const SharedBuffer& seek, c
             if (result == ResultOk) {
                 LockGuard lock(mutex_);
                 if (getCnx().expired() || reconnectionPending_) {
-                    // It's during reconnection, complete the seek future 
after connection is established
+                    // It's during reconnection, complete the seek future 
after connection is established.
+                    // Clear local state now so hasMessageAvailable() does not 
see stale prefetched messages.
+                    ackGroupingTrackerPtr_->flushAndClean();
+                    incomingMessages_.clear();
+                    if (lastSeekArg_.has_value() && 
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                        startMessageId_ = 
std::get<MessageId>(lastSeekArg_.value());
+                    }

Review Comment:
   Moved the clear in handleCreateConsumer() inside the lock (before unlock()), 
so the queue is cleared while holding mutex_. That way the seek callback and 
hasMessageAvailable() always see the cleared state and the flaky test is fixed. 
No lock-order change, so no new deadlock risk; the only effect is a bit longer 
lock hold during reconnection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to