glumia opened a new issue, #25996:
URL: https://github.com/apache/pulsar/issues/25996

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   - Broker version: 4.2.1 and 4.2.2 (`apachepulsar/pulsar:4.2.1` / `:4.2.2` 
Docker images, standalone), `InMemoryDelayedDeliveryTracker`, 
`delayedDeliveryTickTimeMillis=1000`
   - Broker OS/hardware: Linux 6.12.76-linuxkit aarch64 (Docker on macOS arm64)
   - Broker Java: OpenJDK Corretto 21.0.11 (bundled in image)
   - Client library: Python `pulsar-client` 3.12.0; also observed with Java 
client 4.2.1 via `reconsumeLaterAsync`. The defect is broker-side.
   - Client OS: macOS arm64 (Darwin 25.4.0)
   
   ### Issue Description
   
   With `isDelayedDeliveryDeliverAtTimeStrict=true`, delayed messages can 
remain undelivered indefinitely past their deliverAt time while a consumer is 
blocked in `receive()` the whole time. The stalled messages are released only 
when the next message is published to the topic - a new `receive()` call does 
not release them. With the default `strict=false`, the same traffic is 
delivered on time.
   
   Expected: delayed messages delivered at deliverAt time (plus tick/timer 
granularity).
   Actual: with the reproducer below, messages due at t≈60–65s are still 
undelivered at t≈128s; a single publish then flushes them all within ~100ms. On 
a quiet topic the delay is unbounded; we hit this in production-like QA where 
`reconsumeLaterAsync` retries on a retry topic stalled ~20s until an unrelated 
publish.
   
   This appears to be a broker bug: the delivery timer in 
`AbstractDelayedDeliveryTracker` is cancelled and never re-armed (analysis in 
Additional information).
   
   ### Error messages
   
   ```text
   No errors or stack traces. With PULSAR_LOG_LEVEL=debug, a stalled run shows 
the timer being armed
   for the remaining delayed messages and then never firing — this is the last 
tracker activity for
   the topic (followed by a re-add busy-loop and then total silence; "Timer 
triggered" never appears again):
   
   20:16:48,861 DEBUG ...AbstractDelayedDeliveryTracker - [...probe-sub] Timer 
triggered
   20:16:48,870 DEBUG ...InMemoryDelayedDeliveryTracker - [...probe-sub] Get 
scheduled messages - found 2
   20:16:48,870 DEBUG ...AbstractDelayedDeliveryTracker - [...probe-sub] Start 
timer in 50458 millis   <- never fires
   20:16:48,873 DEBUG ...InMemoryDelayedDeliveryTracker - [...probe-sub] Add 
message 3:7 -- Delivery in 56 ms
      (the line above repeats ~189 times over ~60ms, then nothing for the rest 
of the run)
   ```
   
   ### Reproducing the issue
   
   1. Start a strict-mode standalone broker:
   
   ```bash
   docker run -d --name pulsar-strict -p 6650:6650 \
     -e PULSAR_PREFIX_isDelayedDeliveryDeliverAtTimeStrict=true \
     apachepulsar/pulsar:4.2.1 \
     /bin/bash -lc "bin/apply-config-from-env.py conf/standalone.conf && 
bin/pulsar standalone --no-functions-worker -nss"
   ```
   
   2. Run this script (`pip install pulsar-client`) with a fresh topic name as 
argv[1]:
   
   ```python
   import sys, time
   from datetime import timedelta
   import pulsar
   
   client = pulsar.Client("pulsar://localhost:6650")
   consumer = client.subscribe(sys.argv[1], "sub", 
consumer_type=pulsar.ConsumerType.Shared)
   producer = client.create_producer(sys.argv[1])
   t0 = time.time()
   
   for i in range(6):
       producer.send(f"LONG-{i}".encode(), deliver_after=timedelta(seconds=60))
       time.sleep(1)
   producer.send(b"SHORT-2s", deliver_after=timedelta(seconds=2))
   time.sleep(0.5)
   producer.send(b"SHORT-1s", deliver_after=timedelta(seconds=1))
   
   for _ in range(8):
       msg = consumer.receive(timeout_millis=120_000)  # raises pulsar.Timeout 
on the stall
       print(f"t={time.time()-t0:5.1f}s received {msg.data().decode()}", 
flush=True)
       consumer.acknowledge(msg)
   client.close()
   ```
   
   Observed (strict=true): SHORTs arrive on time, then all six LONGs — due 
t≈60–65s — never arrive; `receive()` times out at t≈128s. Publishing one more 
message releases them all instantly. With strict=false (same image, no env 
var): LONGs arrive at t≈60–65s.
   
   
   ### Additional information
   
   **Disclaimer**: this issue was filed with the help of an LLM. I've 
double-checked that the problem is real, but I haven't validated the explanation 
below (that would take me some time, and I could still get it wrong as I'm not 
that familiar with the code).
   
   ---
   
   Root cause analysis (from the DEBUG trace above + reading 
`AbstractDelayedDeliveryTracker` at v4.2.1; `updateTimer()` is unchanged 
through master):
   
   1. The timer fires for a SHORT. `getScheduledMessages` pops every entry 
whose **trimmed** key (`trimLowerBit`, up to ~511ms below the real deliverAt 
with tick=1000ms) is ≤ now — including messages up to ~511ms early — then 
correctly re-arms for the next LONG (`currentTimeoutTarget = K_LONG`, the 
`Start timer in 50458 millis` above).
   2. The dispatcher sees the early-popped message is not yet due (strict 
check) and re-adds it. `addMessage → updateTimer()`: trimmed key ≠ `K_LONG` → 
**`timeout.cancel()`** → `delayMillis < 0` → early return **without resetting 
`currentTimeoutTarget` or nulling `timeout`**. (This pop/re-add cycle also 
busy-loops ~1 dispatch round per ms until the message is really due — the 189 
`Add message` lines — a secondary issue.)
   3. When the SHORT is finally dispatched, the closing `updateTimer()` hits 
`timestamp == currentTimeoutTarget` → *"The timer is already set to the correct 
target time"* → returns. No live timer exists. The LONGs stall until an 
unrelated dispatch round (next publish) finds them via `hasMessageAvailable()`, 
which checks the map, not the timer.
   
   `strict=false` is immune because its cutoff (`now + tickTimeMillis`) covers 
the trim window, so early-popped messages are delivered instead of re-added and 
step 2 is unreachable.
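
The arithmetic behind the trim window can be checked with a small Python sketch. It is not the broker's code: `trim_lower_bits`, `is_readded`, and the sample timestamp are hypothetical names and values, and it assumes the key clears the lowest 9 bits for a 1000 ms tick, which is what the ~511 ms figure above implies.

```python
TICK_MS = 1000
# Assumption: the key drops the lowest floor(log2(tick)) bits, i.e. 9 bits here.
TRIM_BITS = TICK_MS.bit_length() - 1


def trim_lower_bits(timestamp_ms, bits=TRIM_BITS):
    """Clear the lowest `bits` bits, giving the bucketed tracker key."""
    return timestamp_ms & ~((1 << bits) - 1)


def is_readded(deliver_at_ms, now_ms, strict):
    """True if an already-popped message is still treated as delayed."""
    cutoff = now_ms if strict else now_ms + TICK_MS
    return deliver_at_ms > cutoff


deliver_at = 10_751                # hypothetical deliverAt, worst case for the trim
key = trim_lower_bits(deliver_at)  # bucketed key, at most 511 ms below deliverAt
now = key                          # earliest instant at which key <= now holds

print(f"key={key} gap={deliver_at - key}ms")
print("strict re-adds:    ", is_readded(deliver_at, now, strict=True))
print("non-strict re-adds:", is_readded(deliver_at, now, strict=False))
```

In this model the gap is 511 ms, so the strict cutoff (`now`) rejects the popped message while the non-strict cutoff (`now + 1000`) always accepts it.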
   
   Suggested fix: in the `delayMillis < 0` early return of `updateTimer()`, 
reset `currentTimeoutTarget = -1` and null `timeout` so a later call cannot 
short-circuit on stale state.
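
Steps 1 to 3 and the suggested fix can be replayed with a minimal Python model. This is a sketch of the sequence as described in this issue, not the broker's Java code: `TrackerModel`, `pop_due`, `reset_on_early_return`, and the timestamps are hypothetical, and the control flow of `update_timer` follows the (unvalidated) reading of `updateTimer()` given above.

```python
import heapq


class TrackerModel:
    """Toy model of the timer bookkeeping; time is passed in explicitly."""

    def __init__(self, reset_on_early_return):
        self.keys = []                    # min-heap of trimmed delivery keys
        self.current_timeout_target = -1  # key the timer is believed to target
        self.timer_armed = False          # whether a live timer exists
        self.reset_on_early_return = reset_on_early_return

    def update_timer(self, now):
        if not self.keys:
            self.current_timeout_target = -1
            self.timer_armed = False
            return
        timestamp = self.keys[0]
        if timestamp == self.current_timeout_target:
            return  # "already set to the correct target time"
        self.timer_armed = False  # models timeout.cancel()
        if timestamp - now < 0:
            # Early return: the suggested fix clears the stale target here.
            if self.reset_on_early_return:
                self.current_timeout_target = -1
            return
        self.current_timeout_target = timestamp
        self.timer_armed = True

    def add_message(self, key, now):
        heapq.heappush(self.keys, key)
        self.update_timer(now)

    def pop_due(self, now):
        due = []
        while self.keys and self.keys[0] <= now:
            due.append(heapq.heappop(self.keys))
        self.update_timer(now)  # the closing update after each pop
        return due


def run(reset_on_early_return):
    k_short, k_long = 1_500, 60_000  # hypothetical trimmed keys
    t = TrackerModel(reset_on_early_return)
    t.add_message(k_long, now=0)
    t.add_message(k_short, now=0)
    t.pop_due(now=1_500)              # step 1: SHORT popped early, timer -> LONG
    t.add_message(k_short, now=1_510) # step 2: re-add cancels timer, early return
    t.pop_due(now=1_900)              # step 3: SHORT delivered, closing update
    return t.timer_armed


print("timer armed without fix:", run(reset_on_early_return=False))
print("timer armed with fix:   ", run(reset_on_early_return=True))
```

Without the reset, the model ends with one LONG key queued and no live timer; with the reset, the closing update re-arms the timer for the LONG key.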
   
   Related but distinct issues: #25617 / #25681 (tracker state corruption with 
`NoSuchElementException`, fixed in 4.2.2) — no NSEE appears here and 4.2.2 
still reproduces. #18399 (load-dependent late delayed delivery) — no strict 
flag involved.
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
