lhotari commented on code in PR #26026:
URL: https://github.com/apache/pulsar/pull/26026#discussion_r3409386691


##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java:
##########
@@ -160,6 +161,25 @@ protected boolean isUnrecoverableError(Throwable t) {
             return super.isUnrecoverableError(t);
         }
 
+        @Override
+        public boolean connectionFailed(PulsarClientException exception) {
+            // A compaction reader is created with 
retryOnRecoverableErrors=false. When the topic is fenced or
+            // deleted, a reconnect can fail at the lookup/connection stage 
with a retriable error such as
+            // ServiceNotReadyException. The base 
ConsumerImpl.connectionFailed only consults isUnrecoverableError
+            // for non-retriable errors (or after the lookup deadline has 
passed), so for such a retriable error it
+            // would keep reconnecting and never fail the in-flight read, 
leaving the compaction future pending.
+            // Honor isUnrecoverableError here too so the reader is closed 
promptly and pending reads are failed,
+            // mirroring the subscribe-stage handling in 
ConsumerImpl.connectionOpened(). This matters for
+            // compaction: a never-failing read keeps the compaction future 
pending, which blocks forced
+            // topic/namespace deletion (issue #24148).
+            Throwable actError = 
FutureUtil.unwrapCompletionException(exception);
+            if (isUnrecoverableError(actError)) {
+                closeWhenReceivedUnrecoverableError(actError, null);
+                return false;

Review Comment:
   Good catch, thanks. You're right — `closeWhenReceivedUnrecoverableError` 
only closed the consumer and failed in-flight reads; it never completed the 
subscribe future, so an unrecoverable error before the initial subscribe (at 
the lookup stage via `connectionFailed`, or the subscribe stage via 
`connectionOpened`) left `RawReader.create(...)` pending.
   
   Fixed at the root by completing the subscribe future exceptionally in 
`closeWhenReceivedUnrecoverableError` — a no-op once it's already done, so it's 
a no-op for normal consumers (which only reach this path post-subscribe), and 
it covers both the `connectionFailed` case you flagged and the equivalent 
`connectionOpened` one. Added a `RawReaderTest` case that injects the failure 
while the consumer future is still pending and asserts it completes 
exceptionally.
   
   Done in 3c4a3fa95d7.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to