This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe4bc9af39022cab75fb1677eff231285c6ad5fa
Author: lipenghui <peng...@apache.org>
AuthorDate: Fri Oct 22 14:57:05 2021 +0800

    Add debug log for WebSocket. (#12458)
---
 .../org/apache/pulsar/websocket/ConsumerHandler.java | 20 ++++++++++++++++++++
 .../org/apache/pulsar/websocket/ProducerHandler.java |  8 ++++++++
 2 files changed, 28 insertions(+)

diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index a1c76d2..d30c084 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -232,6 +232,10 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
 
     // Check and notify consumer if reached end of topic.
     private void handleEndOfTopic() {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}/{}] Received check reach the end of topic request 
from {} ", consumer.getTopic(),
+                    subscription, 
getRemote().getInetSocketAddress().toString());
+        }
         try {
             String msg = 
ObjectMapperFactory.getThreadLocal().writeValueAsString(
                     new EndOfTopicResponse(consumer.hasReachedEndOfTopic()));
@@ -259,6 +263,10 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
     }
 
     private void handleUnsubscribe(ConsumerCommand command) throws 
PulsarClientException {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}/{}] Received unsubscribe request from {} ", 
consumer.getTopic(),
+                    subscription, 
getRemote().getInetSocketAddress().toString());
+        }
         consumer.unsubscribe();
     }
 
@@ -276,6 +284,10 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
         // We should have received an ack
         MessageId msgId = 
MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                 topic.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("[{}/{}] Received ack request of message {} from {} ", 
consumer.getTopic(),
+                    subscription, msgId, 
getRemote().getInetSocketAddress().toString());
+        }
         consumer.acknowledgeAsync(msgId).thenAccept(consumer -> 
numMsgsAcked.increment());
         checkResumeReceive();
     }
@@ -283,11 +295,19 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
     private void handleNack(ConsumerCommand command) throws IOException {
         MessageId msgId = 
MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
             topic.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("[{}/{}] Received negative ack request of message {} 
from {} ", consumer.getTopic(),
+                    subscription, msgId, 
getRemote().getInetSocketAddress().toString());
+        }
         consumer.negativeAcknowledge(msgId);
         checkResumeReceive();
     }
 
     private void handlePermit(ConsumerCommand command) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}/{}] Received {} permits request from {} ", 
consumer.getTopic(),
+                    subscription, command.permitMessages, 
getRemote().getInetSocketAddress().toString());
+        }
         if (command.permitMessages == null) {
             throw new IOException("Missing required permitMessages field for 
'permit' command");
         }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 9552d42..9b6593d 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -131,6 +131,10 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
 
     @Override
     public void onWebSocketText(String message) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received new message from producer {} ", 
producer.getTopic(),
+                    getRemote().getInetSocketAddress().toString());
+        }
         ProducerMessage sendRequest;
         byte[] rawPayload = null;
         String requestContext = null;
@@ -188,6 +192,10 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
         final long now = System.nanoTime();
 
         builder.sendAsync().thenAccept(msgId -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Success fully write the message to broker with 
returned message ID {} from producer {}",
+                        producer.getTopic(), msgId, 
getRemote().getInetSocketAddress().toString());
+            }
             updateSentMsgStats(msgSize, 
TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now));
             if (isConnected()) {
                 String messageId = 
Base64.getEncoder().encodeToString(msgId.toByteArray());

Reply via email to