This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 00ce781 Catch throwable in interceptors of consumer and producer.
(#4860)
00ce781 is described below
commit 00ce7815f4c3428215abedb0608b7acfa2c35bd5
Author: lipenghui <[email protected]>
AuthorDate: Thu Aug 1 14:40:08 2019 +0800
Catch throwable in interceptors of consumer and producer. (#4860)
---
.../apache/pulsar/client/api/InterceptorsTest.java | 101 +++++++++++++++++++++
.../pulsar/client/impl/ConsumerInterceptors.java | 12 +--
.../pulsar/client/impl/ProducerInterceptors.java | 6 +-
3 files changed, 110 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index c079d65..3192981 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -146,6 +146,107 @@ public class InterceptorsTest extends
ProducerConsumerBase {
}
@Test
+ public void testProducerInterceptorsWithErrors() throws
PulsarClientException {
+ ProducerInterceptor<String> interceptor = new
ProducerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeSend(Producer<String> producer,
Message<String> message) {
+ throw new AbstractMethodError();
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<String> producer,
Message<String> message, MessageId msgId, Throwable exception) {
+ throw new AbstractMethodError();
+ }
+ };
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .intercept(interceptor)
+ .create();
+
+ MessageId messageId = producer.newMessage().value("Hello
Pulsar!").send();
+ Assert.assertNotNull(messageId);
+ producer.close();
+ }
+
+ @Test
+ public void testConsumerInterceptorWithErrors() throws
PulsarClientException {
+ ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
+ @Override
+ public void close() {
+ throw new AbstractMethodError();
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer,
Message<String> message) {
+ throw new AbstractMethodError();
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId
messageId, Throwable exception) {
+ throw new AbstractMethodError();
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer,
MessageId messageId, Throwable exception) {
+ throw new AbstractMethodError();
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ throw new AbstractMethodError();
+ }
+
+ @Override
+ public void onAckTimeoutSend(Consumer<String> consumer,
Set<MessageId> messageIds) {
+ throw new AbstractMethodError();
+ }
+ };
+ Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic-exception")
+ .subscriptionType(SubscriptionType.Shared)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription-ack-timeout")
+ .ackTimeout(3, TimeUnit.SECONDS)
+ .subscribe();
+
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic-exception")
+ .subscriptionType(SubscriptionType.Shared)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription-negative")
+ .subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic-exception")
+ .create();
+
+ producer.newMessage().value("Hello Pulsar!").send();
+
+ Message<String> received = consumer1.receive();
+ Assert.assertEquals(received.getValue(), "Hello Pulsar!");
+ // wait ack timeout
+ Message<String> receivedAgain = consumer1.receive();
+ Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!");
+ consumer1.acknowledge(receivedAgain);
+
+ received = consumer2.receive();
+ Assert.assertEquals(received.getValue(), "Hello Pulsar!");
+ consumer2.negativeAcknowledge(received);
+ receivedAgain = consumer2.receive();
+ Assert.assertEquals(receivedAgain.getValue(), "Hello Pulsar!");
+ consumer2.acknowledge(receivedAgain);
+
+ producer.close();
+ consumer1.close();
+ consumer2.close();
+ }
+
+ @Test
public void testConsumerInterceptorWithSingleTopicSubscribe() throws
PulsarClientException {
ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index e5bf834..91b3228 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -66,7 +66,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
try {
interceptorMessage =
interceptors.get(i).beforeConsume(consumer, interceptorMessage);
- } catch (Exception e) {
+ } catch (Throwable e) {
if (consumer != null) {
log.warn("Error executing interceptor beforeConsume
callback topic: {} consumerName: {}", consumer.getTopic(),
consumer.getConsumerName(), e);
} else {
@@ -92,7 +92,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledge(consumer, messageId,
exception);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("Error executing interceptor onAcknowledge callback
", e);
}
}
@@ -113,7 +113,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledgeCumulative(consumer,
messageId, exception);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("Error executing interceptor onAcknowledgeCumulative
callback ", e);
}
}
@@ -134,7 +134,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
try {
interceptors.get(i).onNegativeAcksSend(consumer, messageIds);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("Error executing interceptor onNegativeAcksSend
callback", e);
}
}
@@ -155,7 +155,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
try {
interceptors.get(i).onAckTimeoutSend(consumer, messageIds);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("Error executing interceptor onAckTimeoutSend
callback", e);
}
}
@@ -166,7 +166,7 @@ public class ConsumerInterceptors<T> implements Closeable {
for (int i = 0, interceptorsSize = interceptors.size(); i <
interceptorsSize; i++) {
try {
interceptors.get(i).close();
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("Fail to close consumer interceptor ", e);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
index c70e4a3..68fa54b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -63,7 +63,7 @@ public class ProducerInterceptors<T> implements Closeable {
for (int i = 0; i < interceptors.size(); i++) {
try {
interceptorMessage = interceptors.get(i).beforeSend(producer,
interceptorMessage);
- } catch (Exception e) {
+ } catch (Throwable e) {
if (producer != null) {
log.warn("Error executing interceptor beforeSend callback
for topicName:{} ", producer.getTopic(), e);
} else {
@@ -91,7 +91,7 @@ public class ProducerInterceptors<T> implements Closeable {
for (int i = 0; i < interceptors.size(); i++) {
try {
interceptors.get(i).onSendAcknowledgement(producer, message,
msgId, exception);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("Error executing interceptor onSendAcknowledgement
callback ", e);
}
}
@@ -102,7 +102,7 @@ public class ProducerInterceptors<T> implements Closeable {
for (int i = 0; i < interceptors.size(); i++) {
try {
interceptors.get(i).close();
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("Fail to close producer interceptor ", e);
}
}