codelipenghui commented on code in PR #17318:
URL: https://github.com/apache/pulsar/pull/17318#discussion_r990778655


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java:
##########
@@ -193,4 +194,42 @@ public void 
testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
         }
         Assert.assertEquals(numPartitions * numMessages, receivedCount);
     }
+
+    @Test
+    public void testBatchReceiveAckTimeout()
+            throws PulsarAdminException, PulsarClientException, 
InterruptedException {
+        String topicName = newTopicName();
+        int numPartitions = 2;
+        int numMessages = 100000;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)

Review Comment:
   Please add `@Cleanup` to the producer so that it is closed when the test finishes.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java:
##########
@@ -193,4 +194,42 @@ public void 
testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
         }
         Assert.assertEquals(numPartitions * numMessages, receivedCount);
     }
+
+    @Test
+    public void testBatchReceiveAckTimeout()
+            throws PulsarAdminException, PulsarClientException, 
InterruptedException {
+        String topicName = newTopicName();
+        int numPartitions = 2;
+        int numMessages = 100000;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
+                .topic(topicName)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .create();
+
+        @Cleanup
+        Consumer<Long> consumer = pulsarClient
+                .newConsumer(Schema.INT64)
+                .topic(topicName)
+                .receiverQueueSize(numMessages)
+                .batchReceivePolicy(
+                        
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, 
TimeUnit.SECONDS).build()
+                ).ackTimeout(1000, TimeUnit.MILLISECONDS)
+                .subscriptionName(methodName)
+                .subscribe();
+
+        producer.newMessage()
+                .value(1l)
+                .send();
+        Thread.sleep(300);

Review Comment:
   Why do we need `sleep` here?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1890,64 +1858,62 @@ public int numMessagesInQueue() {
         return incomingMessages.size();
     }
 
-    @Override
-    public void redeliverUnacknowledgedMessages() {
-        // First : synchronized in order to handle consumer reconnect produce 
race condition, when broker receive
-        // redeliverUnacknowledgedMessages and consumer have not be created and
-        // then receive reconnect epoch change the broker is smaller than the 
client epoch, this will cause client epoch
-        // smaller than broker epoch forever. client will not receive message 
anymore.
-        // Second : we should synchronized `ClientCnx cnx = cnx()` to
-        // prevent use old cnx to send redeliverUnacknowledgedMessages to a 
old broker
-        synchronized (ConsumerImpl.this) {
-            ClientCnx cnx = cnx();
-            // V1 don't support redeliverUnacknowledgedMessages
-            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v2.getValue()) {
-                if ((getState() == State.Connecting)) {
-                    log.warn("[{}] Client Connection needs to be established "
-                            + "for redelivery of unacknowledged messages", 
this);
-                } else {
-                    log.warn("[{}] Reconnecting the client to redeliver the 
messages.", this);
-                    cnx.ctx().close();
-                }
+    public CompletableFuture<Void> internalRedeliverUnacknowledgedMessages() {
+        return CompletableFuture.runAsync(() -> {
+            // First : synchronized in order to handle consumer reconnect 
produce race condition, when broker receive
+            // redeliverUnacknowledgedMessages and consumer have not be 
created and then receive reconnect epoch
+            // change the broker is smaller than the client epoch, this will 
cause client epoch smaller
+            // than broker epoch forever. client will not receive message 
anymore.
+            // Second : we should synchronized `ClientCnx cnx = cnx()` to  
prevent use old cnx to
+            // send redeliverUnacknowledgedMessages to a old broker
+            synchronized (ConsumerImpl.this) {
+                ClientCnx cnx = cnx();
+                // V1 don't support redeliverUnacknowledgedMessages
+                if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v2.getValue()) {
+                    if ((getState() == State.Connecting)) {
+                        log.warn("[{}] Client Connection needs to be 
established "
+                                + "for redelivery of unacknowledged messages", 
this);
+                    } else {
+                        log.warn("[{}] Reconnecting the client to redeliver 
the messages.", this);
+                        cnx.ctx().close();
+                    }
 
-                return;
-            }
+                    return;
+                }
 
-            // clear local message
-            int currentSize = 0;
-            currentSize = incomingMessages.size();
-            clearIncomingMessages();
-            unAckedMessageTracker.clear();
-
-            // we should increase epoch every time, because 
MultiTopicsConsumerImpl also increase it,
-            // we need to keep both epochs the same
-            if (conf.getSubscriptionType() == SubscriptionType.Failover
-                    || conf.getSubscriptionType() == 
SubscriptionType.Exclusive) {
-                CONSUMER_EPOCH.incrementAndGet(this);
-            }
-            // is channel is connected, we should send redeliver command to 
broker
-            if (cnx != null && isConnected(cnx)) {
-                
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
-                        consumerId, CONSUMER_EPOCH.get(this)), 
cnx.ctx().voidPromise());
-                if (currentSize > 0) {
-                    increaseAvailablePermits(cnx, currentSize);
+                // we should increase epoch every time, because 
MultiTopicsConsumerImpl also increase it,
+                // we need to keep both epochs the same
+                if (conf.getSubscriptionType() == SubscriptionType.Failover
+                        || conf.getSubscriptionType() == 
SubscriptionType.Exclusive) {
+                    CONSUMER_EPOCH.incrementAndGet(this);
                 }
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] [{}] Redeliver unacked messages and 
send {} permits", subscription, topic,
-                            consumerName, currentSize);
+                // clear local message
+                int currentSize = incomingMessages.size();
+                clearIncomingMessages();
+                unAckedMessageTracker.clear();
+                // is channel is connected, we should send redeliver command 
to broker
+                if (cnx != null && isConnected(cnx)) {
+                    
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
+                            consumerId, CONSUMER_EPOCH.get(this)), 
cnx.ctx().voidPromise());
+                    if (currentSize > 0) {
+                        increaseAvailablePermits(cnx, currentSize);
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}] [{}] Redeliver unacked messages 
and send {} permits", subscription, topic,
+                                consumerName, currentSize);
+                    }
+                } else {
+                    log.warn("[{}] Send redeliver messages command but the 
client is reconnect or close, "
+                            + "so don't need to send redeliver command to 
broker", this);
                 }
-            } else {
-                log.warn("[{}] Send redeliver messages command but the client 
is reconnect or close, "
-                        + "so don't need to send redeliver command to broker", 
this);
             }
