This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f3322ed35 [fix][test] Fix flaky 
SubscriptionSeekTest.testSeekIsByReceive (#23170)
a1f3322ed35 is described below

commit a1f3322ed358ab6841f0d3e43f2afcc54788b887
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Fri Aug 16 10:40:28 2024 +0300

    [fix][test] Fix flaky SubscriptionSeekTest.testSeekIsByReceive (#23170)
---
 .../broker/service/SubscriptionSeekTest.java       | 37 ++++++++++++++++++----
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 3fc795a8c3e..582d10294a5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -86,9 +86,11 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     public void testSeek() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/testSeek";
 
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
 
         // Disable pre-fetch in consumer to track the messages received
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-subscription").receiverQueueSize(0).subscribe();
 
@@ -138,11 +140,13 @@ public class SubscriptionSeekTest extends BrokerTestBase {
 
     @Test
     public void testSeekIsByReceive() throws PulsarClientException {
-        final String topicName = "persistent://prop/use/ns-abc/testSeek";
+        final String topicName = 
"persistent://prop/use/ns-abc/testSeekIsByReceive";
 
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
 
         String subscriptionName = "my-subscription";
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName(subscriptionName)
                 .subscribe();
@@ -164,6 +168,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         final String topicName = 
"persistent://prop/use/ns-abcd/testSeekForBatch";
         String subscriptionName = "my-subscription-batch";
 
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .enableBatching(true)
                 .batchingMaxMessages(3)
@@ -190,6 +195,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         producer.close();
 
 
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
@@ -220,6 +226,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         final String topicName = 
"persistent://prop/use/ns-abcd/testSeekForBatchMessageAndSpecifiedBatchIndex";
         String subscriptionName = "my-subscription-batch";
 
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .enableBatching(true)
                 .batchingMaxMessages(3)
@@ -264,6 +271,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
                 .serviceUrl(lookupUrl.toString())
                 .build();
 
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = 
newPulsarClient.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
@@ -300,6 +308,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         final String topicName = 
"persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + 
UUID.randomUUID().toString();
         String subscriptionName = "my-subscription-batch";
 
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .enableBatching(true)
                 .batchingMaxMessages(3)
@@ -325,7 +334,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
 
         producer.close();
 
-
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
@@ -381,6 +390,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         final String topicName = 
"persistent://prop/use/ns-abc/testConcurrentReset_" + 
System.currentTimeMillis();
         final String subscriptionName = "test-sub-name";
 
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
 
         admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
@@ -430,6 +440,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         final String topicName = 
"persistent://prop/use/ns-abc/testSeekPartitions";
 
         admin.topics().createPartitionedTopic(topicName, 2);
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-subscription").subscribe();
 
@@ -447,9 +458,11 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         long resetTimeInMillis = TimeUnit.SECONDS
                 
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
 
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
 
         // Disable pre-fetch in consumer to track the messages received
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-subscription").receiverQueueSize(0).subscribe();
 
@@ -483,6 +496,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         int msgNum = 20;
         admin.topics().createPartitionedTopic(topicName, partitionNum);
         creatProducerAndSendMsg(topicName, msgNum);
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
                 .newConsumer(Schema.STRING).startMessageIdInclusive()
                 .topic(topicName).subscriptionName("my-sub").subscribe();
@@ -530,6 +544,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         long resetTimeInMillis = TimeUnit.SECONDS
                 
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
         admin.topics().createPartitionedTopic(topicName, partitions);
+        @Cleanup
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
         // Disable pre-fetch in consumer to track the messages received
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
@@ -583,12 +598,14 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     public void 
testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws 
Exception {
         final String topicName = 
"persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
         // Disable pre-fetch in consumer to track the messages received
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("my-subscription")
                 .subscribe();
 
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
@@ -615,20 +632,20 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         for (Consumer consumer : consumers) {
             
assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
         }
-        consumer1.close();
-        consumer2.close();
     }
 
     @Test
     public void 
testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws 
Exception {
         final String topicName = 
"persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
         // Disable pre-fetch in consumer to track the messages received
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Failover)
                 .subscriptionName("my-subscription")
                 .subscribe();
 
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Failover)
@@ -668,11 +685,13 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         int msgNum = 160;
         admin.topics().createPartitionedTopic(topicName, partitionNum);
         creatProducerAndSendMsg(topicName, msgNum);
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
                 .newConsumer(Schema.STRING).startMessageIdInclusive()
                 .topic(topicName).subscriptionName("my-sub").subscribe();
 
         TopicName partitionedTopic = TopicName.get(topicName);
+        @Cleanup
         Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                 .startMessageId(MessageId.earliest)
                 .topic(partitionedTopic.getPartition(0).toString()).create();
@@ -721,12 +740,11 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         for (MessageId messageId : msgNotIn) {
             assertFalse(received.contains(messageId));
         }
-        reader.close();
-        consumer.close();
     }
 
     private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) 
throws Exception {
         List<MessageId> messageIds = new ArrayList<>();
+        @Cleanup
         Producer<String> producer = pulsarClient
                 .newProducer(Schema.STRING)
                 .enableBatching(false)
@@ -735,7 +753,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         for (int i = 0; i < msgNum; i++) {
             messageIds.add(producer.send("msg" + i));
         }
-        producer.close();
         return messageIds;
     }
 
@@ -756,6 +773,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         MessageId msgIdInTopic2Partition0 = 
admin.topics().getLastMessageId(topic2.getPartition(0).toString());
         MessageId msgIdInTopic2Partition2 = 
admin.topics().getLastMessageId(topic2.getPartition(2).toString());
 
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
                 .newConsumer(Schema.STRING).startMessageIdInclusive()
                 .topics(Arrays.asList(topicName, 
topicName2)).subscriptionName("my-sub").subscribe();
@@ -796,6 +814,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         // Create a pulsar client with a subscription fenced counter.
         ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
         AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
+        @Cleanup
         PulsarClient client = 
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
                 new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
                     protected void handleError(CommandError error) {
@@ -807,10 +826,13 @@ public class SubscriptionSeekTest extends BrokerTestBase {
                 });
 
         // publish some messages.
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<String> consumer = 
client.newConsumer(Schema.STRING)
                 .topic(topicName)
                 .subscriptionName("s1")
                 .subscribe();
+
+        @Cleanup
         Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topicName).create();
         MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
@@ -850,6 +872,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     public void testExceptionBySeekFunction() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/test" + 
UUID.randomUUID();
         creatProducerAndSendMsg(topicName,10);
+        @Cleanup
         org.apache.pulsar.client.api.Consumer consumer = pulsarClient
                 .newConsumer()
                 .topic(topicName).subscriptionName("my-sub").subscribe();

Reply via email to