This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 96a3aa7abac [Tests] Fix flaky MultiTopicsConsumerImplTest (#15851)
96a3aa7abac is described below

commit 96a3aa7abac44ed9c63c9133a3d35b91633bf942
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 31 20:09:46 2022 +0300

    [Tests] Fix flaky MultiTopicsConsumerImplTest (#15851)
    
    Fixes #15850
---
 .../pulsar/client/impl/ClientTestFixtures.java     | 23 ++++++----
 .../client/impl/MultiTopicsConsumerImplTest.java   | 52 +++++++++++-----------
 2 files changed, 40 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 1b9b4a96baa..4f69755eaf9 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -18,24 +18,22 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoop;
 import io.netty.util.Timer;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.util.ExecutorProvider;
-import org.mockito.Mockito;
-
 import java.net.SocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.mockito.Mockito;
 
 class ClientTestFixtures {
     public static ScheduledExecutorService SCHEDULER = 
Executors.newSingleThreadScheduledExecutor();
@@ -55,6 +53,8 @@ class ClientTestFixtures {
         
when(clientMock.getInternalExecutorService()).thenReturn(internalExecutorService);
         
when(clientMock.externalExecutorProvider()).thenReturn(executorProvider);
         
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
+        when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
+                .thenAnswer(invocation -> 
CompletableFuture.completedFuture(invocation.getArgument(1)));
 
         return clientMock;
     }
@@ -72,6 +72,11 @@ class ClientTestFixtures {
                 
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
         
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
         
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        when(clientMock.getConnection(any(), 
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
+        when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
+        
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        when(connectionPoolMock.getConnection(any(), 
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         return clientMock;
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 95dc07c2625..5ec56ecfffc 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -18,13 +18,31 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
+import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import com.google.common.collect.Sets;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -41,26 +59,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
-import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
-import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
-
 /**
  * Unit Tests of {@link MultiTopicsConsumerImpl}.
  */
@@ -91,7 +89,9 @@ public class MultiTopicsConsumerImplTest {
     public void testGetStats() throws Exception {
         String topicName = "test-stats";
         ClientConfigurationData conf = new ClientConfigurationData();
-        conf.setServiceUrl("pulsar://localhost:6650");
+        // The IP and port are arbitrary: the test will attempt to make a connection,
+        // but the connection is not needed for the test to pass. This test should be improved.
+        conf.setServiceUrl("pulsar://127.0.0.99:23456");
         conf.setStatsIntervalSeconds(100);
 
         ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
@@ -112,9 +112,13 @@ public class MultiTopicsConsumerImplTest {
 
         MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
             clientImpl, consumerConfData,
-            executorProvider, null, null, null, true);
+            executorProvider, null, Schema.BYTES, null, true);
 
         impl.getStats();
+
+        clientImpl.close();
+        executorProvider.shutdownNow();
+        eventLoopGroup.shutdownGracefully().get();
     }
 
     // Test uses a mocked PulsarClientImpl which will complete the 
getPartitionedTopicMetadata() internal async call
@@ -151,8 +155,6 @@ public class MultiTopicsConsumerImplTest {
         PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createDelayedCompletedFuture(
                 new PartitionedTopicMetadata(), completionDelayMillis));
-        when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), 
any()))
-                .thenReturn(CompletableFuture.completedFuture(schema));
         MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(
                 clientMock, consumerConfData, executorProvider,
                 new CompletableFuture<>(), schema, null, true);
@@ -224,8 +226,6 @@ public class MultiTopicsConsumerImplTest {
 
         PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.timer()).thenReturn(timer);
-        when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
-                .thenReturn(CompletableFuture.completedFuture(null));
 
         // Simulate non partitioned topics
         PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);

Reply via email to