This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d1fc6d5f3b [fix][broker] catch exception for brokerInterceptor 
(#19147)
5d1fc6d5f3b is described below

commit 5d1fc6d5f3b5b68e9760a572172cf163a1e9785a
Author: AloysZhang <[email protected]>
AuthorDate: Tue Feb 14 16:15:30 2023 +0800

    [fix][broker] catch exception for brokerInterceptor (#19147)
---
 .../BrokerInterceptorWithClassLoader.java          |   6 ++
 .../broker/intercept/BrokerInterceptors.java       |   6 ++
 .../apache/pulsar/broker/service/ServerCnx.java    |  22 ++--
 .../intercept/ExceptionsBrokerInterceptor.java     | 102 ++++++++++++++++++
 .../intercept/ExceptionsBrokerInterceptorTest.java | 117 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  20 ++++
 6 files changed, 267 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index a74730d23e1..faee5799289 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.intercept;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Map;
@@ -208,4 +209,9 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
             log.warn("Failed to close the broker interceptor class loader", e);
         }
     }
+
+    @VisibleForTesting
+    public BrokerInterceptor getInterceptor() {
+        return interceptor;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index e7f82742a97..cef3f0eb609 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.intercept;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
@@ -277,4 +278,9 @@ public class BrokerInterceptors implements 
BrokerInterceptor {
     private boolean interceptorsEnabled() {
         return interceptors != null && !interceptors.isEmpty();
     }
+
+    @VisibleForTesting
+    public Map<String, BrokerInterceptorWithClassLoader> getInterceptors() {
+        return interceptors;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1351c6fe715..4c81b46601e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1185,7 +1185,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                         remoteAddress, topicName, 
subscriptionName);
                                 commandSender.sendSuccessResponse(requestId);
                                 if (brokerInterceptor != null) {
-                                    brokerInterceptor.consumerCreated(this, 
consumer, metadata);
+                                    try {
+                                        
brokerInterceptor.consumerCreated(this, consumer, metadata);
+                                    } catch (Throwable t) {
+                                        log.error("Exception occur when 
intercept consumer created.", t);
+                                    }
                                 }
                             } else {
                                 // The consumer future was completed before by 
a close command
@@ -1223,8 +1227,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             }
 
                             // If client timed out, the future would have been 
completed by subsequent close.
-                            // Send error
-                            // back to client, only if not completed already.
+                            // Send error back to client, only if not 
completed already.
                             if 
(consumerFuture.completeExceptionally(exception)) {
                                 commandSender.sendErrorResponse(requestId,
                                         
BrokerServiceException.getClientErrorCode(exception),
@@ -1521,8 +1524,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             producer.getLastSequenceId(), 
producer.getSchemaVersion(),
                             newTopicEpoch, true /* producer is ready now */);
                     if (brokerInterceptor != null) {
-                        brokerInterceptor.
-                                producerCreated(this, producer, metadata);
+                        try {
+                            brokerInterceptor.producerCreated(this, producer, 
metadata);
+                        } catch (Throwable t) {
+                            log.error("Exception occur when intercept producer 
created.", t);
+                        }
                     }
                     return;
                 } else {
@@ -1689,7 +1695,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             requestId, null, null, consumerId));
                 }
                 if (brokerInterceptor != null) {
-                    brokerInterceptor.messageAcked(this, consumer, 
copyOfAckForInterceptor);
+                    try {
+                        brokerInterceptor.messageAcked(this, consumer, 
copyOfAckForInterceptor);
+                    } catch (Throwable t) {
+                        log.error("Exception occur when intercept message 
acked.", t);
+                    }
                 }
             }).exceptionally(e -> {
                 if (hasRequestId) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java
new file mode 100644
index 00000000000..f58d56c05a9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.intercept.InterceptException;
+
+public class ExceptionsBrokerInterceptor implements BrokerInterceptor {
+
+
+    private AtomicInteger producerCount = new AtomicInteger();
+    private AtomicInteger consumerCount = new AtomicInteger();
+    private AtomicInteger messageAckCount = new AtomicInteger();
+
+    public AtomicInteger getProducerCount() {
+        return producerCount;
+    }
+
+    public AtomicInteger getConsumerCount() {
+        return consumerCount;
+    }
+
+    public AtomicInteger getMessageAckCount() {
+        return messageAckCount;
+    }
+
+    @Override
+    public void producerCreated(ServerCnx cnx, Producer producer, Map<String, 
String> metadata) {
+        producerCount.incrementAndGet();
+        throw new RuntimeException("exception when intercept producer 
created");
+    }
+
+    @Override
+    public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, 
String> metadata) {
+        consumerCount.incrementAndGet();
+        throw new RuntimeException("exception when intercept consumer 
created");
+    }
+
+    @Override
+    public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck 
ackCmd) {
+        messageAckCount.incrementAndGet();
+        throw new RuntimeException("exception when intercept consumer ack 
message");
+    }
+
+    @Override
+    public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws 
InterceptException {
+
+    }
+
+    @Override
+    public void onConnectionClosed(ServerCnx cnx) {
+
+    }
+
+    @Override
+    public void onWebserviceRequest(ServletRequest request) throws 
IOException, ServletException, InterceptException {
+
+    }
+
+    @Override
+    public void onWebserviceResponse(ServletRequest request, ServletResponse 
response)
+            throws IOException, ServletException {
+
+    }
+
+    @Override
+    public void initialize(PulsarService pulsarService) throws Exception {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java
new file mode 100644
index 00000000000..aa254a8ac16
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ExceptionsBrokerInterceptorTest extends ProducerConsumerBase {
+
+    private String interceptorName = "exception_interceptor";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        this.conf.setDisableBrokerInterceptors(false);
+
+
+        this.enableBrokerInterceptor = true;
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        Map<String, BrokerInterceptorWithClassLoader> listenerMap = new 
HashMap<>();
+        BrokerInterceptor interceptor = new ExceptionsBrokerInterceptor();
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        listenerMap.put(interceptorName, new 
BrokerInterceptorWithClassLoader(interceptor, narClassLoader));
+        pulsarTestContextBuilder.brokerInterceptor(new 
BrokerInterceptors(listenerMap));
+    }
+
+    @Test
+    public void testMessageAckedExceptions() throws Exception {
+        String topic = "persistent://public/default/test";
+        String subName = "test-sub";
+        int messageNumber = 10;
+        admin.topics().createNonPartitionedTopic(topic);
+
+        BrokerInterceptors listener = (BrokerInterceptors) 
pulsar.getBrokerInterceptor();
+        assertNotNull(listener);
+        BrokerInterceptorWithClassLoader brokerInterceptor = 
listener.getInterceptors().get(interceptorName);
+        assertNotNull(brokerInterceptor);
+        BrokerInterceptor interceptor = brokerInterceptor.getInterceptor();
+        assertTrue(interceptor instanceof ExceptionsBrokerInterceptor);
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+
+        ConsumerImpl consumer = (ConsumerImpl) pulsarClient
+                .newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+
+        Awaitility.await().until(() -> ((ExceptionsBrokerInterceptor) 
interceptor).getProducerCount().get() == 1);
+        Awaitility.await().until(() -> ((ExceptionsBrokerInterceptor) 
interceptor).getConsumerCount().get() == 1);
+
+        for (int i = 0; i < messageNumber; i ++) {
+            producer.send("test".getBytes(StandardCharsets.UTF_8));
+        }
+
+        int receiveCounter = 0;
+        Message message;
+        while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
+            receiveCounter ++;
+            consumer.acknowledge(message);
+        }
+        assertEquals(receiveCounter, 10);
+        Awaitility.await().until(()
+                -> ((ExceptionsBrokerInterceptor) 
interceptor).getMessageAckCount().get() == messageNumber);
+
+        ClientCnx clientCnx = consumer.getClientCnx();
+        // no duplicated responses received from broker
+        assertEquals(clientCnx.getDuplicatedResponseCount(), 0);
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f2ebb12f957..5074d0f55ef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -115,6 +116,8 @@ public class ClientCnx extends PulsarHandler {
     protected final Authentication authentication;
     protected State state;
 
+    private AtomicLong duplicatedResponseCounter = new AtomicLong(0);
+
     @Getter
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends 
Object>> pendingRequests =
             ConcurrentLongHashMap.<TimedCompletableFuture<? extends 
Object>>newBuilder()
@@ -352,6 +355,11 @@ public class ClientCnx extends PulsarHandler {
         return t instanceof NativeIoException || t instanceof 
ClosedChannelException;
     }
 
+    @VisibleForTesting
+    public long getDuplicatedResponseCount() {
+        return duplicatedResponseCounter.get();
+    }
+
     @Override
     protected void handleConnected(CommandConnected connected) {
         checkArgument(state == State.SentConnectFrame || state == 
State.Connecting);
@@ -475,6 +483,7 @@ public class ClientCnx extends PulsarHandler {
                                                  
buildError(ackResponse.getRequestId(), ackResponse.getMessage())));
             }
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("AckResponse has complete when receive response! 
requestId : {}, consumerId : {}",
                     ackResponse.getRequestId(), ackResponse.hasConsumerId());
         }
@@ -519,6 +528,7 @@ public class ClientCnx extends PulsarHandler {
         if (requestFuture != null) {
             requestFuture.complete(null);
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
     }
@@ -537,6 +547,7 @@ public class ClientCnx extends PulsarHandler {
         if (requestFuture != null) {
             requestFuture.complete(new 
CommandGetLastMessageIdResponse().copyFrom(success));
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
     }
@@ -572,6 +583,7 @@ public class ClientCnx extends PulsarHandler {
                     success.hasTopicEpoch() ? 
Optional.of(success.getTopicEpoch()) : Optional.empty());
             requestFuture.complete(pr);
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
     }
@@ -719,6 +731,8 @@ public class ClientCnx extends PulsarHandler {
             } else {
                 pendingLookupRequestSemaphore.release();
             }
+        } else {
+            duplicatedResponseCounter.incrementAndGet();
         }
         return result;
     }
@@ -775,6 +789,7 @@ public class ClientCnx extends PulsarHandler {
                     getPulsarClientException(error.getError(),
                                              buildError(error.getRequestId(), 
error.getMessage())));
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), error.getRequestId());
         }
     }
