This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04035c5  Limit the number of times lookup requests are redirected (#7096)
04035c5 is described below

commit 04035c59d6b55479a443cc64e8351f4a467cc253
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Sun May 31 11:20:50 2020 +0900

    Limit the number of times lookup requests are redirected (#7096)
    
    Master Issue: #7041
    
    ### Motivation
    
    When a leader broker is restarted, some producers for topics owned by that broker may fail to reconnect to the new owner broker. When this happens, message publishing keeps failing until the client application is restarted.
    
    Investigation showed that the lookup requests sent by the affected producers were redirected more than 10,000 times between multiple brokers.
    
    When a lookup request is redirected, `BinaryProtoLookupService#findBroker()` is called recursively. Consequently, tens of thousands of redirects cause a `StackOverflowError`, and `BinaryProtoLookupService#findBroker()` never completes.
    
    ### Modifications
    
    Limit the number of times a lookup can be redirected to 100 by default; the maximum is user-configurable via `ClientBuilder#maxLookupRedirects`. If the number of redirects exceeds the limit, the lookup fails. However, `ConnectionHandler` retries the lookup, so the producer can eventually reconnect to the new broker.
---
 .../apache/pulsar/client/api/ClientBuilder.java    |   9 ++
 .../client/impl/BinaryProtoLookupService.java      |  25 +++-
 .../pulsar/client/impl/ClientBuilderImpl.java      |   6 +
 .../org/apache/pulsar/client/impl/HttpClient.java  |   1 +
 .../client/impl/conf/ClientConfigurationData.java  |   1 +
 .../client/impl/BinaryProtoLookupServiceTest.java  | 128 +++++++++++++++++++++
 .../impl/conf/ConfigurationDataUtilsTest.java      |   3 +
 7 files changed, 168 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index e84f8ba..c12e9a1 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -387,6 +387,15 @@ public interface ClientBuilder extends Cloneable {
     ClientBuilder maxLookupRequests(int maxLookupRequests);
 
     /**
+     * Set the maximum number of times a lookup-request to a broker will be redirected <i>(default: 100)</i>.
+     *
+     * @since 2.6.0
+     * @param maxLookupRedirects the maximum number of redirects (binary-protocol lookups: 0 or less = no limit)
+     * @return the client builder instance
+     */
+    ClientBuilder maxLookupRedirects(int maxLookupRedirects);
+
+    /**
      * Set max number of broker-rejected requests in a certain time-frame (30 
seconds) after which current connection
      * will be closed and client creates a new connection that give chance to 
connect a different broker <i>(default:
      * 50)</i>.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 102f394..d3a7df5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -54,12 +54,14 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final ServiceNameResolver serviceNameResolver;
     private final boolean useTls;
     private final ExecutorService executor;
+    private final int maxLookupRedirects;
 
     public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor)
             throws PulsarClientException {
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
+        this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); // <= 0: no redirect limit
         this.serviceNameResolver = new PulsarServiceNameResolver();
         updateServiceUrl(serviceUrl);
     }
@@ -77,7 +79,7 @@ public class BinaryProtoLookupService implements 
LookupService {
      * @return broker-socket-address that serves given topic
      */
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
-        return findBroker(serviceNameResolver.resolveHost(), false, topicName);
+        return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0); // no redirects followed yet
     }
 
     /**
@@ -89,9 +91,15 @@ public class BinaryProtoLookupService implements 
LookupService {
     }
 
     private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
findBroker(InetSocketAddress socketAddress,
-            boolean authoritative, TopicName topicName) {
+            boolean authoritative, TopicName topicName, final int 
redirectCount) {
         CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
addressFuture = new CompletableFuture<>();
 
+        if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
+            addressFuture.completeExceptionally(
+                    new PulsarClientException.LookupException("Too many 
redirects: " + maxLookupRedirects));
+            return addressFuture;
+        }
+
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newLookup(topicName.toString(), 
authoritative, requestId);
@@ -110,13 +118,20 @@ public class BinaryProtoLookupService implements 
LookupService {
 
                     // (2) redirect to given address if response is: redirect
                     if (lookupDataResult.redirect) {
-                        findBroker(responseBrokerAddress, 
lookupDataResult.authoritative, topicName)
+                        findBroker(responseBrokerAddress, 
lookupDataResult.authoritative, topicName, redirectCount + 1)
                                 .thenAccept(addressPair -> {
                                     addressFuture.complete(addressPair);
                                 }).exceptionally((lookupException) -> {
                                     // lookup failed
-                                    log.warn("[{}] lookup failed : {}", 
topicName.toString(),
-                                            lookupException.getMessage(), 
lookupException);
+                                    if (redirectCount > 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] lookup redirection 
failed ({}) : {}", topicName.toString(),
+                                                    redirectCount, 
lookupException.getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}] lookup failed : {}", 
topicName.toString(),
+                                                lookupException.getMessage(), 
lookupException);
+                                    }
                                     
addressFuture.completeExceptionally(lookupException);
                                     return null;
                                 });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 8d3fdbe..7a56e42 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -241,6 +241,12 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder maxLookupRedirects(int maxLookupRedirects) {
+        conf.setMaxLookupRedirects(maxLookupRedirects); // stored as-is; 0 or less disables the binary-protocol limit
+        return this;
+    }
+
+    @Override
     public ClientBuilder maxNumberOfRejectedRequestPerConnection(int 
maxNumberOfRejectedRequestPerConnection) {
         
conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 0b4ef32..3ba02bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -72,6 +72,7 @@ public class HttpClient implements Closeable {
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
+        confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
         confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 
1000);
         confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
         confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", 
PulsarVersion.getVersion()));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 980630c..76bc8d9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -67,6 +67,7 @@ public class ClientConfigurationData implements Serializable, 
Cloneable {
     private boolean tlsHostnameVerificationEnable = false;
     private int concurrentLookupRequest = 5000;
     private int maxLookupRequest = 50000;
+    private int maxLookupRedirects = 100;
     private int maxNumberOfRejectedRequestPerConnection = 50;
     private int keepAliveIntervalSeconds = 30;
     private int connectionTimeoutMs = 10000;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
new file mode 100644
index 0000000..be09cde
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import io.netty.buffer.ByteBuf;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException.LookupException;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class BinaryProtoLookupServiceTest {
+    private BinaryProtoLookupService lookup;
+    private TopicName topicName;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        LookupDataResult lookupResult1 = createLookupDataResult("pulsar://broker1.pulsar.apache.org:6650", true);
+        LookupDataResult lookupResult2 = createLookupDataResult("pulsar://broker2.pulsar.apache.org:6650", false);
+
+        CompletableFuture<LookupDataResult> lookupFuture1 = CompletableFuture.completedFuture(lookupResult1);
+        CompletableFuture<LookupDataResult> lookupFuture2 = CompletableFuture.completedFuture(lookupResult2);
+
+        ClientCnx clientCnx = mock(ClientCnx.class);
+        when(clientCnx.newLookup(any(ByteBuf.class), anyLong())).thenReturn(lookupFuture1, lookupFuture1,
+                lookupFuture2); // redirect, redirect, then a final (non-redirect) answer naming broker2
+
+        CompletableFuture<ClientCnx> connectionFuture = CompletableFuture.completedFuture(clientCnx);
+
+        ConnectionPool cnxPool = mock(ConnectionPool.class);
+        when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(connectionFuture);
+
+        ClientConfigurationData clientConfig = mock(ClientConfigurationData.class);
+        doReturn(0).when(clientConfig).getMaxLookupRedirects(); // 0 disables the redirect limit
+
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        doReturn(cnxPool).when(client).getCnxPool();
+        doReturn(clientConfig).when(client).getConfiguration();
+        doReturn(1L).when(client).newRequestId();
+
+        lookup = spy(
+                new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class)));
+        topicName = TopicName.get("persistent://tenant1/ns1/t1");
+    }
+
+    @Test(invocationTimeOut = 3000)
+    public void maxLookupRedirectsTest1() throws Exception { // limit disabled in setup(): both redirects are followed
+        Pair<InetSocketAddress, InetSocketAddress> addressPair = lookup.getBroker(topicName).get();
+        assertEquals(addressPair.getLeft().toString(), "broker2.pulsar.apache.org:6650");
+        assertEquals(addressPair.getRight().toString(), "broker2.pulsar.apache.org:6650");
+    }
+
+    @Test(invocationTimeOut = 3000)
+    public void maxLookupRedirectsTest2() throws Exception {
+        Field field = BinaryProtoLookupService.class.getDeclaredField("maxLookupRedirects");
+        field.setAccessible(true); // private final field, overridden reflectively for this test
+        field.set(lookup, 2); // a limit of 2 covers both stubbed redirects, so the lookup succeeds
+
+        Pair<InetSocketAddress, InetSocketAddress> addressPair = lookup.getBroker(topicName).get();
+        assertEquals(addressPair.getLeft().toString(), "broker2.pulsar.apache.org:6650");
+        assertEquals(addressPair.getRight().toString(), "broker2.pulsar.apache.org:6650");
+    }
+
+    @Test(invocationTimeOut = 3000)
+    public void maxLookupRedirectsTest3() throws Exception {
+        Field field = BinaryProtoLookupService.class.getDeclaredField("maxLookupRedirects");
+        field.setAccessible(true); // private final field, overridden reflectively for this test
+        field.set(lookup, 1); // the second stubbed redirect exceeds a limit of 1
+
+        try {
+            lookup.getBroker(topicName).get();
+            fail("should have thrown ExecutionException");
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue(cause instanceof LookupException);
+            assertEquals(cause.getMessage(), "Too many redirects: 1");
+        }
+    }
+
+    private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception {
+        LookupDataResult lookupResult = new LookupDataResult(-1);
+
+        Field brokerUrlField = LookupDataResult.class.getDeclaredField("brokerUrl");
+        brokerUrlField.setAccessible(true);
+        brokerUrlField.set(lookupResult, brokerUrl);
+
+        Field redirectField = LookupDataResult.class.getDeclaredField("redirect");
+        redirectField.setAccessible(true);
+        redirectField.set(lookupResult, redirect);
+
+        return lookupResult;
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index b144f71..9634a87 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -43,13 +43,16 @@ public class ConfigurationDataUtilsTest {
         ClientConfigurationData confData = new ClientConfigurationData();
         confData.setServiceUrl("pulsar://unknown:6650");
         confData.setMaxLookupRequest(600);
+        confData.setMaxLookupRedirects(10);
         confData.setNumIoThreads(33);
         Map<String, Object> config = new HashMap<>();
         config.put("serviceUrl", "pulsar://localhost:6650");
         config.put("maxLookupRequest", 70000);
+        config.put("maxLookupRedirects", 50);
         confData = ConfigurationDataUtils.loadData(config, confData, 
ClientConfigurationData.class);
         assertEquals("pulsar://localhost:6650", confData.getServiceUrl());
         assertEquals(70000, confData.getMaxLookupRequest());
+        assertEquals(50, confData.getMaxLookupRedirects());
         assertEquals(33, confData.getNumIoThreads());
     }
 

Reply via email to