This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new d5a844f [Branch-2.7] Fix branch-2.7 test. (#11254)
d5a844f is described below
commit d5a844f327b02537196e2901db8b480e25c0043b
Author: congbo <[email protected]>
AuthorDate: Thu Jul 8 19:44:40 2021 +0800
[Branch-2.7] Fix branch-2.7 test. (#11254)
---
.../client/api/KeySharedSubscriptionTest.java | 2 ++
.../apache/pulsar/client/api/TopicReaderTest.java | 15 +++++-----
.../client/api/v1/V1_ProducerConsumerTest.java | 35 ++++++++++++----------
.../pulsar/client/impl/ConsumerImplTest.java | 3 +-
.../proxy/server/ProxyServiceStarterTest.java | 5 ++--
5 files changed, 33 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index d87c257..575bd19 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -46,6 +46,7 @@ import
org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
@@ -669,6 +670,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
c1.close();
+ ((ConsumerImpl<Integer>)
c2).clearIncomingMessagesAndGetMessageNumber();
// Now C2 will get all messages
for (int i = 0; i < 20; i++) {
Message<Integer> msg = c2.receive();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 68baf34..6c81319 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -1376,24 +1376,23 @@ public class TopicReaderTest extends
ProducerConsumerBase {
final int halfMessages = numOfMessage / 2;
admin.topics().createPartitionedTopic(topicName, 3);
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
-
- long l = System.currentTimeMillis();
+ long halfTime = 0;
for (int i = 0; i < numOfMessage; i++) {
+ if (i == numOfMessage / 2) {
+ halfTime = System.currentTimeMillis();
+ }
producer.send(String.format("msg num %d", i).getBytes());
}
-
+ Assert.assertTrue(halfTime != 0);
Reader<byte[]> reader =
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
- int plusTime = (halfMessages + 1) * 100;
- reader.seek(l + plusTime);
-
+ reader.seek(halfTime);
Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
- Message<byte[]> message = reader.readNext();
+ Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
String receivedMessage = new String(message.getData());
Assert.assertTrue(messageSet.add(receivedMessage), "Received
duplicate message " + receivedMessage);
}
-
reader.close();
producer.close();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 2baec56..6a1d5f2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -74,6 +75,7 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -90,7 +92,7 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -542,7 +544,8 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
// The available permits should be 10 and num messages in the queue
should be 90
- Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize -
numConsumersThreads);
barrier.reset();
@@ -560,7 +563,8 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
Thread.sleep(100);
// The available permits should be 20 and num messages in the queue
should be 80
- Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads * 2);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads * 2));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize -
(numConsumersThreads * 2));
// clear the queue
@@ -594,7 +598,8 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
Thread.sleep(2000);
// The available permits should be 10 and num messages in the queue
should be 90
- Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumerImpl.getAvailablePermits(),
numConsumersThreads));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize -
numConsumersThreads);
consumer.close();
@@ -760,6 +765,7 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
log.info(" start receiving messages :");
CountDownLatch latch = new CountDownLatch(totalMsg);
// receive messages
+ @Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(1);
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
@@ -773,7 +779,6 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
producer.close();
consumer.close();
- executor.shutdownNow();
log.info("-- Exiting {} test --", methodName);
}
@@ -804,6 +809,7 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
log.info(" start receiving messages :");
CountDownLatch latch = new CountDownLatch(totalMsg);
// receive messages
+ @Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(1);
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);
@@ -817,7 +823,6 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
producer.close();
consumer.close();
- executor.shutdownNow();
log.info("-- Exiting {} test --", methodName);
}
@@ -1131,6 +1136,7 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
.subscriptionType(SubscriptionType.Shared)
.subscribe();
+ @Cleanup
PulsarClient newPulsarClient =
newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/unacked-topic")
@@ -1207,7 +1213,6 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
producer.close();
consumer1.close();
consumer2.close();
- newPulsarClient.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
fail();
@@ -1753,21 +1758,25 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
public void testPriorityConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
+ @Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> consumer1 =
newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.priorityLevel(1).receiverQueueSize(5).subscribe();
+ @Cleanup
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> consumer2 =
newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.priorityLevel(1).receiverQueueSize(5).subscribe();
+ @Cleanup
PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> consumer3 =
newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.priorityLevel(1).receiverQueueSize(5).subscribe();
+ @Cleanup
PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> consumer4 =
newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
@@ -1814,10 +1823,6 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
consumer2.close();
consumer3.close();
consumer4.close();
- newPulsarClient.close();
- newPulsarClient1.close();
- newPulsarClient2.close();
- newPulsarClient3.close();
log.info("-- Exiting {} test --", methodName);
}
@@ -1845,6 +1850,7 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(queueSize).subscribe();
+ @Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> c2 =
newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
@@ -1892,16 +1898,19 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
Assert.assertEquals(queueSize * 2, messages.size());
// create new consumers with the same priority
+ @Cleanup
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> c3 =
newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(queueSize).subscribe();
+ @Cleanup
PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> c4 =
newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(queueSize).subscribe();
+ @Cleanup
PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
Consumer<byte[]> c5 =
newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
@@ -1947,10 +1956,6 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
c3.close();
c4.close();
c5.close();
- newPulsarClient.close();
- newPulsarClient1.close();
- newPulsarClient2.close();
- newPulsarClient3.close();
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
log.info("-- Exiting {} test --", methodName);
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index c7aad50..0f32c41 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -50,13 +50,14 @@ import org.testng.annotations.Test;
public class ConsumerImplTest {
- private ExecutorProvider executorProvider = new ExecutorProvider(1, new
DefaultThreadFactory("ConsumerImplTest"));
+ private ExecutorProvider executorProvider;
private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
private ExecutorService executorService;
@BeforeMethod
public void setUp() {
+ executorProvider = new ExecutorProvider(1, new
DefaultThreadFactory("ConsumerImplTest"));
consumerConf = new ConsumerConfigurationData<>();
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
executorService = Executors.newSingleThreadExecutor();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 228927f..6dbb663 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.proxy.server;
+import io.prometheus.client.CollectorRegistry;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -35,14 +35,12 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
-
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -61,6 +59,7 @@ public class ProxyServiceStarterTest extends
MockedPulsarServiceBaseTest {
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setServicePort(Optional.of(11000));
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+ CollectorRegistry.defaultRegistry.clear();
serviceStarter.start();
}