This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e826d849cee [test]Fix Flaky-test: BrokerServiceTest.testLookupThrottlingForClientByClient (#16540)
e826d849cee is described below
commit e826d849ceef9d6aef28569ad57950bba90dfff1
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Thu Aug 11 00:06:54 2022 +0800
[test]Fix Flaky-test: BrokerServiceTest.testLookupThrottlingForClientByClient (#16540)
---
.../pulsar/broker/service/BrokerServiceTest.java | 71 ++++++++++++++++++----
1 file changed, 59 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index faa5de072aa..1fb63470456 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -88,10 +89,13 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -966,8 +970,6 @@ public class BrokerServiceTest extends BrokerTestBase {
*/
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
- // This test looks like it could be flakey, if the broker responds
- // quickly enough, there may never be concurrency in requests
final String topicName = "persistent://prop/ns-abc/newTopic";
PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
@@ -979,7 +981,30 @@ public class BrokerServiceTest extends BrokerTestBase {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool",
Thread.currentThread().isDaemon()));
long reqId = 0xdeadbeef;
- try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
+ // Using an AtomicReference in order to reset a new CountDownLatch
+ AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
+ latchRef.set(new CountDownLatch(1));
+ try (ConnectionPool pool = new ConnectionPool(conf, eventLoop, () -> new ClientCnx(conf, eventLoop) {
+ @Override
+ protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
+ try {
+ latchRef.get().await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ super.handleLookupResponse(lookupResult);
+ }
+
+ @Override
+ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse lookupResult) {
+ try {
+ latchRef.get().await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ super.handlePartitionResponse(lookupResult);
+ }
+ })) {
// for PMR
// 2 lookup will succeed
long reqId1 = reqId++;
@@ -990,12 +1015,18 @@ public class BrokerServiceTest extends BrokerTestBase {
long reqId2 = reqId++;
ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName,
reqId2);
CompletableFuture<?> f2 =
pool.getConnection(resolver.resolveHost())
- .thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2));
+ .thenCompose(clientCnx -> {
+ CompletableFuture<?> future = clientCnx.newLookup(request2, reqId2);
+ // pending other responses in `ClientCnx` until now
+ latchRef.get().countDown();
+ return future;
+ });
f1.get();
f2.get();
// 3 lookup will fail
+ latchRef.set(new CountDownLatch(1));
long reqId3 = reqId++;
ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName,
reqId3);
f1 = pool.getConnection(resolver.resolveHost())
@@ -1009,11 +1040,16 @@ public class BrokerServiceTest extends BrokerTestBase {
long reqId5 = reqId++;
ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName,
reqId5);
CompletableFuture<?> f3 =
pool.getConnection(resolver.resolveHost())
- .thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5));
+ .thenCompose(clientCnx -> {
+ CompletableFuture<?> future = clientCnx.newLookup(request5, reqId5);
+ // pending other responses in `ClientCnx` until now
+ latchRef.get().countDown();
+ return future;
+ });
+ f1.get();
+ f2.get();
try {
- f1.get();
- f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
@@ -1029,6 +1065,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// for Lookup
// 2 lookup will succeed
+ latchRef.set(new CountDownLatch(1));
long reqId6 = reqId++;
ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
f1 = pool.getConnection(resolver.resolveHost())
@@ -1037,12 +1074,18 @@ public class BrokerServiceTest extends BrokerTestBase {
long reqId7 = reqId++;
ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
f2 = pool.getConnection(resolver.resolveHost())
- .thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7));
+ .thenCompose(clientCnx -> {
+ CompletableFuture<?> future = clientCnx.newLookup(request7, reqId7);
+ // pending other responses in `ClientCnx` until now
+ latchRef.get().countDown();
+ return future;
+ });
f1.get();
f2.get();
// 3 lookup will fail
+ latchRef.set(new CountDownLatch(1));
long reqId8 = reqId++;
ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
f1 = pool.getConnection(resolver.resolveHost())
@@ -1056,11 +1099,16 @@ public class BrokerServiceTest extends BrokerTestBase {
long reqId10 = reqId++;
ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
f3 = pool.getConnection(resolver.resolveHost())
- .thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10));
+ .thenCompose(clientCnx -> {
+ CompletableFuture<?> future = clientCnx.newLookup(request10, reqId10);
+ // pending other responses in `ClientCnx` until now
+ latchRef.get().countDown();
+ return future;
+ });
+ f1.get();
+ f2.get();
try {
- f1.get();
- f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
@@ -1073,7 +1121,6 @@ public class BrokerServiceTest extends BrokerTestBase {
throw e;
}
}
-
}
}