This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ce781  Catch throwable in interceptors of consumer and producer. (#4860)
00ce781 is described below

commit 00ce7815f4c3428215abedb0608b7acfa2c35bd5
Author: lipenghui <[email protected]>
AuthorDate: Thu Aug 1 14:40:08 2019 +0800

    Catch throwable in interceptors of consumer and producer. (#4860)
---
 .../apache/pulsar/client/api/InterceptorsTest.java | 101 +++++++++++++++++++++
 .../pulsar/client/impl/ConsumerInterceptors.java   |  12 +--
 .../pulsar/client/impl/ProducerInterceptors.java   |   6 +-
 3 files changed, 110 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index c079d65..3192981 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -146,6 +146,107 @@ public class InterceptorsTest extends ProducerConsumerBase {
     }
 
     @Test
+    public void testProducerInterceptorsWithErrors() throws PulsarClientException {
+        ProducerInterceptor<String> interceptor = new 
ProducerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, 
Message<String> message) {
+                throw new AbstractMethodError();
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, 
Message<String> message, MessageId msgId, Throwable exception) {
+                throw new AbstractMethodError();
+            }
+        };
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .intercept(interceptor)
+                .create();
+
+        MessageId messageId = producer.newMessage().value("Hello Pulsar!").send();
+        Assert.assertNotNull(messageId);
+        producer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithErrors() throws PulsarClientException {
+        ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+                throw new AbstractMethodError();
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
+                throw new AbstractMethodError();
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId 
messageId, Throwable exception) {
+                throw new AbstractMethodError();
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, 
MessageId messageId, Throwable exception) {
+                throw new AbstractMethodError();
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, 
Set<MessageId> messageIds) {
+                throw new AbstractMethodError();
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, 
Set<MessageId> messageIds) {
+                throw new AbstractMethodError();
+            }
+        };
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic-exception")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription-ack-timeout")
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic-exception")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription-negative")
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic-exception")
+                .create();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+
+        Message<String> received = consumer1.receive();
+        Assert.assertEquals(received.getValue(), "Hello Pulsar!");
+        // wait ack timeout
+        Message<String> receivedAgain = consumer1.receive();
+        Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!");
+        consumer1.acknowledge(receivedAgain);
+
+        received = consumer2.receive();
+        Assert.assertEquals(received.getValue(), "Hello Pulsar!");
+        consumer2.negativeAcknowledge(received);
+        receivedAgain = consumer2.receive();
+        Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!");
+        consumer2.acknowledge(receivedAgain);
+
+        producer.close();
+        consumer1.close();
+        consumer2.close();
+    }
+
+    @Test
     public void testConsumerInterceptorWithSingleTopicSubscribe() throws 
PulsarClientException {
         ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
             @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index e5bf834..91b3228 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -66,7 +66,7 @@ public class ConsumerInterceptors<T> implements Closeable {
         for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
             try {
                 interceptorMessage = 
interceptors.get(i).beforeConsume(consumer, interceptorMessage);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 if (consumer != null) {
                     log.warn("Error executing interceptor beforeConsume 
callback topic: {} consumerName: {}", consumer.getTopic(), 
consumer.getConsumerName(), e);
                 } else {
@@ -92,7 +92,7 @@ public class ConsumerInterceptors<T> implements Closeable {
         for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
             try {
                 interceptors.get(i).onAcknowledge(consumer, messageId, 
exception);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.warn("Error executing interceptor onAcknowledge callback 
", e);
             }
         }
@@ -113,7 +113,7 @@ public class ConsumerInterceptors<T> implements Closeable {
         for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
             try {
                 interceptors.get(i).onAcknowledgeCumulative(consumer, 
messageId, exception);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.warn("Error executing interceptor onAcknowledgeCumulative 
callback ", e);
             }
         }
@@ -134,7 +134,7 @@ public class ConsumerInterceptors<T> implements Closeable {
         for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
             try {
                 interceptors.get(i).onNegativeAcksSend(consumer, messageIds);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.warn("Error executing interceptor onNegativeAcksSend 
callback", e);
             }
         }
@@ -155,7 +155,7 @@ public class ConsumerInterceptors<T> implements Closeable {
         for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
             try {
                 interceptors.get(i).onAckTimeoutSend(consumer, messageIds);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.warn("Error executing interceptor onAckTimeoutSend 
callback", e);
             }
         }
@@ -166,7 +166,7 @@ public class ConsumerInterceptors<T> implements Closeable {
         for (int i = 0, interceptorsSize = interceptors.size(); i < 
interceptorsSize; i++) {
             try {
                 interceptors.get(i).close();
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.error("Fail to close consumer interceptor ", e);
             }
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
index c70e4a3..68fa54b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -63,7 +63,7 @@ public class ProducerInterceptors<T> implements Closeable {
         for (int i = 0; i < interceptors.size(); i++) {
             try {
                 interceptorMessage = interceptors.get(i).beforeSend(producer, 
interceptorMessage);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 if (producer != null) {
                     log.warn("Error executing interceptor beforeSend callback 
for topicName:{} ", producer.getTopic(), e);
                 } else {
@@ -91,7 +91,7 @@ public class ProducerInterceptors<T> implements Closeable {
         for (int i = 0; i < interceptors.size(); i++) {
             try {
                 interceptors.get(i).onSendAcknowledgement(producer, message, 
msgId, exception);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.warn("Error executing interceptor onSendAcknowledgement 
callback ", e);
             }
         }
@@ -102,7 +102,7 @@ public class ProducerInterceptors<T> implements Closeable {
         for (int i = 0; i < interceptors.size(); i++) {
             try {
                 interceptors.get(i).close();
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.error("Fail to close producer interceptor ", e);
             }
         }

Reply via email to