This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new f48aa4bcad1 [improve] [proxy] Not close the socket if lookup failed 
caused by too many requests (#21216)
f48aa4bcad1 is described below

commit f48aa4bcad17a7bde1cec8418530cff72cad6690
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 21 23:18:45 2023 +0800

    [improve] [proxy] Not close the socket if lookup failed caused by too many 
requests (#21216)
    
    Motivation: The Pulsar client closes the socket if it receives a 
`ServiceNotReady` error when doing a lookup. When too many lookup requests are 
in progress, the Broker responds to the client with a `TooManyRequests` error, 
but the Pulsar Proxy responds with a `ServiceNotReady` error in the same 
scenario, so clients connected through the Proxy needlessly drop their 
connection whenever lookups are throttled.
    
    Modifications: Make Pulsar Proxy respond to the client with a 
`TooManyRequests` error if there are too many lookup requests in progress.
    (cherry picked from commit d6c3fa42059d96b04b4132ccc9256c3e76d26959)
---
 .../pulsar/proxy/server/LookupProxyHandler.java    |  2 +-
 .../proxy/server/ProxyLookupThrottlingTest.java    | 40 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 76cdddccad2..34488ee8d3d 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -113,7 +113,7 @@ public class LookupProxyHandler {
                 log.debug("Lookup Request ID {} from {} rejected - {}.", 
clientRequestId, clientAddress,
                         throttlingErrorMessage);
             }
-            
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+            
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests,
                     throttlingErrorMessage, clientRequestId));
         }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 6b2dd3221f5..a3ca125580c 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -20,19 +20,29 @@ package org.apache.pulsar.proxy.server;
 
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ServiceNameResolver;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -112,4 +122,34 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
 
         
Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(),
 5.0d);
     }
+
+    @Test
+    public void testLookupThrottling() throws Exception {
+        PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+                .serviceUrl(proxyService.getServiceUrl()).build();
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        LookupService lookupService = client.getLookup();
+        assertTrue(lookupService instanceof BinaryProtoLookupService);
+        ServiceNameResolver serviceNameResolver =
+                WhiteboxImpl.getInternalState(lookupService, 
"serviceNameResolver");
+        ClientCnx lookupConnection = 
client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).join();
+
+        // Acquire every available permit so that no lookup can proceed.
+        Semaphore lookupSemaphore = proxyService.getLookupRequestSemaphore();
+        int availablePermits = lookupSemaphore.availablePermits();
+        lookupSemaphore.acquire(availablePermits);
+
+        // Verify that a too-many-requests error is received and that the socket is not closed.
+        try {
+            lookupService.getBroker(TopicName.get(tpName)).get();
+            fail("Expected too many request error.");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("Too many"));
+        }
+        assertTrue(lookupConnection.ctx().channel().isActive());
+
+        // cleanup.
+        lookupSemaphore.release(availablePermits);
+        client.close();
+    }
 }

Reply via email to