-        }
+        }, internalPinnedExecutor);
     }
 
-    public int clearIncomingMessagesAndGetMessageNumber() {
-        int messagesNumber = incomingMessages.size();
-        clearIncomingMessages();
-        unAckedMessageTracker.clear();
-        return messagesNumber;
+    @SneakyThrows
+    @Override
+    public void redeliverUnacknowledgedMessages() {
+        internalRedeliverUnacknowledgedMessages().get();

Review Comment:
   We should unwrap the `ExecutionException` that is introduced by the `get()` 
operation.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -695,17 +656,20 @@ private ConsumerConfigurationData<T> 
getInternalConsumerConfig() {
         return internalConsumerConfig;
     }
 
+    @SneakyThrows
     @Override
     public void redeliverUnacknowledgedMessages() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
         internalPinnedExecutor.execute(() -> {
             CONSUMER_EPOCH.incrementAndGet(this);
             consumers.values().stream().forEach(consumer -> {
-                consumer.redeliverUnacknowledgedMessages();
+                
futures.add(consumer.internalRedeliverUnacknowledgedMessages());
                 consumer.unAckedChunkedMessageIdSequenceMap.clear();
             });
             clearIncomingMessages();
             unAckedMessageTracker.clear();
         });
+        FutureUtil.waitForAll(futures).get();

Review Comment:
   We should unwrap the `ExecutionException` that is introduced by the `get()` 
operation.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -695,17 +656,20 @@ private ConsumerConfigurationData<T> 
getInternalConsumerConfig() {
         return internalConsumerConfig;
     }
 
+    @SneakyThrows
     @Override
     public void redeliverUnacknowledgedMessages() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();

Review Comment:
   ```suggestion
           List<CompletableFuture<Void>> futures = new 
ArrayList<>(consumers.size());
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java:
##########
@@ -309,21 +303,110 @@ public void testRedeliveryAddEpoch(boolean enableBatch) 
throws Exception{
         message = consumer.receive(3, TimeUnit.SECONDS);
         assertNull(message);
 
-        Field field = 
consumer.getClass().getDeclaredField("connectionHandler");
-        field.setAccessible(true);
-        ConnectionHandler connectionHandler = (ConnectionHandler) 
field.get(consumer);
-
-        field = 
connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
-        field.setAccessible(true);
-
+        ConnectionHandler connectionHandler = consumer.getConnectionHandler();
         connectionHandler.cnx().channel().close();
 
-        ((ConsumerImpl<String>) consumer).grabCnx();
+        consumer.grabCnx();
+
         message = consumer.receive(3, TimeUnit.SECONDS);
         assertNotNull(message);
         assertEquals(message.getValue(), test3);
     }
 
+    @Test(dataProvider = "enableBatch")
+    public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws 
Exception {
+        final String topic = "testRedeliveryAddEpochAndPermits";
+        final String subName = "my-sub";
+        // set receive queue size is 4, and first send 4 messages,
+        // then call redeliver messages, assert receive msg num.
+        int receiveQueueSize = 4;
+        ConsumerImpl<String> consumer = ((ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .receiverQueueSize(receiveQueueSize)
+                .autoScaledReceiverQueueSizeEnabled(false)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe());
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(enableBatch)
+                .create();
+
+        consumer.setConsumerEpoch(1);
+        for (int i = 0; i < receiveQueueSize; i++) {
+            producer.send("pulsar" + i);
+        }
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        consumer.redeliverUnacknowledgedMessages();
+        for (int i = 0; i < receiveQueueSize; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals("pulsar" + i, msg.getValue());
+        }
+    }
+
+    @Test(dataProvider = "enableBatch")
+    public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws 
Exception{

Review Comment:
   Looks like we missed the consumer epoch test for the partitioned topic?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java:
##########
@@ -309,21 +303,110 @@ public void testRedeliveryAddEpoch(boolean enableBatch) 
throws Exception{
         message = consumer.receive(3, TimeUnit.SECONDS);
         assertNull(message);
 
-        Field field = 
consumer.getClass().getDeclaredField("connectionHandler");
-        field.setAccessible(true);
-        ConnectionHandler connectionHandler = (ConnectionHandler) 
field.get(consumer);
-
-        field = 
connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
-        field.setAccessible(true);
-
+        ConnectionHandler connectionHandler = consumer.getConnectionHandler();
         connectionHandler.cnx().channel().close();
 
-        ((ConsumerImpl<String>) consumer).grabCnx();
+        consumer.grabCnx();
+
         message = consumer.receive(3, TimeUnit.SECONDS);
         assertNotNull(message);
         assertEquals(message.getValue(), test3);
     }
 
+    @Test(dataProvider = "enableBatch")
+    public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws 
Exception {
+        final String topic = "testRedeliveryAddEpochAndPermits";
+        final String subName = "my-sub";
+        // set receive queue size is 4, and first send 4 messages,
+        // then call redeliver messages, assert receive msg num.
+        int receiveQueueSize = 4;
+        ConsumerImpl<String> consumer = ((ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .receiverQueueSize(receiveQueueSize)
+                .autoScaledReceiverQueueSizeEnabled(false)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe());
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(enableBatch)
+                .create();
+
+        consumer.setConsumerEpoch(1);
+        for (int i = 0; i < receiveQueueSize; i++) {
+            producer.send("pulsar" + i);
+        }
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        consumer.redeliverUnacknowledgedMessages();
+        for (int i = 0; i < receiveQueueSize; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals("pulsar" + i, msg.getValue());
+        }
+    }
+
+    @Test(dataProvider = "enableBatch")
+    public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws 
Exception{
+        final String topic = "testBatchReceiveRedeliveryAddEpoch";
+        final String subName = "my-sub";
+        ConsumerImpl<String> consumer = ((ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName)
+                .batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, 
TimeUnit.MILLISECONDS).build())
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe());
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(enableBatch)
+                .create();

Review Comment:
   Close the producer and consumer after the test is done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to