darinspivey opened a new issue, #25279:
URL: https://github.com/apache/pulsar/issues/25279

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
  Version: 4.0.8 via Helm chart `pulsar-4.4.0` (the bug is also present on current master)

  Related: #25119 (different bug, same area — phase 2 `__compaction` cursor)
   
   ### Issue Description
   
   ## Summary
   
   This bug was found because we update topic policies frequently in our Pulsar cluster and noticed an accumulation of thousands of backlogged events in `__change_events`. Because compaction is triggered automatically whenever change events are added, the failure below occurs on every compaction cycle:
   
   Compaction phase 2 fails every time with a `ConnectException` because the 
broker disconnects the compaction consumer as part of processing its own seek 
request, causing `channelInactive` to fire on the client and kill the in-flight 
seek future before the broker sends the success response. Since 
`AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop` has no retry logic for 
transient seek failures, every compaction attempt aborts. The `__compaction` 
subscription backlog grows without bound.
   
   ## Affected Topics

   Any topic with compaction enabled. It most visibly affects `__change_events` system topics, because `SystemTopic.isCompactionEnabled()` hardcodes `true` and the effective `compactionThreshold` is `0` bytes, so compaction is triggered on any non-zero backlog. Frequent topic-level policy writes (each write appends a message to `__change_events`) cause compaction to be triggered and to fail in a continuous loop.
   
   ## Expected Behavior

   Compaction phase 2 seeks the `__compaction` reader back to the start of the compacted range and reads forward, producing a new compacted ledger.

   ## Actual Behavior

   Compaction fails on every attempt, and the `__compaction` subscription backlog grows indefinitely.
   
   ### Error messages
   
```text
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO  o.a.p.compaction.AbstractTwoPhaseCompactor - Commencing phase two of compaction for persistent://my-tenant/my-namespace/__change_events-partition-2, from 1218818:0:2:-1 to 1330087:5:2:-1, compacting 12 keys to ledger 1341961
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO  o.a.p.client.impl.ConsumerImpl - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Seeking subscription to the message 1218818:0:2:-1
[pulsar-io-3-4] INFO  o.a.p.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2, name=__compaction}, consumerId=27241, consumerName=6QQBD, address=[id: 0x6da7169d, L:/10.10.x.x:6650 - R:/10.10.x.x:53634] [SR:10.10.x.x, state:Connected]}
[pulsar-io-3-4] INFO  o.a.p.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2, name=__compaction}, ...}
[pulsar-io-3-4] INFO  o.a.p.broker.service.persistent.PersistentSubscription - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO  o.a.b.mledger.impl.ManagedCursorImpl - [my-tenant/my-namespace/persistent/__change_events-partition-2] Initiate reset readPosition from 1330087:6 to 1218818:0 (ackSet is null) on cursor __compaction
[pulsar-io-3-6] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://my-tenant/my-namespace/__change_events-partition-2] [__compaction] Closed connection [id: 0xb3fcdb76, L:/10.10.x.x:53634 ! R:/10.10.x.x:6650] -- Will try again in 0.1 s, hostUrl: null
[pulsar-io-3-6] ERROR o.a.p.client.impl.ConsumerImpl - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Failed to reset subscription: Disconnected from server at /10.10.x.x:6650
[broker-client-shared-internal-executor-5-1] WARN  o.a.p.broker.service.persistent.PersistentTopic - [persistent://my-tenant/my-namespace/__change_events-partition-2] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the subscription __compaction of the topic persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1
      at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
      at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
      at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
      at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
      at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
      at org.apache.pulsar.client.impl.ConsumerImpl.failSeek(ConsumerImpl.java:2631) ~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
      at org.apache.pulsar.client.impl.ConsumerImpl.lambda$seekAsyncInternal$60(ConsumerImpl.java:2603) ~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
      at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:342) ~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the subscription __compaction of the topic persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1
      Disconnected from server at /10.10.x.x:6650
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO  o.a.b.mledger.impl.ManagedCursorImpl - [my-tenant/my-namespace/persistent/__change_events-partition-2] reset readPosition to 1218818:0 (ackSet is null) before current read readPosition 1330087:6 on cursor __compaction
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO  o.a.p.broker.service.ServerCnx - [/10.10.x.x:53634] [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Reset subscription to message id 1218818:0 (ackSet is null)
```
   
   ### Reproducing the issue
   
  1. Create a namespace with frequent topic-level policy writes so the `__change_events` backlog grows. For example, call `setMaxUnackedMessagesPerConsumer` per topic on every consumer restart; this is the real-world usage that surfaced these findings.
  2. Observe compaction triggering automatically (threshold = 0 bytes for system topics), or trigger it manually: `pulsar-admin topics compact persistent://my-tenant/my-namespace/__change_events`
  3. Observe the compaction failure in the broker logs.
   
   
   
   ### Additional information
   
   ## Root Cause

   The sequence in `PersistentSubscription.resetCursorInternal` (`PersistentSubscription.java:916`) is:

   1. Disconnect active consumers via `dispatcher.disconnectActiveConsumers(true)`.
   2. Reset the managed cursor position via `ManagedCursorImpl.internalResetCursor`.
   3. Send `commandSender.sendSuccessResponse(requestId)` back in `ServerCnx.handleSeek`'s `thenRun`.
   
   
   Step 1 closes the consumer's Netty channel server-side, which fires `channelInactive` on the client (`ClientCnx.java:328`). `channelInactive` immediately fails all entries in `pendingRequests` with `ConnectException` (`ClientCnx.java:341–344`), including the seek request that triggered the reset. By the time the server sends the success response in step 3, the client has already failed the seek future and aborted compaction.
   
   The disconnect in step 1 is necessary for correctness — the dispatcher may 
have messages in flight to the consumer that would be inconsistent with the new 
cursor position — so the server-side ordering cannot simply be reversed. The 
fix belongs on the compactor side.
   
   `AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop` has no retry logic: a failed `seekAsync` propagates directly to `whenComplete` and aborts the run:
   
```java
reader.seekAsync(from).thenCompose((v) -> {
    // phase two loop
}).whenComplete((res, exception) -> {
    if (exception != null) {
        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
            promise.completeExceptionally(exception); // no retry
        });
    }
});
```
   ## Potential Fix

   The seek is idempotent, and the `ConnectException` is always transient in this context: the broker disconnects the consumer as part of processing the seek, so by the time the consumer reconnects, the cursor is already at the correct position. Retrying `seekAsync` with a short delay lets the consumer reconnect, at which point the seek succeeds immediately. I leave the exact approach up to the implementors.
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