@@ -882,6 +897,7 @@ public class ClientCnx extends PulsarHandler {
                     success.isFiltered(),
                     success.isChanged()));
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
     }
@@ -895,6 +911,7 @@ public class ClientCnx extends PulsarHandler {
         CompletableFuture<CommandGetSchemaResponse> future =
                 (CompletableFuture<CommandGetSchemaResponse>) 
pendingRequests.remove(requestId);
         if (future == null) {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), requestId);
             return;
         }
@@ -908,6 +925,7 @@ public class ClientCnx extends PulsarHandler {
         CompletableFuture<CommandGetOrCreateSchemaResponse> future =
                 (CompletableFuture<CommandGetOrCreateSchemaResponse>) 
pendingRequests.remove(requestId);
         if (future == null) {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), requestId);
             return;
         }
@@ -1080,6 +1098,7 @@ public class ClientCnx extends PulsarHandler {
                 
requestFuture.completeExceptionally(getExceptionByServerError(error, 
response.getMessage()));
             }
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("Tc client connect command has been completed and get 
response for request: {}",
                     response.getRequestId());
         }
@@ -1133,6 +1152,7 @@ public class ClientCnx extends PulsarHandler {
         if (requestFuture != null) {
             requestFuture.complete(commandWatchTopicListSuccess);
         } else {
+            duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}",
                     ctx.channel(), 
commandWatchTopicListSuccess.getRequestId());
         }

Reply via email to