merlimat closed pull request #1472: Pulsar Proxy - Added Prometheus metrics for 
throttled connections and…
URL: https://github.com/apache/incubator-pulsar/pull/1472
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aad42dfaa..bbf1c4425 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -38,6 +38,7 @@
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class LookupProxyHandler {
+    private final String throttlingErrorMessage = "Too many concurrent lookup 
and partitionsMetadata requests";
     private final ProxyService service;
     private final ProxyConnection proxyConnection;
     private final boolean connectWithTLS;
@@ -52,6 +53,14 @@
             .build("pulsar_proxy_partitions_metadata_requests", "Counter of 
partitions metadata requests").create()
             .register();
 
+    static final Counter rejectedLookupRequests = 
Counter.build("pulsar_proxy_rejected_lookup_requests",
+            "Counter of topic lookup requests rejected due to 
throttling").create().register();
+
+    static final Counter rejectedPartitionsMetadataRequests = Counter
+            .build("pulsar_proxy_rejected_partitions_metadata_requests",
+                    "Counter of partitions metadata requests rejected due to 
throttling")
+            .create().register();
+
     public LookupProxyHandler(ProxyService proxy, ProxyConnection 
proxyConnection) {
         this.service = proxy;
         this.proxyConnection = proxyConnection;
@@ -89,11 +98,13 @@ public void handleLookup(CommandLookupTopic lookup) {
             performLookup(clientRequestId, topic, serviceUrl, false, 10);
             this.service.getLookupRequestSemaphore().release();
         } else {
+            rejectedLookupRequests.inc();
             if (log.isDebugEnabled()) {
-                log.debug("Request ID {} from {} rejected - Too many 
concurrent lookup requests.", clientRequestId, clientAddress);
+                log.debug("Lookup Request ID {} from {} rejected - {}.", 
clientRequestId, clientAddress,
+                        throttlingErrorMessage);
             }
             
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                    "Too many concurrent lookup requests", clientRequestId));
+                    throttlingErrorMessage, clientRequestId));
         }
 
     }
@@ -137,7 +148,6 @@ private void performLookup(long clientRequestId, String 
topic, String brokerServ
                     performLookup(clientRequestId, topic, brokerUrl, 
result.authoritative, numberOfRetries - 1);
                 } else {
                     // Reply the same address for both TLS non-TLS. The reason 
is that whether we use TLS
-                    // between proxy
                     // and broker is independent of whether the client itself 
uses TLS, but we need to force the
                     // client
                     // to use the appropriate target broker (and port) when it 
will connect back.
@@ -168,8 +178,13 @@ public void 
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
             handlePartitionMetadataResponse(partitionMetadata, 
clientRequestId);
             this.service.getLookupRequestSemaphore().release();
         } else {
+            rejectedPartitionsMetadataRequests.inc();
+            if (log.isDebugEnabled()) {
+                log.debug("PartitionMetaData Request ID {} from {} rejected - 
{}.", clientRequestId, clientAddress,
+                        throttlingErrorMessage);
+            }
             
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
-                    "Too many concurrent lookup requests", clientRequestId));
+                    throttlingErrorMessage, clientRequestId));
         }
     }
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e65461a3b..d2c7f9709 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,8 +40,6 @@
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -87,6 +85,10 @@
             .build("pulsar_proxy_new_connections", "Counter of connections 
being opened in the proxy").create()
             .register();
 
+    static final Counter rejectedConnections = Counter
+            .build("pulsar_proxy_rejected_connections", "Counter for 
connections rejected due to throttling").create()
+            .register();
+    
     public ProxyConnection(ProxyService proxyService) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
@@ -98,10 +100,8 @@ public void channelRegistered(ChannelHandlerContext ctx) 
throws Exception {
         super.channelRegistered(ctx);
         activeConnections.inc();
         if (activeConnections.get() > 
service.getConfiguration().getMaxConcurrentInboundConnections()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Too many connection opened {}", remoteAddress, 
activeConnections.get());
-            }
             ctx.close();
+            rejectedConnections.inc();
             return;
         }
     }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 008e751e3..21c88c6cc 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -74,6 +74,7 @@ public void testInboundConnection() throws Exception {
         PulsarClient client2 = 
PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
proxyConfig.getServicePort())
                 .build();
         Producer<byte[]> producer2;
+        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
         try {
             producer2 = 
client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
@@ -81,6 +82,7 @@ public void testInboundConnection() throws Exception {
         } catch (Exception ex) {
             // OK
         }
+        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
     }
     
     private static final Logger LOG = 
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index c661caea4..4411f8097 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -78,7 +78,7 @@ public void testLookup() throws Exception {
         } catch (Exception ex) {
             // Ignore
         }
-
+        
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
 1.0d);
         proxyService.getLookupRequestSemaphore().release();
         try {
             Producer<byte[]> producer3 = 
client.newProducer().topic("persistent://sample/test/local/producer-topic")
@@ -86,6 +86,7 @@ public void testLookup() throws Exception {
         } catch (Exception ex) {
             Assert.fail("Should not have failed since can acquire 
LookupRequestSemaphore");
         }
+        
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
 1.0d);
         client.close();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to