michaeljmarshall commented on issue #9916:
URL: https://github.com/apache/pulsar/issues/9916#issuecomment-803791838
> Fixing this test seems pretty challenging. If there are obstacles that are hard to overcome, perhaps there's a way to simplify the test.
I agree this test is a bit complicated, but annotating it should help frame a discussion of the behavior that I believe is causing the flakiness. Because there are no timeouts in the test, I only see one place for a race condition, and I can consistently reproduce the failure by adding a `sleep` in one key location.
## Annotated Test
In the following code block, we create a normal consumer, a DLQ consumer, and a producer. We then produce `sendMessages` messages to the `topic`. This all appears deterministic to me.
```java
public void testDeadLetterTopic() throws Exception {
    final String topic = "persistent://my-property/my-ns/dead-letter-topic";
    final int maxRedeliveryCount = 2;
    final int sendMessages = 100;

    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(1, TimeUnit.SECONDS)
            .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
            .receiverQueueSize(100)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates new client connection

    Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
            .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
            .subscriptionName("my-subscription")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
            .topic(topic)
            .create();

    for (int i = 0; i < sendMessages; i++) {
        producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
    }
    producer.close();
```
After running the above, we have a topic with 100 messages. The while loop below completes after 300 iterations (100 * (2 + 1)). We start the loop by consuming from the topic with the basic consumer. We consume 100 messages very quickly, then wait 1 second for them to time out, because we never ack any of them. We consume the redelivered 100 messages, then wait 1 second again. We consume them a third time, at which point the `while` condition is false and we move on to the next block (before the messages time out from not being ack'ed).
```java
    int totalReceived = 0;
    do {
        Message<byte[]> message = consumer.receive();
        log.info("consumer received message : {} - {}", message.getMessageId(), new String(message.getData()));
        totalReceived++;
    } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
```
Up to this point, all of the behavior is deterministic as far as message delivery and acking. Even if the redeliveries are delayed, the code waits indefinitely until the messages are delivered.

The next code block waits about 1 second at `deadLetterConsumer.receive()` because that consumer is looking for messages on the DLQ topic, and messages aren't produced to that topic until they time out from not being ack'ed. We then consume all 100 messages from the DLQ topic.

This is where we find the race condition. The redelivery logic lives in the client. The client produces each expired message to the DLQ topic, and only when that send completes does it ack the message on the source topic. Given that order, the `deadLetterConsumer` can consume messages from the DLQ topic before they are removed (ack'ed) from the source topic.

We then close the clients. If a client is closed before the ack calls complete, those acks fail. Closing the client does not include any kind of wait to make a best effort at delivering those acks. This ordering is sketched after the following code block.
```java
    int totalInDeadLetter = 0;
    do {
        Message message = deadLetterConsumer.receive();
        log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
        deadLetterConsumer.acknowledge(message);
        totalInDeadLetter++;
    } while (totalInDeadLetter < sendMessages);

    deadLetterConsumer.close();
    consumer.close();
```
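To make the ordering concrete, here is a minimal sketch of the client-side send-then-ack chain as I understand it (the names `deadLetterProducer`, `dlqMessage`, and `originalMessageId` are illustrative stand-ins, not the actual `ConsumerImpl` fields):

```java
// Simplified sketch of the redelivery path: the DLQ send completes
// first, and only then is the original message ack'ed.
deadLetterProducer.sendAsync(dlqMessage)
        .thenAccept(dlqMessageId -> {
            // Race window: the test's deadLetterConsumer can already receive
            // dlqMessage here, finish its loop, and call consumer.close()
            // before this ack is ever sent.
            consumer.acknowledgeAsync(originalMessageId);
        });
```

If `consumer.close()` wins that race, the ack fails with the `Consumer not ready. State: Closing` error shown below.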
Next, we create a consumer to consume from the source topic to see if any messages remain on it. If any do, we fail the test (this is exactly what we've been seeing in the failures above).
```java
    Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
    if (checkMessage != null) {
        log.info("check consumer received message : {} - {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
    }
    assertNull(checkMessage);

    checkConsumer.close();
    newPulsarClient.close();
}
```
## Thread Sleep And Reproducing Failure
To further prove this point, I added a `Thread.sleep(1000);` immediately before the following line:
https://github.com/apache/pulsar/blob/a8b921cf15c0a0f652f1d9f62a6481efea243881/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1687
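For reference, the injected delay sits inside the DLQ send callback, roughly like this (again using the simplified stand-in names from the sketch above, not the actual `ConsumerImpl` code):

```java
deadLetterProducer.sendAsync(dlqMessage)
        .thenAccept(dlqMessageId -> {
            try {
                // Injected delay: widens the window between the DLQ send
                // completing and the source-topic ack being sent.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            consumer.acknowledgeAsync(originalMessageId);
        });
```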
With that delay in place, the test consistently fails, with the following warning and stack trace in the test logs:
```
23:24:11.934 [pulsar-client-io-38-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/dead-letter-topic] [my-subscription] [3e1bb] Failed to acknowledge the message 3:3:-1 of the original topic but send to the DLQ successfully.
org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closing
    at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:509) ~[classes/:?]
    at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:506) ~[classes/:?]
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:446) ~[classes/:?]
    at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:435) ~[classes/:?]
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$27(ConsumerImpl.java:1713) ~[classes/:?]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) [?:?]
    at org.apache.pulsar.client.impl.ProducerImpl$1.sendComplete(ProducerImpl.java:319) [classes/:?]
    at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1224) [classes/:?]
    at org.apache.pulsar.client.impl.ProducerImpl.ackReceived(ProducerImpl.java:991) [classes/:?]
    at org.apache.pulsar.client.impl.ClientCnx.handleSendReceipt(ClientCnx.java:402) [classes/:?]
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:220) [classes/:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.lang.Thread.run(Thread.java:834) [?:?]
```
## Potential Solutions
I am still uncertain about the best way to solve this test's flakiness. The test is a bit contrived, and the race condition is likely not one seen by end users. A few possible solutions:

1. Assuming the current design is what we want, find a way to improve the test.
2. Add logic to the Pulsar client to ensure that, when it is closed, it waits (possibly with some configurable timeout) for these DLQ messages to be produced and ack'ed appropriately (sketched below).
3. Look into the broker's handling of messages that have reached the maximum number of redeliveries.

I prefer the second option. The first option leaves the race condition in the client. The third is likely not easy because the broker doesn't have the necessary information, and, further, the current paradigm is that the redelivery logic lives in the client.

If there is agreement that the second solution is the right direction, I'm happy to look into implementing it.
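As a rough illustration of what the second option could look like (purely a sketch; `pendingDlqOps`, `trackDlqOperation`, `awaitPendingDlqOps`, and the timeout are hypothetical names, not existing client fields or configuration):

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Illustrative only, not the real ConsumerImpl: track each in-flight
// "produce to DLQ, then ack the source topic" operation so that close()
// can wait, with a bounded timeout, for the acks to be delivered.
class DlqAwareCloseSketch {

    private final Set<CompletableFuture<Void>> pendingDlqOps = ConcurrentHashMap.newKeySet();

    // Wrap the send-then-ack chain for one message that exceeded maxRedeliverCount.
    void trackDlqOperation(Supplier<CompletableFuture<Void>> sendThenAck) {
        CompletableFuture<Void> op = sendThenAck.get();
        pendingDlqOps.add(op);
        op.whenComplete((v, ex) -> pendingDlqOps.remove(op));
    }

    // Called at the start of close(): best effort to let pending acks finish.
    CompletableFuture<Void> awaitPendingDlqOps(long timeout, TimeUnit unit) {
        return CompletableFuture
                .allOf(pendingDlqOps.toArray(new CompletableFuture[0]))
                .orTimeout(timeout, unit)
                .exceptionally(ex -> null); // proceed with the close either way
    }
}
```

With something like this in place, `consumer.close()` would drain the pending DLQ acks before tearing down the connection, and the test's `checkConsumer` would no longer observe un-acked messages on the source topic.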