michaeljmarshall commented on issue #9916:
URL: https://github.com/apache/pulsar/issues/9916#issuecomment-803791838


   > Fixing this test seems pretty challenging. If there are obstacles that are hard to overcome, perhaps there's a way to simplify the test.
   
   I agree this test is a bit complicated, but annotating it should help frame a discussion of the behavior that I believe leads to the flakiness. Because the test itself has no timeouts, I see only one place for a race condition, and I can consistently reproduce the failure by adding a `sleep` in one key location.
   
   ## Annotated Test
   
   In the following code block, we create a normal consumer, a DLQ consumer, and a producer. We then produce `sendMessages` (100) messages to `topic`. This all appears deterministic to me.
   
   ```java
       public void testDeadLetterTopic() throws Exception {
           final String topic = "persistent://my-property/my-ns/dead-letter-topic";

           final int maxRedeliveryCount = 2;

           final int sendMessages = 100;

           Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                   .topic(topic)
                   .subscriptionName("my-subscription")
                   .subscriptionType(SubscriptionType.Shared)
                   .ackTimeout(1, TimeUnit.SECONDS)
                   .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                   .receiverQueueSize(100)
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscribe();

           PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates new client connection
           Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                   .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
                   .subscriptionName("my-subscription")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscribe();

           Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                   .topic(topic)
                   .create();

           for (int i = 0; i < sendMessages; i++) {
               producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
           }

           producer.close();
   ```
   
   After running the above, the topic holds 100 messages. The while loop below completes after 300 iterations (100 * (2 + 1)). We start by consuming from the topic with the basic consumer: we receive all 100 messages very quickly, then wait roughly 1 second for each of them to time out because we haven't ack'ed any of them. We receive the 100 messages again, then wait another second. We receive them a third time, at which point the `while` condition is false and we move on to the next block (before those deliveries time out from not being ack'ed).
   
   ```java
           int totalReceived = 0;
           do {
               Message<byte[]> message = consumer.receive();
               log.info("consumer received message : {} - {}", message.getMessageId(), new String(message.getData()));
               totalReceived++;
           } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
   ```
   
   Up to this point, all of the behavior is deterministic with respect to message delivery and acking. Even if the redeliveries are delayed, the code waits indefinitely until the messages arrive.
   
   The next code block waits about 1 second at `deadLetterConsumer.receive()`: that consumer is watching the DLQ topic, and messages aren't produced to it until they time out on the source topic from not being ack'ed. We then consume all 100 messages from the DLQ topic.
   
   This is where we find the race condition. The redelivery logic lives in the client: the client produces each expired message to the DLQ topic, and only when that call completes does it ack the message on the source topic. Given that order, the `deadLetterConsumer` can consume a message from the DLQ topic before it has been removed (ack'ed) from the source topic. (A simplified sketch of this ordering follows the next code block.)
   
   We then close the clients. If the client is closed before those ack calls complete, the acks fail; closing the client does not include any kind of wait to make a best effort at delivering them.
   
   ```java
           int totalInDeadLetter = 0;
           do {
               Message message = deadLetterConsumer.receive();
               log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
               deadLetterConsumer.acknowledge(message);
               totalInDeadLetter++;
           } while (totalInDeadLetter < sendMessages);

           deadLetterConsumer.close();
           consumer.close();
   ```
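
   To make that ordering concrete, here is a minimal sketch of what I understand the client to do for a message that has exhausted its redeliveries (hedged: the real logic is in `ConsumerImpl#processPossibleToDLQ`, and the class, method, and variable names below are illustrative, not the actual internals):

   ```java
   import java.util.concurrent.CompletableFuture;

   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;

   // Illustrative only: the rough client-side ordering when a message exceeds
   // maxRedeliverCount. Names are simplified; this is not the ConsumerImpl code.
   class DlqOrderingSketch {

       static CompletableFuture<Void> sendToDlqThenAck(Producer<byte[]> dlqProducer,
                                                       Consumer<byte[]> sourceConsumer,
                                                       Message<byte[]> expiredMessage) {
           // Step 1: produce a copy of the expired message to the DLQ topic.
           return dlqProducer.sendAsync(expiredMessage.getData())
                   // Step 2: only after the DLQ send succeeds, ack the original message.
                   // A DLQ subscriber can therefore receive the copy from step 1 before
                   // this ack removes the original from the source subscription.
                   .thenCompose(dlqMessageId -> sourceConsumer.acknowledgeAsync(expiredMessage));
       }
   }
   ```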
   
   Next, we create `checkConsumer` to read from the source topic and confirm that no messages remain. If any message is still there, the test fails (this is exactly the failure we've been seeing above).
   
   ```java
           Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                   .topic(topic)
                   .subscriptionName("my-subscription")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscribe();

           Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
           if (checkMessage != null) {
               log.info("check consumer received message : {} - {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
           }
           assertNull(checkMessage);

           checkConsumer.close();
           newPulsarClient.close();
   ```
   
   ## Reproducing the Failure with Thread.sleep
   
   To further prove this point, I added a `Thread.sleep(1000);` immediately before the following line:
   
   
https://github.com/apache/pulsar/blob/a8b921cf15c0a0f652f1d9f62a6481efea243881/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1687
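
   In terms of the sketch above, the delay sits between the DLQ send completing and the ack of the original message. Continuing the illustrative class from before (again, this is not the actual `ConsumerImpl` code):

   ```java
       // Variant of sendToDlqThenAck above, with the experiment's delay added.
       static CompletableFuture<Void> sendToDlqThenAckWithDelay(Producer<byte[]> dlqProducer,
                                                                Consumer<byte[]> sourceConsumer,
                                                                Message<byte[]> expiredMessage) {
           return dlqProducer.sendAsync(expiredMessage.getData())
                   .thenCompose(dlqMessageId -> {
                       // The sleep widens the window between "DLQ copy is visible to the DLQ
                       // consumer" and "original message is ack'ed", so the test can drain the
                       // DLQ and close the client before this ack is ever sent.
                       try {
                           Thread.sleep(1000);
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                       }
                       return sourceConsumer.acknowledgeAsync(expiredMessage);
                   });
       }
   ```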
   
   With that change, the test fails consistently, with the following warning and stack trace in the test logs:
   
   ```
   23:24:11.934 [pulsar-client-io-38-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/dead-letter-topic] [my-subscription] [3e1bb] Failed to acknowledge the message 3:3:-1 of the original topic but send to the DLQ successfully.
   org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closing
        at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:509) ~[classes/:?]
        at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:506) ~[classes/:?]
        at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:446) ~[classes/:?]
        at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:435) ~[classes/:?]
        at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$27(ConsumerImpl.java:1713) ~[classes/:?]
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) [?:?]
        at org.apache.pulsar.client.impl.ProducerImpl$1.sendComplete(ProducerImpl.java:319) [classes/:?]
        at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1224) [classes/:?]
        at org.apache.pulsar.client.impl.ProducerImpl.ackReceived(ProducerImpl.java:991) [classes/:?]
        at org.apache.pulsar.client.impl.ClientCnx.handleSendReceipt(ClientCnx.java:402) [classes/:?]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:220) [classes/:?]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
   ```
   
   ## Potential Solutions
   
   I am still uncertain how best to solve this test's flakiness. The test is a bit contrived, and the race condition is likely not one that end users would hit. A few possible solutions:
   
   1. Assuming the current design is what we want, find a way to improve the test.
   2. Add logic to the Pulsar client so that, when it is closed, it waits (possibly with a configurable timeout) for in-flight DLQ messages to be produced and for the corresponding acks on the source topic to complete (see the sketch after this list).
   3. Look into having the broker handle messages that have reached the maximum number of redeliveries.
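
   For the second option, the change could take roughly the following shape (a minimal sketch, assuming the consumer tracks its in-flight DLQ operations; the class, field, and method names are hypothetical, not the existing `ConsumerImpl` API):

   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;

   // Hypothetical sketch of option 2: remember every in-flight "send to DLQ,
   // then ack the original" future and drain them before completing the close.
   class GracefulDlqCloseSketch {

       // Futures for DLQ sends (and their follow-up acks) that have not completed yet.
       private final Set<CompletableFuture<Void>> pendingDlqOps = ConcurrentHashMap.newKeySet();

       void trackDlqOperation(CompletableFuture<Void> dlqSendAndAck) {
           pendingDlqOps.add(dlqSendAndAck);
           dlqSendAndAck.whenComplete((ignored, error) -> pendingDlqOps.remove(dlqSendAndAck));
       }

       // Called at the start of close: wait (bounded) for pending DLQ work so the
       // acks of the original messages are not dropped by tearing down the connection.
       CompletableFuture<Void> drainPendingDlqOps(long timeout, TimeUnit unit) {
           return CompletableFuture
                   .allOf(pendingDlqOps.toArray(new CompletableFuture[0]))
                   .orTimeout(timeout, unit)        // configurable bound (Java 9+ API)
                   .exceptionally(ignored -> null); // best effort: never fail the close itself
       }
   }
   ```

   This would keep the redelivery logic in the client while making close a clean point after which the source subscription's backlog reflects every message already copied to the DLQ.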
   
   I prefer the second option. The first option leaves the race condition in 
the client. The third is likely not easy because the server doesn't have the 
necessary information, and further, the current paradigm is that the redelivery 
logic lives in the client.
   
   If there is agreement that the second solution is the right direction, I'm 
happy to look into implementing it.

