This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f4d6c2b Pulsar Proxy - Added Prometheus metrics for throttled
connections and lookups. (#1472)
f4d6c2b is described below
commit f4d6c2bf450c90c4f2e291607dc3f887adc4a1a5
Author: Jai Asher <[email protected]>
AuthorDate: Thu Mar 29 15:54:10 2018 -0700
Pulsar Proxy - Added Prometheus metrics for throttled connections and
lookups. (#1472)
---
.../pulsar/proxy/server/LookupProxyHandler.java | 23 ++++++++++++++++++----
.../pulsar/proxy/server/ProxyConnection.java | 10 +++++-----
.../server/ProxyConnectionThrottlingTest.java | 2 ++
.../proxy/server/ProxyLookupThrottlingTest.java | 3 ++-
4 files changed, 28 insertions(+), 10 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aad42df..bbf1c44 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -38,6 +38,7 @@ import io.prometheus.client.Counter;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class LookupProxyHandler {
+ private final String throttlingErrorMessage = "Too many concurrent lookup
and partitionsMetadata requests";
private final ProxyService service;
private final ProxyConnection proxyConnection;
private final boolean connectWithTLS;
@@ -52,6 +53,14 @@ public class LookupProxyHandler {
.build("pulsar_proxy_partitions_metadata_requests", "Counter of
partitions metadata requests").create()
.register();
+ static final Counter rejectedLookupRequests =
Counter.build("pulsar_proxy_rejected_lookup_requests",
+ "Counter of topic lookup requests rejected due to
throttling").create().register();
+
+ static final Counter rejectedPartitionsMetadataRequests = Counter
+ .build("pulsar_proxy_rejected_partitions_metadata_requests",
+ "Counter of partitions metadata requests rejected due to
throttling")
+ .create().register();
+
public LookupProxyHandler(ProxyService proxy, ProxyConnection
proxyConnection) {
this.service = proxy;
this.proxyConnection = proxyConnection;
@@ -89,11 +98,13 @@ public class LookupProxyHandler {
performLookup(clientRequestId, topic, serviceUrl, false, 10);
this.service.getLookupRequestSemaphore().release();
} else {
+ rejectedLookupRequests.inc();
if (log.isDebugEnabled()) {
- log.debug("Request ID {} from {} rejected - Too many
concurrent lookup requests.", clientRequestId, clientAddress);
+ log.debug("Lookup Request ID {} from {} rejected - {}.",
clientRequestId, clientAddress,
+ throttlingErrorMessage);
}
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
- "Too many concurrent lookup requests", clientRequestId));
+ throttlingErrorMessage, clientRequestId));
}
}
@@ -137,7 +148,6 @@ public class LookupProxyHandler {
performLookup(clientRequestId, topic, brokerUrl,
result.authoritative, numberOfRetries - 1);
} else {
// Reply the same address for both TLS non-TLS. The reason
is that whether we use TLS
- // between proxy
// and broker is independent of whether the client itself
uses TLS, but we need to force the
// client
// to use the appropriate target broker (and port) when it
will connect back.
@@ -168,8 +178,13 @@ public class LookupProxyHandler {
handlePartitionMetadataResponse(partitionMetadata,
clientRequestId);
this.service.getLookupRequestSemaphore().release();
} else {
+ rejectedPartitionsMetadataRequests.inc();
+ if (log.isDebugEnabled()) {
+ log.debug("PartitionMetaData Request ID {} from {} rejected -
{}.", clientRequestId, clientAddress,
+ throttlingErrorMessage);
+ }
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
- "Too many concurrent lookup requests", clientRequestId));
+ throttlingErrorMessage, clientRequestId));
}
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e65461a..d2c7f97 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,8 +40,6 @@ import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -87,6 +85,10 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
.build("pulsar_proxy_new_connections", "Counter of connections
being opened in the proxy").create()
.register();
+ static final Counter rejectedConnections = Counter
+ .build("pulsar_proxy_rejected_connections", "Counter for
connections rejected due to throttling").create()
+ .register();
+
public ProxyConnection(ProxyService proxyService) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
@@ -98,10 +100,8 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
super.channelRegistered(ctx);
activeConnections.inc();
if (activeConnections.get() >
service.getConfiguration().getMaxConcurrentInboundConnections()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Too many connection opened {}", remoteAddress,
activeConnections.get());
- }
ctx.close();
+ rejectedConnections.inc();
return;
}
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 008e751..21c88c6 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -74,6 +74,7 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
PulsarClient client2 =
PulsarClient.builder().serviceUrl("pulsar://localhost:" +
proxyConfig.getServicePort())
.build();
Producer<byte[]> producer2;
+ Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
try {
producer2 =
client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
producer2.send("Message 1".getBytes());
@@ -81,6 +82,7 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
} catch (Exception ex) {
// OK
}
+ Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
}
private static final Logger LOG =
LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index c661cae..4411f80 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -78,7 +78,7 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
} catch (Exception ex) {
// Ignore
}
-
+
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
1.0d);
proxyService.getLookupRequestSemaphore().release();
try {
Producer<byte[]> producer3 =
client.newProducer().topic("persistent://sample/test/local/producer-topic")
@@ -86,6 +86,7 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
} catch (Exception ex) {
Assert.fail("Should not have failed since can acquire
LookupRequestSemaphore");
}
+
Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(),
1.0d);
client.close();
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].