This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9a7c4bbce69 [improve] [proxy] Not close the socket if lookup failed
caused by too many requests (#21216)
9a7c4bbce69 is described below
commit 9a7c4bbce69502d91f6ca5249a617471decd501f
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 21 23:18:45 2023 +0800
[improve] [proxy] Not close the socket if lookup failed caused by too many
requests (#21216)
Motivation: The Pulsar client will close the socket if it receives a
`ServiceNotReady` error when doing a lookup. The Broker will respond to the
client with a `TooManyRequests` error if there are too many lookup requests in
progress, but the Pulsar Proxy responds to the client with a `ServiceNotReady`
error in the same scenario. As a result, a client connected through the Proxy
has its socket closed when its lookup is merely throttled.
Modifications: Make Pulsar Proxy respond to the client with a
`TooManyRequests` error if there are too many lookup requests in progress.
(cherry picked from commit d6c3fa42059d96b04b4132ccc9256c3e76d26959)
---
.../pulsar/proxy/server/LookupProxyHandler.java | 2 +-
.../proxy/server/ProxyLookupThrottlingTest.java | 36 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index b62b3bacf01..f76adadcc3e 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -116,7 +116,7 @@ public class LookupProxyHandler {
log.debug("Lookup Request ID {} from {} rejected - {}.",
clientRequestId, clientAddress,
throttlingErrorMessage);
}
-
writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+
writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests,
throttlingErrorMessage, clientRequestId));
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 4861117ef6f..1b63aa14dfe 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -20,18 +20,26 @@ package org.apache.pulsar.proxy.server;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.Optional;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -112,4 +120,32 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(),
5.0d);
}
+
+ @Test
+ public void testLookupThrottling() throws Exception {
+ PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+ .serviceUrl(proxyService.getServiceUrl()).build();
+ String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ LookupService lookupService = client.getLookup();
+ assertTrue(lookupService instanceof BinaryProtoLookupService);
+ ClientCnx lookupConnection =
client.getCnxPool().getConnection(lookupService.resolveHost()).join();
+
+ // Make no permits to lookup.
+ Semaphore lookupSemaphore = proxyService.getLookupRequestSemaphore();
+ int availablePermits = lookupSemaphore.availablePermits();
+ lookupSemaphore.acquire(availablePermits);
+
+ // Verify will receive too many request exception, and the socket will
not be closed.
+ try {
+ lookupService.getBroker(TopicName.get(tpName)).get();
+ fail("Expected too many request error.");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("Too many"));
+ }
+ assertTrue(lookupConnection.ctx().channel().isActive());
+
+ // cleanup.
+ lookupSemaphore.release(availablePermits);
+ client.close();
+ }
}