This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new d5a844f  [Branch-2.7] Fix branch-2.7 test. (#11254)
d5a844f is described below

commit d5a844f327b02537196e2901db8b480e25c0043b
Author: congbo <[email protected]>
AuthorDate: Thu Jul 8 19:44:40 2021 +0800

    [Branch-2.7] Fix branch-2.7 test. (#11254)
---
 .../client/api/KeySharedSubscriptionTest.java      |  2 ++
 .../apache/pulsar/client/api/TopicReaderTest.java  | 15 +++++-----
 .../client/api/v1/V1_ProducerConsumerTest.java     | 35 ++++++++++++----------
 .../pulsar/client/impl/ConsumerImplTest.java       |  3 +-
 .../proxy/server/ProxyServiceStarterTest.java      |  5 ++--
 5 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index d87c257..575bd19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -46,6 +46,7 @@ import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
@@ -669,6 +670,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
         c1.close();
 
+        ((ConsumerImpl<Integer>) c2).clearIncomingMessagesAndGetMessageNumber();
         // Now C2 will get all messages
         for (int i = 0; i < 20; i++) {
             Message<Integer> msg = c2.receive();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 68baf34..6c81319 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -1376,24 +1376,23 @@ public class TopicReaderTest extends ProducerConsumerBase {
         final int halfMessages = numOfMessage / 2;
         admin.topics().createPartitionedTopic(topicName, 3);
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
-
-        long l = System.currentTimeMillis();
+        long halfTime = 0;
         for (int i = 0; i < numOfMessage; i++) {
+            if (i == numOfMessage / 2) {
+                halfTime = System.currentTimeMillis();
+            }
             producer.send(String.format("msg num %d", i).getBytes());
         }
-
+        Assert.assertTrue(halfTime != 0);
         Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
 
-        int plusTime = (halfMessages + 1) * 100;
-        reader.seek(l + plusTime);
-
+        reader.seek(halfTime);
         Set<String> messageSet = Sets.newHashSet();
         for (int i = halfMessages + 1; i < numOfMessage; i++) {
-            Message<byte[]> message = reader.readNext();
+            Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
             String receivedMessage = new String(message.getData());
             Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage);
         }
-
         reader.close();
         producer.close();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 2baec56..6a1d5f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -74,6 +75,7 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -90,7 +92,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
     private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
@@ -542,7 +544,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
         // The available permits should be 10 and num messages in the queue should be 90
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads));
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
 
         barrier.reset();
@@ -560,7 +563,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Thread.sleep(100);
 
         // The available permits should be 20 and num messages in the queue should be 80
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2));
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
 
         // clear the queue
@@ -594,7 +598,8 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Thread.sleep(2000);
 
         // The available permits should be 10 and num messages in the queue should be 90
-        Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads));
         Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
         consumer.close();
 
@@ -760,6 +765,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         log.info(" start receiving messages :");
         CountDownLatch latch = new CountDownLatch(totalMsg);
         // receive messages
+        @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(1);
         receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
 
@@ -773,7 +779,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         producer.close();
         consumer.close();
-        executor.shutdownNow();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -804,6 +809,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         log.info(" start receiving messages :");
         CountDownLatch latch = new CountDownLatch(totalMsg);
         // receive messages
+        @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(1);
         receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
 
@@ -817,7 +823,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         producer.close();
         consumer.close();
-        executor.shutdownNow();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -1131,6 +1136,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
 
+            @Cleanup
             PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
             Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
                     .topic("persistent://my-property/use/my-ns/unacked-topic")
@@ -1207,7 +1213,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             producer.close();
             consumer1.close();
             consumer2.close();
-            newPulsarClient.close();
             log.info("-- Exiting {} test --", methodName);
         } catch (Exception e) {
             fail();
@@ -1753,21 +1758,25 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
     public void testPriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
+        @Cleanup
         PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
                 .priorityLevel(1).receiverQueueSize(5).subscribe();
 
+        @Cleanup
         PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
                 .priorityLevel(1).receiverQueueSize(5).subscribe();
 
+        @Cleanup
         PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
                 .priorityLevel(1).receiverQueueSize(5).subscribe();
 
+        @Cleanup
         PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> consumer4 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
@@ -1814,10 +1823,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         consumer2.close();
         consumer3.close();
         consumer4.close();
-        newPulsarClient.close();
-        newPulsarClient1.close();
-        newPulsarClient2.close();
-        newPulsarClient3.close();
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -1845,6 +1850,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
                 .receiverQueueSize(queueSize).subscribe();
 
+        @Cleanup
         PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> c2 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
@@ -1892,16 +1898,19 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         Assert.assertEquals(queueSize * 2, messages.size());
 
         // create new consumers with the same priority
+        @Cleanup
         PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> c3 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
                 .receiverQueueSize(queueSize).subscribe();
 
+        @Cleanup
         PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> c4 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
                 .receiverQueueSize(queueSize).subscribe();
 
+        @Cleanup
         PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
         Consumer<byte[]> c5 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
                 .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
@@ -1947,10 +1956,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         c3.close();
         c4.close();
         c5.close();
-        newPulsarClient.close();
-        newPulsarClient1.close();
-        newPulsarClient2.close();
-        newPulsarClient3.close();
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
         log.info("-- Exiting {} test --", methodName);
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index c7aad50..0f32c41 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -50,13 +50,14 @@ import org.testng.annotations.Test;
 public class ConsumerImplTest {
 
 
-    private ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
+    private ExecutorProvider executorProvider;
     private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
     private ExecutorService executorService;
 
     @BeforeMethod
     public void setUp() {
+        executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
         consumerConf = new ConsumerConfigurationData<>();
         PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
         executorService = Executors.newSingleThreadExecutor();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 228927f..6dbb663 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import io.prometheus.client.CollectorRegistry;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -35,14 +35,12 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Base64;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
-
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -61,6 +59,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
         serviceStarter.getConfig().setServicePort(Optional.of(11000));
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+        CollectorRegistry.defaultRegistry.clear();
         serviceStarter.start();
     }
 

Reply via email to