This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a41ac49d9f3 [improve] [broker] Add consumer-id into the log when doing 
subscribe. (#20568)
a41ac49d9f3 is described below

commit a41ac49d9f30c415d87ce747393a16fa724cf4c9
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 12 22:46:38 2023 +0800

    [improve] [broker] Add consumer-id into the log when doing subscribe. 
(#20568)
    
    - Since `cnx.address + consumerId` is the identifier of one consumer, add 
`consumer-id` to the log when subscribing.
    - add a test to confirm that even if an error occurs when sending messages 
to the client, consumption still succeeds.
    - print debug log if ack-command was discarded due to `ConsumerFuture is 
not complete.`
    - print a debug log if sending a message to the client fails.
---
 .../org/apache/pulsar/broker/service/Consumer.java |  6 +++
 .../apache/pulsar/broker/service/ServerCnx.java    |  9 +++-
 .../client/api/SimpleProducerConsumerTest.java     | 50 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 275d6852808..176f033a6dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -361,6 +361,12 @@ public class Consumer {
                 msgOutCounter.add(totalMessages);
                 bytesOutCounter.add(totalBytes);
                 chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 
0);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Sent messages to client fail by IO 
exception[{}], close the connection"
+                                    + " immediately. Consumer: {}",  
topicName, subscription,
+                            status.cause() == null ? "" : 
status.cause().getMessage(), this.toString());
+                }
             }
         });
         return writeAndFlushPromise;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f3e6e4a71d0..f91793cadfa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1160,7 +1160,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             remoteAddress, getPrincipal());
                 }
 
-                log.info("[{}] Subscribing on topic {} / {}", remoteAddress, 
topicName, subscriptionName);
+                log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", 
this.ctx().channel().toString(),
+                        topicName, subscriptionName, consumerId);
                 try {
                     Metadata.validateMetadata(metadata,
                             
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
@@ -1783,6 +1784,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 }
                 return null;
             });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Consumer future is not complete(not complete or 
error), but received command ack. so discard"
+                                + " this command. consumerId: {}, cnx: {}, 
messageIdCount: {}", ack.getConsumerId(),
+                        this.ctx().channel().toString(), 
ack.getMessageIdsCount());
+            }
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index c45c2b1522f..0c0e61fe33f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Timeout;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -84,7 +86,9 @@ import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -4642,4 +4646,50 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         producer2.close();
         client.close();
     }
+
+    @Test
+    public void testConsumeWhenDeliveryFailedByIOException() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "subscription1";
+        final int messagesCount = 100;
+        final int receiverQueueSize = 1;
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                
.subscriptionName(subscriptionName).receiverQueueSize(receiverQueueSize).subscribe();
+        for (int i = 0; i < messagesCount; i++) {
+            producer.send(i + "");
+        }
+        // Wait until the incoming queue of the consumer is full.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.getIncomingMessageSize(), receiverQueueSize);
+        });
+
+        // Simulate an I/O error on the send path: the handler added below throws from flush().
+        ServerCnx serverCnx = (ServerCnx) 
pulsar.getBrokerService().getTopic(topic, false).join().get()
+                
.getSubscription(subscriptionName).getDispatcher().getConsumers().iterator().next().cnx();
+        serverCnx.ctx().channel().pipeline().addFirst(new 
ChannelDuplexHandler() {
+
+            @Override
+            public void flush(ChannelHandlerContext ctx) throws Exception {
+                throw new IOException("Mocked error");
+            }
+        });
+
+        // Verify all messages are still consumed: receive until a 2-second receive returns null.
+        Set<String> receivedMessages = new HashSet<>();
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg != null) {
+                receivedMessages.add(msg.getValue());
+                consumer.acknowledge(msg);
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(receivedMessages.size(), messagesCount);
+
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, false);
+    }
 }

Reply via email to