This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a41ac49d9f3 [improve] [broker] Add consumer-id into the log when doing
subscribe. (#20568)
a41ac49d9f3 is described below
commit a41ac49d9f30c415d87ce747393a16fa724cf4c9
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 12 22:46:38 2023 +0800
[improve] [broker] Add consumer-id into the log when doing subscribe.
(#20568)
- Since `cnx.address + consumerId` uniquely identifies a consumer, add
`consumer-id` to the log when subscribing.
- Add a test to confirm that even if an error occurs when sending messages
to the client, consumption still succeeds.
- Print a debug log if an ack command is discarded because `ConsumerFuture is
not complete.`
- Print a debug log if sending a message to the client fails.
---
.../org/apache/pulsar/broker/service/Consumer.java | 6 +++
.../apache/pulsar/broker/service/ServerCnx.java | 9 +++-
.../client/api/SimpleProducerConsumerTest.java | 50 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 275d6852808..176f033a6dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -361,6 +361,12 @@ public class Consumer {
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);
chunkedMessageRate.recordMultipleEvents(totalChunkedMessages,
0);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Sent messages to client fail by IO
exception[{}], close the connection"
+ + " immediately. Consumer: {}",
topicName, subscription,
+ status.cause() == null ? "" :
status.cause().getMessage(), this.toString());
+ }
}
});
return writeAndFlushPromise;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f3e6e4a71d0..f91793cadfa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1160,7 +1160,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, getPrincipal());
}
- log.info("[{}] Subscribing on topic {} / {}", remoteAddress,
topicName, subscriptionName);
+ log.info("[{}] Subscribing on topic {} / {}. consumerId: {}",
this.ctx().channel().toString(),
+ topicName, subscriptionName, consumerId);
try {
Metadata.validateMetadata(metadata,
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
@@ -1783,6 +1784,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
return null;
});
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Consumer future is not complete(not complete or
error), but received command ack. so discard"
+ + " this command. consumerId: {}, cnx: {},
messageIdCount: {}", ack.getConsumerId(),
+ this.ctx().channel().toString(),
ack.getMessageIdsCount());
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index c45c2b1522f..0c0e61fe33f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Timeout;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -84,7 +86,9 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -4642,4 +4646,50 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
producer2.close();
client.close();
}
+
+ @Test
+ public void testConsumeWhenDeliveryFailedByIOException() throws Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "subscription1";
+ final int messagesCount = 100;
+ final int receiverQueueSize = 1;
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+
.subscriptionName(subscriptionName).receiverQueueSize(receiverQueueSize).subscribe();
+ for (int i = 0; i < messagesCount; i++) {
+ producer.send(i + "");
+ }
+ // Wait incoming queue of the consumer is full.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(consumer.getIncomingMessageSize(), receiverQueueSize);
+ });
+
+ // Mock an io error for sending messages out.
+ ServerCnx serverCnx = (ServerCnx)
pulsar.getBrokerService().getTopic(topic, false).join().get()
+
.getSubscription(subscriptionName).getDispatcher().getConsumers().iterator().next().cnx();
+ serverCnx.ctx().channel().pipeline().addFirst(new
ChannelDuplexHandler() {
+
+ @Override
+ public void flush(ChannelHandlerContext ctx) throws Exception {
+ throw new IOException("Mocked error");
+ }
+ });
+
+ // Verify all messages will be consumed.
+ Set<String> receivedMessages = new HashSet<>();
+ while (true) {
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg != null) {
+ receivedMessages.add(msg.getValue());
+ consumer.acknowledge(msg);
+ } else {
+ break;
+ }
+ }
+ Assert.assertEquals(receivedMessages.size(), messagesCount);
+
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topic, false);
+ }
}