This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1fc6d5f3b [fix][broker] catch exception for brokerInterceptor
(#19147)
5d1fc6d5f3b is described below
commit 5d1fc6d5f3b5b68e9760a572172cf163a1e9785a
Author: AloysZhang <[email protected]>
AuthorDate: Tue Feb 14 16:15:30 2023 +0800
[fix][broker] catch exception for brokerInterceptor (#19147)
---
.../BrokerInterceptorWithClassLoader.java | 6 ++
.../broker/intercept/BrokerInterceptors.java | 6 ++
.../apache/pulsar/broker/service/ServerCnx.java | 22 ++--
.../intercept/ExceptionsBrokerInterceptor.java | 102 ++++++++++++++++++
.../intercept/ExceptionsBrokerInterceptorTest.java | 117 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 20 ++++
6 files changed, 267 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index a74730d23e1..faee5799289 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.intercept;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
@@ -208,4 +209,9 @@ public class BrokerInterceptorWithClassLoader implements
BrokerInterceptor {
log.warn("Failed to close the broker interceptor class loader", e);
}
}
+
+ @VisibleForTesting
+ public BrokerInterceptor getInterceptor() {
+ return interceptor;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index e7f82742a97..cef3f0eb609 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.intercept;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
@@ -277,4 +278,9 @@ public class BrokerInterceptors implements
BrokerInterceptor {
private boolean interceptorsEnabled() {
return interceptors != null && !interceptors.isEmpty();
}
+
+ @VisibleForTesting
+ public Map<String, BrokerInterceptorWithClassLoader> getInterceptors() {
+ return interceptors;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1351c6fe715..4c81b46601e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1185,7 +1185,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, topicName,
subscriptionName);
commandSender.sendSuccessResponse(requestId);
if (brokerInterceptor != null) {
- brokerInterceptor.consumerCreated(this,
consumer, metadata);
+ try {
+
brokerInterceptor.consumerCreated(this, consumer, metadata);
+ } catch (Throwable t) {
+ log.error("Exception occur when
intercept consumer created.", t);
+ }
}
} else {
// The consumer future was completed before by
a close command
@@ -1223,8 +1227,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
// If client timed out, the future would have been
completed by subsequent close.
- // Send error
- // back to client, only if not completed already.
+ // Send error back to client, only if not
completed already.
if
(consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
@@ -1521,8 +1524,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
producer.getLastSequenceId(),
producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
if (brokerInterceptor != null) {
- brokerInterceptor.
- producerCreated(this, producer, metadata);
+ try {
+ brokerInterceptor.producerCreated(this, producer,
metadata);
+ } catch (Throwable t) {
+ log.error("Exception occur when intercept producer
created.", t);
+ }
}
return;
} else {
@@ -1689,7 +1695,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
requestId, null, null, consumerId));
}
if (brokerInterceptor != null) {
- brokerInterceptor.messageAcked(this, consumer,
copyOfAckForInterceptor);
+ try {
+ brokerInterceptor.messageAcked(this, consumer,
copyOfAckForInterceptor);
+ } catch (Throwable t) {
+ log.error("Exception occur when intercept message
acked.", t);
+ }
}
}).exceptionally(e -> {
if (hasRequestId) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java
new file mode 100644
index 00000000000..f58d56c05a9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.intercept.InterceptException;
+
+public class ExceptionsBrokerInterceptor implements BrokerInterceptor {
+
+
+ private AtomicInteger producerCount = new AtomicInteger();
+ private AtomicInteger consumerCount = new AtomicInteger();
+ private AtomicInteger messageAckCount = new AtomicInteger();
+
+ public AtomicInteger getProducerCount() {
+ return producerCount;
+ }
+
+ public AtomicInteger getConsumerCount() {
+ return consumerCount;
+ }
+
+ public AtomicInteger getMessageAckCount() {
+ return messageAckCount;
+ }
+
+ @Override
+ public void producerCreated(ServerCnx cnx, Producer producer, Map<String,
String> metadata) {
+ producerCount.incrementAndGet();
+ throw new RuntimeException("exception when intercept producer
created");
+ }
+
+ @Override
+ public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String,
String> metadata) {
+ consumerCount.incrementAndGet();
+ throw new RuntimeException("exception when intercept consumer
created");
+ }
+
+ @Override
+ public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck
ackCmd) {
+ messageAckCount.incrementAndGet();
+ throw new RuntimeException("exception when intercept consumer ack
message");
+ }
+
+ @Override
+ public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws
InterceptException {
+
+ }
+
+ @Override
+ public void onConnectionClosed(ServerCnx cnx) {
+
+ }
+
+ @Override
+ public void onWebserviceRequest(ServletRequest request) throws
IOException, ServletException, InterceptException {
+
+ }
+
+ @Override
+ public void onWebserviceResponse(ServletRequest request, ServletResponse
response)
+ throws IOException, ServletException {
+
+ }
+
+ @Override
+ public void initialize(PulsarService pulsarService) throws Exception {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java
new file mode 100644
index 00000000000..aa254a8ac16
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ExceptionsBrokerInterceptorTest extends ProducerConsumerBase {
+
+ private String interceptorName = "exception_interceptor";
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ conf.setSystemTopicEnabled(false);
+ conf.setTopicLevelPoliciesEnabled(false);
+ this.conf.setDisableBrokerInterceptors(false);
+
+
+ this.enableBrokerInterceptor = true;
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ Map<String, BrokerInterceptorWithClassLoader> listenerMap = new
HashMap<>();
+ BrokerInterceptor interceptor = new ExceptionsBrokerInterceptor();
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ listenerMap.put(interceptorName, new
BrokerInterceptorWithClassLoader(interceptor, narClassLoader));
+ pulsarTestContextBuilder.brokerInterceptor(new
BrokerInterceptors(listenerMap));
+ }
+
+ @Test
+ public void testMessageAckedExceptions() throws Exception {
+ String topic = "persistent://public/default/test";
+ String subName = "test-sub";
+ int messageNumber = 10;
+ admin.topics().createNonPartitionedTopic(topic);
+
+ BrokerInterceptors listener = (BrokerInterceptors)
pulsar.getBrokerInterceptor();
+ assertNotNull(listener);
+ BrokerInterceptorWithClassLoader brokerInterceptor =
listener.getInterceptors().get(interceptorName);
+ assertNotNull(brokerInterceptor);
+ BrokerInterceptor interceptor = brokerInterceptor.getInterceptor();
+ assertTrue(interceptor instanceof ExceptionsBrokerInterceptor);
+
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+
+ ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+ .newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+
+ Awaitility.await().until(() -> ((ExceptionsBrokerInterceptor)
interceptor).getProducerCount().get() == 1);
+ Awaitility.await().until(() -> ((ExceptionsBrokerInterceptor)
interceptor).getConsumerCount().get() == 1);
+
+ for (int i = 0; i < messageNumber; i ++) {
+ producer.send("test".getBytes(StandardCharsets.UTF_8));
+ }
+
+ int receiveCounter = 0;
+ Message message;
+ while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
+ receiveCounter ++;
+ consumer.acknowledge(message);
+ }
+ assertEquals(receiveCounter, 10);
+ Awaitility.await().until(()
+ -> ((ExceptionsBrokerInterceptor)
interceptor).getMessageAckCount().get() == messageNumber);
+
+ ClientCnx clientCnx = consumer.getClientCnx();
+ // no duplicated responses received from broker
+ assertEquals(clientCnx.getDuplicatedResponseCount(), 0);
+ }
+
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f2ebb12f957..5074d0f55ef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -115,6 +116,8 @@ public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
protected State state;
+ private AtomicLong duplicatedResponseCounter = new AtomicLong(0);
+
@Getter
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends
Object>> pendingRequests =
ConcurrentLongHashMap.<TimedCompletableFuture<? extends
Object>>newBuilder()
@@ -352,6 +355,11 @@ public class ClientCnx extends PulsarHandler {
return t instanceof NativeIoException || t instanceof
ClosedChannelException;
}
+ @VisibleForTesting
+ public long getDuplicatedResponseCount() {
+ return duplicatedResponseCounter.get();
+ }
+
@Override
protected void handleConnected(CommandConnected connected) {
checkArgument(state == State.SentConnectFrame || state ==
State.Connecting);
@@ -475,6 +483,7 @@ public class ClientCnx extends PulsarHandler {
buildError(ackResponse.getRequestId(), ackResponse.getMessage())));
}
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("AckResponse has complete when receive response!
requestId : {}, consumerId : {}",
ackResponse.getRequestId(), ackResponse.hasConsumerId());
}
@@ -519,6 +528,7 @@ public class ClientCnx extends PulsarHandler {
if (requestFuture != null) {
requestFuture.complete(null);
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), success.getRequestId());
}
}
@@ -537,6 +547,7 @@ public class ClientCnx extends PulsarHandler {
if (requestFuture != null) {
requestFuture.complete(new
CommandGetLastMessageIdResponse().copyFrom(success));
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), success.getRequestId());
}
}
@@ -572,6 +583,7 @@ public class ClientCnx extends PulsarHandler {
success.hasTopicEpoch() ?
Optional.of(success.getTopicEpoch()) : Optional.empty());
requestFuture.complete(pr);
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), success.getRequestId());
}
}
@@ -719,6 +731,8 @@ public class ClientCnx extends PulsarHandler {
} else {
pendingLookupRequestSemaphore.release();
}
+ } else {
+ duplicatedResponseCounter.incrementAndGet();
}
return result;
}
@@ -775,6 +789,7 @@ public class ClientCnx extends PulsarHandler {
getPulsarClientException(error.getError(),
buildError(error.getRequestId(),
error.getMessage())));
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), error.getRequestId());
}
}
@@ -882,6 +897,7 @@ public class ClientCnx extends PulsarHandler {
success.isFiltered(),
success.isChanged()));
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), success.getRequestId());
}
}
@@ -895,6 +911,7 @@ public class ClientCnx extends PulsarHandler {
CompletableFuture<CommandGetSchemaResponse> future =
(CompletableFuture<CommandGetSchemaResponse>)
pendingRequests.remove(requestId);
if (future == null) {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), requestId);
return;
}
@@ -908,6 +925,7 @@ public class ClientCnx extends PulsarHandler {
CompletableFuture<CommandGetOrCreateSchemaResponse> future =
(CompletableFuture<CommandGetOrCreateSchemaResponse>)
pendingRequests.remove(requestId);
if (future == null) {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), requestId);
return;
}
@@ -1080,6 +1098,7 @@ public class ClientCnx extends PulsarHandler {
requestFuture.completeExceptionally(getExceptionByServerError(error,
response.getMessage()));
}
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("Tc client connect command has been completed and get
response for request: {}",
response.getRequestId());
}
@@ -1133,6 +1152,7 @@ public class ClientCnx extends PulsarHandler {
if (requestFuture != null) {
requestFuture.complete(commandWatchTopicListSuccess);
} else {
+ duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
ctx.channel(),
commandWatchTopicListSuccess.getRequestId());
}