This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c883f50e117 [fix][test] Fix flaky CompactionTest.testDispatcherMaxReadSizeBytes (#21329)
c883f50e117 is described below
commit c883f50e117ff9da310c879aa048993c28ea955a
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Oct 9 17:48:38 2023 +0300
[fix][test] Fix flaky CompactionTest.testDispatcherMaxReadSizeBytes (#21329)
---
.../java/org/apache/pulsar/compaction/CompactionTest.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 4c6db644f1e..52837cbdcd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1898,22 +1898,21 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
admin.topics().unload(topicName);
- ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
- .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
- .subscribe();
-
- PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get();
TopicCompactionService topicCompactionService = Mockito.spy(topic.getTopicCompactionService());
FieldUtils.writeDeclaredField(topic, "topicCompactionService", topicCompactionService, true);
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
+ .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+ .subscribe();
+
Awaitility.await().untilAsserted(() -> {
assertEquals(consumer.getStats().getMsgNumInReceiverQueue(), 1);
});
- consumer.increaseAvailablePermits(2);
-
Mockito.verify(topicCompactionService, Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1));
consumer.close();