This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 22d6beb15b9 [fix][broker] Fix rewind failed when 
``redeliverUnacknowledgedMessages`` (#15046)
22d6beb15b9 is described below

commit 22d6beb15b9d44e630e46931f5905d19f33fe724
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Apr 8 16:01:11 2022 +0800

    [fix][broker] Fix rewind failed when ``redeliverUnacknowledgedMessages`` 
(#15046)
    
    ### Motivation
    
    The test ``SimpleProducerConsumerTest#testRedeliveryFailOverConsumer`` has
    been flaky across many PRs. When it fails, the broker logs look like this:
    
    ```
    2022-04-07T11:19:16,561+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/0 epoch 0
    2022-04-07T11:19:16,562+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:0:-1  epoch 0
    2022-04-07T11:19:16,562+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/1 epoch 0
    2022-04-07T11:19:16,562+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:1:-1  epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/2 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/3 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/4 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/5 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/6 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/7 epoch 0
    2022-04-07T11:19:16,576+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/8 epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:2:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:3:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:4:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  
org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive 
and ack message 3/0 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  
org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive 
and ack message 3/1 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  
org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive 
and ack message 3/2 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  
org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Consumer receive 
and ack message 3/3 epoch 0
    2022-04-07T11:19:16,577+0800 [main] INFO  
org.apache.pulsar.client.api.SimpleProducerConsumerTest - ----Trigger unack 
messages redeliver and clear receive queue----
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:5:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:6:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:7:-1  epoch 0
    2022-04-07T11:19:16,577+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:8:-1  epoch 0
    2022-04-07T11:19:16,578+0800 [broker-topic-workers-OrderedScheduler-8-0] 
INFO  
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer
 - [persistent://my-property/my-ns/unacked-topic / 
subscriber-1-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/unacked-topic,
 name=subscriber-1}, consumerId=0, consumerName=dde4a, 
address=/127.0.0.1:49153}] Ignoring reDeliverUnAcknowledgedMessages: 
cancelPendingRequest on cursor failed
    2022-04-07T11:19:16,578+0800 [pulsar-io-308-1] INFO  
org.apache.pulsar.broker.service.PulsarCommandSenderImpl - Send message to 
consumer, message id 3/9 epoch 0
    2022-04-07T11:19:16,578+0800 [pulsar-client-io-336-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - receiver queue received message, 
message id 3:9:-1  epoch 0
    2022-04-07T11:19:17,580+0800 [main] WARN  
org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, 
topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:5:-1:0], 
messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  
org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, 
topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:6:-1:0], 
messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  
org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, 
topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:7:-1:0], 
messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  
org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, 
topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:8:-1:0], 
messageConsumerEpoch : [0], consumerEpoch : [1]
    2022-04-07T11:19:17,580+0800 [main] WARN  
org.apache.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, 
topic : [persistent://my-property/my-ns/unacked-topic], messageId : [3:9:-1:0], 
messageConsumerEpoch : [0], consumerEpoch : [1]
    ```
    
    In the normal flow, the consumer keeps receiving messages from the broker.
    When the consumer triggers `redeliverUnacknowledgedMessages`, it immediately
    increments the consumer epoch, and the request rewinds the broker cursor so
    that the broker redelivers these messages with the new epoch.
    The consumer then filters out old messages by epoch.
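
    The epoch filtering described above can be sketched roughly as follows.
    This is a simplified model, not the Pulsar client code: the class
    `EpochFilterSketch`, the type `Delivered`, and the method `filter` are
    hypothetical names introduced only for illustration.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical model of client-side epoch filtering; not the Pulsar client code.
    class EpochFilterSketch {
        // A delivered message: its entry id and the consumer epoch stamped on it.
        static final class Delivered {
            final long entryId;
            final long epoch;

            Delivered(long entryId, long epoch) {
                this.entryId = entryId;
                this.epoch = epoch;
            }
        }

        private long consumerEpoch = 0;

        // Redelivery request: the consumer bumps its epoch right away.
        long redeliverUnacknowledgedMessages() {
            consumerEpoch++;
            return consumerEpoch;
        }

        // Keeps only messages whose epoch is not older than the current one.
        List<Long> filter(List<Delivered> received) {
            List<Long> accepted = new ArrayList<>();
            for (Delivered d : received) {
                if (d.epoch >= consumerEpoch) {
                    accepted.add(d.entryId);
                }
            }
            return accepted;
        }

        public static void main(String[] args) {
            EpochFilterSketch consumer = new EpochFilterSketch();
            consumer.redeliverUnacknowledgedMessages(); // epoch 0 -> 1
            List<Delivered> received = new ArrayList<>();
            received.add(new Delivered(5, 0)); // sent before the rewind: filtered
            received.add(new Delivered(4, 1)); // redelivered with the new epoch: kept
            received.add(new Delivered(5, 1));
            System.out.println(consumer.filter(received)); // prints [4, 5]
        }
    }
    ```

    In this model, if the broker never redelivers with epoch 1, every message
    still in flight is dropped and the consumer receives nothing further.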
    
    In the failing case, however, the abnormal log line below appears. It means
    that the cursor's pending read had already been cancelled
    (`cursor.cancelPendingReadRequest()` returned false), so `havePendingRead`
    was never set to false. As a result, the cursor was not rewound and the
    dispatcher was not triggered to re-push the messages.
    
    > Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor 
failed
    
    Because of this abnormal behaviour, the consumer never receives the
    following redelivered messages with epoch 1:
    
    ```
    message id 3:4:-1  epoch 1
    message id 3:5:-1  epoch 1
    message id 3:6:-1  epoch 1
    message id 3:7:-1  epoch 1
    message id 3:8:-1  epoch 1
    message id 3:9:-1  epoch 1
    ```
    
    Related code:
    
    
https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L317-L328
    
    
https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L136-L141
    
    
https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L847-L851
    
    ### Modifications
    
    -  Force a cursor rewind when `redeliverUnacknowledgedMessages` is called
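
    The difference between the old and the patched control flow can be
    sketched with a small stand-alone model. Everything here is hypothetical
    scaffolding (`ForcedRewindSketch`, `FakeCursor`, `cancelSucceeds`); the
    old path assumes, as the motivation describes, that `havePendingRead` is
    cleared only when the cancel call returns true. The follow-up
    `readMoreEntries(consumer)` call is omitted.

    ```java
    // Hypothetical model of the dispatcher's redelivery path; not the broker code.
    class ForcedRewindSketch {
        // Stand-in for the managed cursor; cancelSucceeds is an assumed test knob.
        static final class FakeCursor {
            boolean cancelSucceeds;
            int rewinds;

            boolean cancelPendingReadRequest() {
                return cancelSucceeds;
            }

            void rewind() {
                rewinds++;
            }
        }

        final FakeCursor cursor = new FakeCursor();
        boolean havePendingRead;

        // Old flow: rewind only when the pending-read flag could be cleared.
        boolean redeliverOld() {
            if (havePendingRead && cursor.cancelPendingReadRequest()) {
                havePendingRead = false;
            }
            if (!havePendingRead) {
                cursor.rewind();
                return true;
            }
            return false; // the "Ignoring reDeliverUnAcknowledgedMessages" branch
        }

        // Patched flow: always clear the flag and rewind.
        boolean redeliverPatched() {
            cursor.cancelPendingReadRequest();
            havePendingRead = false;
            cursor.rewind();
            return true;
        }

        public static void main(String[] args) {
            ForcedRewindSketch d = new ForcedRewindSketch();
            d.havePendingRead = true;        // a read is believed to be pending
            d.cursor.cancelSucceeds = false; // but the cancel call returns false
            System.out.println(d.redeliverOld());     // prints false
            System.out.println(d.redeliverPatched()); // prints true
            System.out.println(d.cursor.rewinds);     // prints 1
        }
    }
    ```

    With the flag set and the cancel call returning false, the old flow skips
    the rewind, while the patched flow rewinds unconditionally.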
---
 .../PersistentDispatcherSingleActiveConsumer.java      | 18 ++++++------------
 .../pulsar/client/api/SimpleProducerConsumerTest.java  |  6 +++---
 2 files changed, 9 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index e2f2ac9a9f4..c0effad9bbd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -313,19 +313,13 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                     name, consumer);
             return;
         }
-
-        cancelPendingRead();
-
-        if (!havePendingRead) {
-            cursor.rewind();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Cursor rewinded, redelivering 
unacknowledged messages. ", name, consumer);
-            }
-            readMoreEntries(consumer);
-        } else {
-            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: 
cancelPendingRequest on cursor failed", name,
-                    consumer);
+        cursor.cancelPendingReadRequest();
+        havePendingRead = false;
+        cursor.rewind();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged 
messages. ", name, consumer);
         }
+        readMoreEntries(consumer);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7506a9bf3f0..fa1505f4d6f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2435,7 +2435,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         Message<byte[]> msg;
         List<Message<byte[]>> messages1 = Lists.newArrayList();
         for (int i = 0; i < consumeMsgInParts; i++) {
-            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            msg = consumer.receive();
             if (msg != null) {
                 messages1.add(msg);
                 consumer.acknowledge(msg);
@@ -2451,7 +2451,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         // (1.b) consume second consumeMsgInParts msgs and trigger redeliver
         messages1.clear();
         for (int i = 0; i < consumeMsgInParts; i++) {
-            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            msg = consumer.receive();
             if (msg != null) {
                 messages1.add(msg);
                 consumer.acknowledge(msg);
@@ -2474,7 +2474,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         int remainingMsgs = (2 * receiverQueueSize) - (2 * consumeMsgInParts);
         messages1.clear();
         for (int i = 0; i < remainingMsgs; i++) {
-            msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            msg = consumer.receive();
             if (msg != null) {
                 messages1.add(msg);
                 consumer.acknowledge(msg);
