merlimat closed pull request #1537: Log warn message if exception occurs while 
WebSocket proxy is sending…
URL: https://github.com/apache/incubator-pulsar/pull/1537
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index aea88f3433..406a2c8154 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -181,6 +182,14 @@ public void writeSuccess() {
                 service.getExecutor().execute(() -> receiveMessage());
             }
         }).exceptionally(exception -> {
+            if (exception.getCause() instanceof AlreadyClosedException) {
+                log.info("[{}/{}] Consumer was closed while receiving msg from 
broker", consumer.getTopic(),
+                        subscription);
+            } else {
+                log.warn("[{}/{}] Error occurred while consumer handler was 
delivering msg to {}: {}",
+                        consumer.getTopic(), subscription, 
getRemote().getInetSocketAddress().toString(),
+                        exception.getMessage());
+            }
             return null;
         });
     }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 25fed37f11..c2c3ee9400 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -187,6 +187,8 @@ public void onWebSocketText(String message) {
                 sendAckResponse(new ProducerAck(messageId, 
sendRequest.context));
             }
         }).exceptionally(exception -> {
+            log.warn("[{}] Error occurred while producer handler was sending 
msg from {}: {}", producer.getTopic(),
+                    getRemote().getInetSocketAddress().toString(), 
exception.getMessage());
             numMsgsFailed.increment();
             sendAckResponse(
                     new ProducerAck(UnknownError, exception.getMessage(), 
null, sendRequest.context));
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 2efb58545a..54037fa852 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -166,8 +167,12 @@ public void writeSuccess() {
                 service.getExecutor().execute(() -> receiveMessage());
             }
         }).exceptionally(exception -> {
-            log.warn("[{}/{}] Failed to deliver msg to {} {}", 
reader.getTopic(),
-                    subscription, 
getRemote().getInetSocketAddress().toString(), exception);
+            if (exception.getCause() instanceof AlreadyClosedException) {
+                log.info("[{}/{}] Reader was closed while receiving msg from 
broker", reader.getTopic(), subscription);
+            } else {
+                log.warn("[{}/{}] Error occurred while reader handler was 
delivering msg to {}: {}", reader.getTopic(),
+                        subscription, 
getRemote().getInetSocketAddress().toString(), exception.getMessage());
+            }
             return null;
         });
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to