merlimat closed pull request #1472: Pulsar Proxy - Added Prometheus metrics for
throttled connections and…
URL: https://github.com/apache/incubator-pulsar/pull/1472
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aad42dfaa..bbf1c4425 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -38,6 +38,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
public class LookupProxyHandler {
+ private final String throttlingErrorMessage = "Too many concurrent lookup
and partitionsMetadata requests";
private final ProxyService service;
private final ProxyConnection proxyConnection;
private final boolean connectWithTLS;
@@ -52,6 +53,14 @@
.build("pulsar_proxy_partitions_metadata_requests", "Counter of
partitions metadata requests").create()
.register();
+ static final Counter rejectedLookupRequests =
Counter.build("pulsar_proxy_rejected_lookup_requests",
+ "Counter of topic lookup requests rejected due to
throttling").create().register();
+
+ static final Counter rejectedPartitionsMetadataRequests = Counter
+ .build("pulsar_proxy_rejected_partitions_metadata_requests",
+ "Counter of partitions metadata requests rejected due to
throttling")
+ .create().register();
+
public LookupProxyHandler(ProxyService proxy, ProxyConnection
proxyConnection) {
this.service = proxy;
this.proxyConnection = proxyConnection;
@@ -89,11 +98,13 @@ public void handleLookup(CommandLookupTopic lookup) {
performLookup(clientRequestId, topic, serviceUrl, false, 10);
this.service.getLookupRequestSemaphore().release();
} else {
+ rejectedLookupRequests.inc();
if (log.isDebugEnabled()) {
- log.debug("Request ID {} from {} rejected - Too many
concurrent lookup requests.", clientRequestId, clientAddress);
+ log.debug("Lookup Request ID {} from {} rejected - {}.",
clientRequestId, clientAddress,
+ throttlingErrorMessage);
}
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
- "Too many concurrent lookup requests", clientRequestId));
+ throttlingErrorMessage, clientRequestId));
}
}
@@ -137,7 +148,6 @@ private void performLookup(long clientRequestId, String
topic, String brokerServ
performLookup(clientRequestId, topic, brokerUrl,
result.authoritative, numberOfRetries - 1);
} else {
// Reply the same address for both TLS non-TLS. The reason
is that whether we use TLS
- // between proxy
// and broker is independent of whether the client itself
uses TLS, but we need to force the
// client
// to use the appropriate target broker (and port) when it
will connect back.
@@ -168,8 +178,13 @@ public void
handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
handlePartitionMetadataResponse(partitionMetadata,
clientRequestId);
this.service.getLookupRequestSemaphore().release();
} else {
+ rejectedPartitionsMetadataRequests.inc();
+ if (log.isDebugEnabled()) {
+ log.debug("PartitionMetaData Request ID {} from {} rejected -
{}.", clientRequestId, clientAddress,
+ throttlingErrorMessage);
+ }
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
- "Too many concurrent lookup requests", clientRequestId));
+ throttlingErrorMessage, clientRequestId));
}
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e65461a3b..d2c7f9709 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,8 +40,6 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -87,6 +85,10 @@
.build("pulsar_proxy_new_connections", "Counter of connections
being opened in the proxy").create()
.register();
+ static final Counter rejectedConnections = Counter
+ .build("pulsar_proxy_rejected_connections", "Counter for
connections rejected due to throttling").create()
+ .register();
+
public ProxyConnection(ProxyService proxyService) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
@@ -98,10 +100,8 @@ public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
super.channelRegistered(ctx);
activeConnections.inc();
if (activeConnections.get() >
service.getConfiguration().getMaxConcurrentInboundConnections()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Too many connection opened {}", remoteAddress,
activeConnections.get());
- }
ctx.close();
+ rejectedConnections.inc();
return;
}
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 008e751e3..21c88c6cc 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -74,6 +74,7 @@ public void testInboundConnection() throws Exception {
PulsarClient client2 =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
.build();
Producer<byte[]> producer2;
+ Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
try {
producer2 =
client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
producer2.send("Message 1".getBytes());
@@ -81,6 +82,7 @@ public void testInboundConnection() throws Exception {
} catch (Exception ex) {
// OK
}
+ Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
}
private static final Logger LOG =
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index c661caea4..4411f8097 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -78,7 +78,7 @@ public void testLookup() throws Exception {
} catch (Exception ex) {
// Ignore
}
-
+
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
1.0d);
proxyService.getLookupRequestSemaphore().release();
try {
Producer<byte[]> producer3 =
client.newProducer().topic("persistent://sample/test/local/producer-topic")
@@ -86,6 +86,7 @@ public void testLookup() throws Exception {
} catch (Exception ex) {
Assert.fail("Should not have failed since can acquire
LookupRequestSemaphore");
}
+
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
1.0d);
client.close();
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services