This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 96a3aa7abac [Tests] Fix flaky MultiTopicsConsumerImplTest (#15851)
96a3aa7abac is described below
commit 96a3aa7abac44ed9c63c9133a3d35b91633bf942
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 31 20:09:46 2022 +0300
[Tests] Fix flaky MultiTopicsConsumerImplTest (#15851)
Fixes #15850
---
.../pulsar/client/impl/ClientTestFixtures.java | 23 ++++++----
.../client/impl/MultiTopicsConsumerImplTest.java | 52 +++++++++++-----------
2 files changed, 40 insertions(+), 35 deletions(-)
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 1b9b4a96baa..4f69755eaf9 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -18,24 +18,22 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.Timer;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.client.util.ExecutorProvider;
-import org.mockito.Mockito;
-
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.mockito.Mockito;
class ClientTestFixtures {
public static ScheduledExecutorService SCHEDULER =
Executors.newSingleThreadScheduledExecutor();
@@ -55,6 +53,8 @@ class ClientTestFixtures {
when(clientMock.getInternalExecutorService()).thenReturn(internalExecutorService);
when(clientMock.externalExecutorProvider()).thenReturn(executorProvider);
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
+ when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
+ .thenAnswer(invocation ->
CompletableFuture.completedFuture(invocation.getArgument(1)));
return clientMock;
}
@@ -72,6 +72,11 @@ class ClientTestFixtures {
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ when(clientMock.getConnection(any(),
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
+ when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
+
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ when(connectionPoolMock.getConnection(any(),
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
return clientMock;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 95dc07c2625..5ec56ecfffc 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -18,13 +18,31 @@
*/
package org.apache.pulsar.client.impl;
+import static
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
+import static
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import com.google.common.collect.Sets;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -41,26 +59,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
-import static
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
-import static
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
-
/**
* Unit Tests of {@link MultiTopicsConsumerImpl}.
*/
@@ -91,7 +89,9 @@ public class MultiTopicsConsumerImplTest {
public void testGetStats() throws Exception {
String topicName = "test-stats";
ClientConfigurationData conf = new ClientConfigurationData();
- conf.setServiceUrl("pulsar://localhost:6650");
+ // ip and port is arbitrary since test will attempt to make a
connection
+ // and the connection is not needed for making the test to pass. This
test should be improved.
+ conf.setServiceUrl("pulsar://127.0.0.99:23456");
conf.setStatsIntervalSeconds(100);
ThreadFactory threadFactory = new
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
@@ -112,9 +112,13 @@ public class MultiTopicsConsumerImplTest {
MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
clientImpl, consumerConfData,
- executorProvider, null, null, null, true);
+ executorProvider, null, Schema.BYTES, null, true);
impl.getStats();
+
+ clientImpl.close();
+ executorProvider.shutdownNow();
+ eventLoopGroup.shutdownGracefully().get();
}
// Test uses a mocked PulsarClientImpl which will complete the
getPartitionedTopicMetadata() internal async call
@@ -151,8 +155,6 @@ public class MultiTopicsConsumerImplTest {
PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation ->
createDelayedCompletedFuture(
new PartitionedTopicMetadata(), completionDelayMillis));
- when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(),
any()))
- .thenReturn(CompletableFuture.completedFuture(schema));
MultiTopicsConsumerImpl<byte[]> impl = new
MultiTopicsConsumerImpl<byte[]>(
clientMock, consumerConfData, executorProvider,
new CompletableFuture<>(), schema, null, true);
@@ -224,8 +226,6 @@ public class MultiTopicsConsumerImplTest {
PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
when(clientMock.timer()).thenReturn(timer);
- when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
- .thenReturn(CompletableFuture.completedFuture(null));
// Simulate non partitioned topics
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);