This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 56f89840f6d [improve] [broker] Add consumer-id into the log when doing 
subscribe. (#20568)
56f89840f6d is described below

commit 56f89840f6d78bf942712620175536acfbff01e4
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 12 22:46:38 2023 +0800

    [improve] [broker] Add consumer-id into the log when doing subscribe. 
(#20568)
    
    - Since `cnx.address + consumerId` identifies a consumer, add 
`consumer-id` to the log when subscribing.
    - add a test to confirm that even if an error occurs when sending messages 
to the client, consumption still succeeds.
    - print debug log if ack-command was discarded due to `ConsumerFuture is 
not complete.`
    - print a debug log if sending a message to the client fails.
---
 .../org/apache/pulsar/broker/service/Consumer.java |  6 +++
 .../apache/pulsar/broker/service/ServerCnx.java    |  9 +++-
 .../client/api/SimpleProducerConsumerTest.java     | 52 +++++++++++++++++++++-
 3 files changed, 65 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 5b0b37efdef..21d618d45a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -345,6 +345,12 @@ public class Consumer {
                 msgOutCounter.add(totalMessages);
                 bytesOutCounter.add(totalBytes);
                 chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 
0);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Sent messages to client fail by IO 
exception[{}], close the connection"
+                                    + " immediately. Consumer: {}",  
topicName, subscription,
+                            status.cause() == null ? "" : 
status.cause().getMessage(), this.toString());
+                }
             }
         });
         return writeAndFlushPromise;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 33c24c8c973..2c70baa80a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1049,7 +1049,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             remoteAddress, getPrincipal());
                 }
 
-                log.info("[{}] Subscribing on topic {} / {}", remoteAddress, 
topicName, subscriptionName);
+                log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", 
this.ctx().channel().toString(),
+                        topicName, subscriptionName, consumerId);
                 try {
                     Metadata.validateMetadata(metadata,
                             
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
@@ -1633,6 +1634,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         }
                         return null;
                     });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Consumer future is not complete(not complete or 
error), but received command ack. so discard"
+                                + " this command. consumerId: {}, cnx: {}, 
messageIdCount: {}", ack.getConsumerId(),
+                        this.ctx().channel().toString(), 
ack.getMessageIdsCount());
+            }
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 21ec7e3e5f3..46513c70844 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Timeout;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -79,6 +81,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -4506,4 +4510,50 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             assertEquals(values.get(i), "msg-" + i);
         }
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testConsumeWhenDeliveryFailedByIOException() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "subscription1";
+        final int messagesCount = 100;
+        final int receiverQueueSize = 1;
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                
.subscriptionName(subscriptionName).receiverQueueSize(receiverQueueSize).subscribe();
+        for (int i = 0; i < messagesCount; i++) {
+            producer.send(i + "");
+        }
+        // Wait until the incoming queue of the consumer is full.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.getIncomingMessageSize(), receiverQueueSize);
+        });
+
+        // Mock an I/O error on flush so that sending messages out fails.
+        ServerCnx serverCnx = (ServerCnx) 
pulsar.getBrokerService().getTopic(topic, false).join().get()
+                
.getSubscription(subscriptionName).getDispatcher().getConsumers().iterator().next().cnx();
+        serverCnx.ctx().channel().pipeline().addFirst(new 
ChannelDuplexHandler() {
+
+            @Override
+            public void flush(ChannelHandlerContext ctx) throws Exception {
+                throw new IOException("Mocked error");
+            }
+        });
+
+        // Verify that all messages are still consumed despite the mocked error.
+        Set<String> receivedMessages = new HashSet<>();
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg != null) {
+                receivedMessages.add(msg.getValue());
+                consumer.acknowledge(msg);
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(receivedMessages.size(), messagesCount);
+
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topic, false);
+    }
+}

Reply via email to