This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e826d849cee [test]Fix Flaky-test: 
BrokerServiceTest.testLookupThrottlingForClientByClient (#16540)
e826d849cee is described below

commit e826d849ceef9d6aef28569ad57950bba90dfff1
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Thu Aug 11 00:06:54 2022 +0800

    [test]Fix Flaky-test: 
BrokerServiceTest.testLookupThrottlingForClientByClient (#16540)
---
 .../pulsar/broker/service/BrokerServiceTest.java   | 71 ++++++++++++++++++----
 1 file changed, 59 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index faa5de072aa..1fb63470456 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -88,10 +89,13 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -966,8 +970,6 @@ public class BrokerServiceTest extends BrokerTestBase {
      */
     @Test
     public void testLookupThrottlingForClientByClient() throws Exception {
-        // This test looks like it could be flakey, if the broker responds
-        // quickly enough, there may never be concurrency in requests
         final String topicName = "persistent://prop/ns-abc/newTopic";
 
         PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
@@ -979,7 +981,30 @@ public class BrokerServiceTest extends BrokerTestBase {
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
                 new DefaultThreadFactory("test-pool", 
Thread.currentThread().isDaemon()));
         long reqId = 0xdeadbeef;
-        try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
+        // Using an AtomicReference in order to reset a new CountDownLatch
+        AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
+        latchRef.set(new CountDownLatch(1));
+        try (ConnectionPool pool = new ConnectionPool(conf, eventLoop, () -> 
new ClientCnx(conf, eventLoop) {
+            @Override
+            protected void handleLookupResponse(CommandLookupTopicResponse 
lookupResult) {
+                try {
+                    latchRef.get().await();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                super.handleLookupResponse(lookupResult);
+            }
+
+            @Override
+            protected void 
handlePartitionResponse(CommandPartitionedTopicMetadataResponse lookupResult) {
+                try {
+                    latchRef.get().await();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                super.handlePartitionResponse(lookupResult);
+            }
+        })) {
             // for PMR
             // 2 lookup will succeed
             long reqId1 = reqId++;
@@ -990,12 +1015,18 @@ public class BrokerServiceTest extends BrokerTestBase {
             long reqId2 = reqId++;
             ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, 
reqId2);
             CompletableFuture<?> f2 = 
pool.getConnection(resolver.resolveHost())
-                .thenCompose(clientCnx -> clientCnx.newLookup(request2, 
reqId2));
+                .thenCompose(clientCnx -> {
+                    CompletableFuture<?> future = 
clientCnx.newLookup(request2, reqId2);
+                    // pending other responses in `ClientCnx` until now
+                    latchRef.get().countDown();
+                    return future;
+                });
 
             f1.get();
             f2.get();
 
             // 3 lookup will fail
+            latchRef.set(new CountDownLatch(1));
             long reqId3 = reqId++;
             ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, 
reqId3);
             f1 = pool.getConnection(resolver.resolveHost())
@@ -1009,11 +1040,16 @@ public class BrokerServiceTest extends BrokerTestBase {
             long reqId5 = reqId++;
             ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, 
reqId5);
             CompletableFuture<?> f3 = 
pool.getConnection(resolver.resolveHost())
-                .thenCompose(clientCnx -> clientCnx.newLookup(request5, 
reqId5));
+                .thenCompose(clientCnx -> {
+                    CompletableFuture<?> future = 
clientCnx.newLookup(request5, reqId5);
+                    // pending other responses in `ClientCnx` until now
+                    latchRef.get().countDown();
+                    return future;
+                    });
 
+            f1.get();
+            f2.get();
             try {
-                f1.get();
-                f2.get();
                 f3.get();
                 fail("At least one should fail");
             } catch (ExecutionException e) {
@@ -1029,6 +1065,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
             // for Lookup
             // 2 lookup will succeed
+            latchRef.set(new CountDownLatch(1));
             long reqId6 = reqId++;
             ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
             f1 = pool.getConnection(resolver.resolveHost())
@@ -1037,12 +1074,18 @@ public class BrokerServiceTest extends BrokerTestBase {
             long reqId7 = reqId++;
             ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
             f2 = pool.getConnection(resolver.resolveHost())
-                .thenCompose(clientCnx -> clientCnx.newLookup(request7, 
reqId7));
+                .thenCompose(clientCnx -> {
+                    CompletableFuture<?> future = 
clientCnx.newLookup(request7, reqId7);
+                    // pending other responses in `ClientCnx` until now
+                    latchRef.get().countDown();
+                    return future;
+                });
 
             f1.get();
             f2.get();
 
             // 3 lookup will fail
+            latchRef.set(new CountDownLatch(1));
             long reqId8 = reqId++;
             ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
             f1 = pool.getConnection(resolver.resolveHost())
@@ -1056,11 +1099,16 @@ public class BrokerServiceTest extends BrokerTestBase {
             long reqId10 = reqId++;
             ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
             f3 = pool.getConnection(resolver.resolveHost())
-                .thenCompose(clientCnx -> clientCnx.newLookup(request10, 
reqId10));
+                .thenCompose(clientCnx -> {
+                    CompletableFuture<?> future = 
clientCnx.newLookup(request10, reqId10);
+                    // pending other responses in `ClientCnx` until now
+                    latchRef.get().countDown();
+                    return future;
+                });
 
+            f1.get();
+            f2.get();
             try {
-                f1.get();
-                f2.get();
                 f3.get();
                 fail("At least one should fail");
             } catch (ExecutionException e) {
@@ -1073,7 +1121,6 @@ public class BrokerServiceTest extends BrokerTestBase {
                     throw e;
                 }
             }
-
         }
     }
 

Reply via email to