YinY1 opened a new issue, #25204: URL: https://github.com/apache/pulsar/issues/25204
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

- broker version: 4.0.8
- client version: 4.1.1 (java)

### Issue Description

# configs

- producer config
  - enableBatching(true)
  - blockIfQueueFull(true)
  - batchingMaxMessages(100)
- consumer config
  - subscriptionType(SubscriptionType.Exclusive)
  - subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  - enableBatchIndexAcknowledgment(false)

# What I did

- Produce messages with a "sequence" property, then consume them and validate the sequence number.
- Cumulatively acknowledge after every `groupSize` messages received.
- Call `redeliverUnacknowledgedMessages` on a schedule and reset the expected sequence number.

# Expected

Assume the current epoch is 0. If `lastAckNumber = 1000` and the most recently received message is number `1300` (which matches `expectedSequenceNumber`), then after calling `redeliverUnacknowledgedMessages()` the consumer should receive message number `1001` with a higher `consumerEpoch` and `redeliveryCount`.

# Got

A random message with number > `1001`, `consumerEpoch = 0`, and `redeliveryCount = 0`. The other messages with larger batch indexes in the same batch (entry) are filtered out. See the logs below.
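To make the expectation above concrete, here is a minimal, self-contained sketch of the behavior I would expect. It is a simplified model only, not the Pulsar client implementation: `EpochFilterSketch`, `Msg`, `dropStale`, and `firstExpectedAfterRedeliver` are hypothetical names made up for illustration, and incrementing the epoch stands in for `redeliverUnacknowledgedMessages()`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (assumption): every message dispatched before a redelivery
// carries the old epoch and is dropped; delivery resumes at lastAckNumber + 1.
final class EpochFilterSketch {

    /** Hypothetical stand-in for a received message. */
    static final class Msg {
        final long sequence; // value of the "sequence" property
        final long epoch;    // consumer epoch the message was dispatched under

        Msg(long sequence, long epoch) {
            this.sequence = sequence;
            this.epoch = epoch;
        }
    }

    /** Keeps only messages dispatched under the current (or a newer) consumer epoch. */
    static List<Msg> dropStale(List<Msg> inFlight, long consumerEpoch) {
        List<Msg> kept = new ArrayList<>();
        for (Msg m : inFlight) {
            if (m.epoch >= consumerEpoch) {
                kept.add(m);
            }
        }
        return kept;
    }

    /** After a redelivery, the first expected message follows the last cumulative ack. */
    static long firstExpectedAfterRedeliver(long lastAckNumber) {
        return lastAckNumber + 1;
    }

    public static void main(String[] args) {
        long consumerEpoch = 0;
        long lastAckNumber = 1000;

        List<Msg> inFlight = new ArrayList<>();
        inFlight.add(new Msg(1300, 0)); // still buffered from epoch 0
        inFlight.add(new Msg(1301, 0));

        consumerEpoch++; // models redeliverUnacknowledgedMessages()
        inFlight.add(new Msg(1001, consumerEpoch)); // redelivered under the new epoch

        List<Msg> delivered = dropStale(inFlight, consumerEpoch);
        System.out.println("first delivered = " + delivered.get(0).sequence
                + ", expected = " + firstExpectedAfterRedeliver(lastAckNumber));
    }
}
```

In this model no epoch-0 message can reach the application after the redelivery, which is the opposite of what the logs in this report show.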
### Error messages

```text
2026-02-03 02:58:43.202 [pool-2-thread-2] INFO o.e.OrderedCumulativeConsumerWorker - After interval redeliver, resetting expected sequence to 4501
2026-02-03 02:58:43.202 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:76], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:77], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:78], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:79], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:80], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:81], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:82], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:83], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:84], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:85], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:86], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:87], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:88], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:89], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:90], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:91], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:92], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:93], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:94], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:95], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:96], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:97], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:98], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:99], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.703 [pool-2-thread-2] WARN o.e.OrderedCumulativeConsumerWorker - Found out-of-order message: MessageId=14248:2059:-1:75, expectedSequence=4501, actualSequence=5976, uuid=d2363f75-f1db-453b-9b0c-e8d70b91ba0f, consumerEpoch=0, redeliveryCount=0
```

### Reproducing the issue

```java
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;

@SneakyThrows
private void produceMessages(Producer<byte[]> producer, int count) {
    // Tag every message with its position so the consumer can detect gaps.
    for (int i = 0; i < count; i++) {
        producer.newMessage().property("sequence", String.valueOf(i))
                .value(("message-" + i).getBytes())
                .send();
    }
}

@SneakyThrows
private void consumeAndRedeliverRoundtrip(Consumer<byte[]> consumer, int groupSize, long interval) {
    long expectedSequenceNumber = 0;
    long lastAckNumber = 0;
    int count = 0;
    long last = System.currentTimeMillis();
    while (true) {
        var msg = consumer.receive(10, TimeUnit.SECONDS);
        if (msg == null) {
            log.info("No more messages to consume.");
            return;
        }
        // Validate sequence - should be strictly increasing, gap indicates message loss
        String sequenceStr = msg.getProperty("sequence");
        if (sequenceStr != null) {
            long actualSequence = Long.parseLong(sequenceStr);
            if (actualSequence != expectedSequenceNumber) {
                log.error("Sequence gap detected! Expected: {}, Actual: {}", expectedSequenceNumber, actualSequence);
                throw new RuntimeException("Sequence gap detected! Expected: " + expectedSequenceNumber
                        + ", Actual: " + actualSequence);
            }
        }
        count++;
        // Cumulatively acknowledge once per group of `groupSize` messages.
        if (count == groupSize) {
            consumer.acknowledgeCumulative(msg);
            count = 0;
            lastAckNumber = expectedSequenceNumber;
        }
        expectedSequenceNumber++;
        long cur = System.currentTimeMillis();
        // Every `interval` ms, request redelivery and rewind to the message after the last ack.
        if (cur - last >= interval) {
            consumer.redeliverUnacknowledgedMessages();
            expectedSequenceNumber = lastAckNumber + 1; // reset expected seq
            last = cur;
            Thread.sleep(100);
        }
    }
}
```

### Additional information

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
