Shawyeok opened a new issue #9109:
URL: https://github.com/apache/pulsar/issues/9109
**Describe the bug**
After a broker crash and restart, consumers become blocked: the consumer
`rateOut` drops to 0 and does not recover until the consumer process is restarted.
**To Reproduce**
Steps to reproduce the behavior:
1. Run the reproduction code below
```java
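import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarConsumerTest {
    private static final Logger LOG =
            LoggerFactory.getLogger(PulsarConsumerTest.class);

    private PulsarClient pulsarClient;
    private Consumer<byte[]> consumer;

    @Before
    public void setUp() throws Exception {
        pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://<broker>:6650")
                .build();
        String topic = "persistent://sample/ns1/topic1";
        String subscriptionName = "test";
        // Messages go to the DLQ topic after 3 redeliveries.
        DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
                .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscriptionName))
                .maxRedeliverCount(3)
                .build();
        consumer = pulsarClient.newConsumer()
                .topic(topic)
                .deadLetterPolicy(deadLetterPolicy)
                .ackTimeout(5, TimeUnit.SECONDS)
                .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
    }

    @After
    public void tearDown() throws Exception {
        pulsarClient.close();
    }

    @Test
    public void test() throws PulsarClientException {
        while (true) {
            Message<byte[]> message = consumer.receive();
            MessageId messageId = message.getMessageId();
            LOG.info("received message with messageId: {}", messageId);
            try {
                consume(message);
            } catch (Exception e) {
                // The message is never acknowledged, so the ack timeout
                // triggers redelivery and eventually the DLQ path.
                LOG.error("consume message exception with messageId: {}",
                        messageId, e);
            }
        }
    }

    // Always fails, to force redelivery of every message.
    private void consume(Message<byte[]> message) {
        throw new IllegalStateException("mock consume fails");
    }
}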
```
2. Send 1000 messages to topic `persistent://sample/ns1/topic1`
3. Wait about 10-15 s (time for redelivery), then kill and restart the broker process
4. Inspect the stack of the thread `pulsar-timer-4-1` and check whether it is
blocked
5. If the problem doesn't appear, repeat steps 2-4 a few more times
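For step 4, a thread dump of the consumer process (for example from `jstack`) is the usual way to look at the timer thread. If the check has to run inside the consumer JVM instead, a minimal sketch could look like the following. The class `ThreadStateCheck` and its method names are hypothetical helpers for illustration and are not part of the Pulsar client.

```java
import java.util.Map;

public class ThreadStateCheck {

    /** Returns the state of the first live thread with the given name, or null if none exists. */
    public static Thread.State stateOf(String threadName) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().equals(threadName)) {
                return t.getState();
            }
        }
        return null;
    }

    /** Returns true if the named thread currently has a frame in className.methodName. */
    public static boolean hasFrame(String threadName, String className, String methodName) {
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            if (!e.getKey().getName().equals(threadName)) {
                continue;
            }
            for (StackTraceElement frame : e.getValue()) {
                if (frame.getClassName().equals(className)
                        && frame.getMethodName().equals(methodName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String name = "pulsar-timer-4-1";
        // In the blocked case from this report the state is WAITING and the
        // stack contains ProducerBase.send.
        System.out.println(name + " state: " + stateOf(name));
        System.out.println("in ProducerBase.send: "
                + hasFrame(name, "org.apache.pulsar.client.impl.ProducerBase", "send"));
    }
}
```

A single sample only shows where the thread is at that instant; seeing the same frame across several samples taken a few seconds apart is a stronger sign that the thread is stuck.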
**Expected behavior**
The consumer should resume receiving messages once the broker is back. Instead,
the thread `pulsar-timer-4-1` blocks at `producer.send` forever, and
`consumer.receive` may then block in `UnAckedMessageTracker#add`, which has to
acquire the write lock inside `UnAckedMessageTracker`.
```
"pulsar-timer-4-1" #37 prio=5 os_prio=0 tid=0x00007efe584c0000 nid=0x62 waiting on condition [0x00007efe30aec000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000af9bbee8> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:115)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
    at org.apache.pulsar.client.impl.ConsumerImpl.processPossibleToDLQ(ConsumerImpl.java:1452)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$null$12(ConsumerImpl.java:1390)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$811/717118161.test(Unknown Source)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$redeliverUnacknowledgedMessages$14(ConsumerImpl.java:1396)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$810/1511392410.accept(Unknown Source)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.pulsar.client.impl.ConsumerImpl.redeliverUnacknowledgedMessages(ConsumerImpl.java:1388)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$redeliverUnacknowledgedMessages$20(MultiTopicsConsumerImpl.java:621)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$$Lambda$807/636669140.accept(Unknown Source)
    at java.util.HashMap.forEach(HashMap.java:1289)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.redeliverUnacknowledgedMessages(MultiTopicsConsumerImpl.java:619)
    at org.apache.pulsar.client.impl.UnAckedMessageTracker$2.run(UnAckedMessageTracker.java:144)
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - <0x00000000a9dab610> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
```
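The dump shows the timer thread owning a `ReentrantReadWriteLock` while parked in `CompletableFuture.get`. The interaction can be reproduced in isolation with plain JDK classes. The sketch below is not Pulsar code: the class `BlockedTrackerSketch`, the method `receiverCanLock`, and the thread name are made up for illustration, and the `CompletableFuture` stands in for the result of the blocking `producer.send` call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BlockedTrackerSketch {

    /**
     * Returns true if a "receive" caller can take the write lock within
     * waitMillis while a "timer" thread holds it and waits on sendResult.
     */
    public static boolean receiverCanLock(CompletableFuture<Void> sendResult, long waitMillis)
            throws InterruptedException {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch timerHoldsLock = new CountDownLatch(1);

        Thread timer = new Thread(() -> {
            lock.writeLock().lock();
            try {
                timerHoldsLock.countDown();
                sendResult.get(); // stands in for the blocking send to the DLQ
            } catch (Exception e) {
                // interrupted or failed: fall through and release the lock
            } finally {
                lock.writeLock().unlock();
            }
        }, "timer-sketch");
        timer.setDaemon(true);
        timer.start();

        timerHoldsLock.await();
        // The receive path needs the same write lock to track the new message.
        boolean acquired = lock.writeLock().tryLock(waitMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.writeLock().unlock();
        }
        timer.interrupt(); // lets the sketch thread exit if it is still parked
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        // Send never completes: the receiver cannot get the lock. Prints false.
        System.out.println(receiverCanLock(new CompletableFuture<>(), 200));
        // Send already completed: the lock is released. Prints true.
        System.out.println(receiverCanLock(CompletableFuture.completedFuture(null), 2000));
    }
}
```

The sketch uses `tryLock` with a timeout only so that it terminates. A caller using a plain `lock()` would wait indefinitely in the first case, which matches the behavior described in this report.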
**Screenshots**
The consumer thread got blocked in our production environment.

**Additional context**
Broker version: `2.4.0`
pulsar-client version: `2.5.2`