This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fe4bc9af39022cab75fb1677eff231285c6ad5fa Author: lipenghui <peng...@apache.org> AuthorDate: Fri Oct 22 14:57:05 2021 +0800 Add debug log for WebSocket. (#12458) --- .../org/apache/pulsar/websocket/ConsumerHandler.java | 20 ++++++++++++++++++++ .../org/apache/pulsar/websocket/ProducerHandler.java | 8 ++++++++ 2 files changed, 28 insertions(+) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index a1c76d2..d30c084 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -232,6 +232,10 @@ public class ConsumerHandler extends AbstractWebSocketHandler { // Check and notify consumer if reached end of topic. private void handleEndOfTopic() { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received check reach the end of topic request from {} ", consumer.getTopic(), + subscription, getRemote().getInetSocketAddress().toString()); + } try { String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString( new EndOfTopicResponse(consumer.hasReachedEndOfTopic())); @@ -259,6 +263,10 @@ public class ConsumerHandler extends AbstractWebSocketHandler { } private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received unsubscribe request from {} ", consumer.getTopic(), + subscription, getRemote().getInetSocketAddress().toString()); + } consumer.unsubscribe(); } @@ -276,6 +284,10 @@ public class ConsumerHandler extends AbstractWebSocketHandler { // We should have received an ack MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic.toString()); + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), + subscription, msgId, getRemote().getInetSocketAddress().toString()); + } consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); checkResumeReceive(); } @@ -283,11 +295,19 @@ public class ConsumerHandler extends AbstractWebSocketHandler { private void handleNack(ConsumerCommand command) throws IOException { MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic.toString()); + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(), + subscription, msgId, getRemote().getInetSocketAddress().toString()); + } consumer.negativeAcknowledge(msgId); checkResumeReceive(); } private void handlePermit(ConsumerCommand command) throws IOException { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received {} permits request from {} ", consumer.getTopic(), + subscription, command.permitMessages, getRemote().getInetSocketAddress().toString()); + } if (command.permitMessages == null) { throw new IOException("Missing required permitMessages field for 'permit' command"); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index 9552d42..9b6593d 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -131,6 +131,10 @@ public class ProducerHandler extends AbstractWebSocketHandler { @Override public void onWebSocketText(String message) { + if (log.isDebugEnabled()) { + log.debug("[{}] Received new message from producer {} ", producer.getTopic(), + getRemote().getInetSocketAddress().toString()); + } ProducerMessage sendRequest; byte[] rawPayload = null; String requestContext = null; @@ -188,6 +192,10 @@ public class ProducerHandler extends AbstractWebSocketHandler { final long now = System.nanoTime(); builder.sendAsync().thenAccept(msgId -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Success fully write the message to broker with returned message ID {} from producer {}", + producer.getTopic(), msgId, getRemote().getInetSocketAddress().toString()); + } updateSentMsgStats(msgSize, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now)); if (isConnected()) { String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());