GitHub user filthz edited a discussion: recycled already - exception
We started getting the following exception in one of the consumers. It is
occurring when the `consumer.acknowledge()` method is called. What is causing
this and how can we fix this issue?
Exception:
```
java.lang.IllegalStateException: recycled already
at io.netty.util.Recycler$WeakOrderQueue.transfer(Recycler.java:442)
at io.netty.util.Recycler$Stack.scavengeSome(Recycler.java:587)
at io.netty.util.Recycler$Stack.scavenge(Recycler.java:562)
at io.netty.util.Recycler$Stack.pop(Recycler.java:535)
at io.netty.util.Recycler.get(Recycler.java:162)
at
org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable.create(ConcurrentBitSetRecyclable.java:51)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.lambda$doIndividualBatchAckAsync$8(PersistentAcknowledgmentsGroupingTracker.java:328)
at
java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualBatchAckAsync(PersistentAcknowledgmentsGroupingTracker.java:323)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualBatchAck(PersistentAcknowledgmentsGroupingTracker.java:294)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualBatchAck(PersistentAcknowledgmentsGroupingTracker.java:287)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.lambda$addAcknowledgment$4(PersistentAcknowledgmentsGroupingTracker.java:235)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addIndividualAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:220)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:232)
at
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:196)
at
org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:560)
at
org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688)
at
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:634)
at
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:619)
at
org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:416)
at
org.apache.pulsar.client.impl.ConsumerBase.acknowledge(ConsumerBase.java:409)
at
de.seepex.service.lastactive.PulsarLastActiveConsumer.messageListener(PulsarLastActiveConsumer.java:80)
at
org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:1157)
at
org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$9(ConsumerBase.java:1120)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
```
The consumer:
```
private void messageListener(Consumer<byte[]> consumer, Message<byte[]> msg) {
try {
byte[] payload = msg.getData();
if (payload == null || payload.length == 0) {
consumer.acknowledge(msg);
return;
}
final LastActiveUpdate lastActiveUpdate =
objectMapper.readValue(payload, LastActiveUpdate.class);
lastActiveProcessor.process(lastActiveUpdate);
consumer.acknowledge(msg);
} catch (Exception e) {
LOG.error("failed to process", e);
consumer.acknowledgeAsync(msg);
}
}
```
GitHub link: https://github.com/apache/pulsar/discussions/21929
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]