This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a0f4ba20ef [fix][test] Fix invalid test 
CompactionTest.testDeleteCompactedLedgerWithSlowAck (#24166)
1a0f4ba20ef is described below

commit 1a0f4ba20ef30b07b98aa63dc421c484bc078fb2
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Apr 10 13:22:12 2025 +0300

    [fix][test] Fix invalid test 
CompactionTest.testDeleteCompactedLedgerWithSlowAck (#24166)
---
 .../apache/pulsar/compaction/CompactionTest.java   | 508 +++++++++++----------
 1 file changed, 260 insertions(+), 248 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index d75ccce7ff3..ff9026dbba6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -18,15 +18,19 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
@@ -43,11 +47,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
@@ -62,7 +66,6 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
@@ -98,7 +101,6 @@ import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -116,10 +118,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void setup() throws Exception {
         super.internalSetup();
 
-        admin.clusters().createCluster("use", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-        admin.tenants().createTenant("my-property",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+        admin.clusters().createCluster(configClusterName,
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Set.of(configClusterName)));
+        admin.namespaces().createNamespace("my-tenant/my-ns");
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -153,7 +157,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompaction() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
         final int numMessages = 20;
         final int maxKeys = 10;
 
@@ -182,21 +186,21 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topic, false);
         // Compacted topic ledger should have same number of entry equals to 
number of unique key.
-        Assert.assertEquals(expected.size(), 
internalStats.compactedLedger.entries);
-        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
-        Assert.assertFalse(internalStats.compactedLedger.offloaded);
+        assertEquals(expected.size(), internalStats.compactedLedger.entries);
+        assertTrue(internalStats.compactedLedger.ledgerId > -1);
+        assertFalse(internalStats.compactedLedger.offloaded);
 
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             while (true) {
                 Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
-                Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+                assertEquals(expected.remove(m.getKey()), m.getData());
                 if (expected.isEmpty()) {
                     break;
                 }
             }
-            Assert.assertTrue(expected.isEmpty());
+            assertTrue(expected.isEmpty());
         }
 
         // can get full backlog if read compacted disabled
@@ -205,24 +209,24 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             while (true) {
                 Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                 Pair<String, byte[]> expectedMessage = all.remove(0);
-                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
-                Assert.assertEquals(expectedMessage.getRight(), m.getData());
+                assertEquals(expectedMessage.getLeft(), m.getKey());
+                assertEquals(expectedMessage.getRight(), m.getData());
                 if (all.isEmpty()) {
                     break;
                 }
             }
-            Assert.assertTrue(all.isEmpty());
+            assertTrue(all.isEmpty());
         }
     }
 
     @Test
     public void testCompactionWithReader() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
         final int numMessages = 20;
         final int maxKeys = 10;
 
         // Configure retention to ensue data is retained for reader
-        admin.namespaces().setRetention("my-property/use/my-ns", new 
RetentionPolicies(-1, -1));
+        admin.namespaces().setRetention("my-tenant/my-ns", new 
RetentionPolicies(-1, -1));
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -251,12 +255,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .startMessageId(MessageId.earliest).create()) {
             while (true) {
                 Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS);
-                Assert.assertEquals(expected.remove(m.getKey()), new 
String(m.getData()));
+                assertEquals(expected.remove(m.getKey()), new 
String(m.getData()));
                 if (expected.isEmpty()) {
                     break;
                 }
             }
-            Assert.assertTrue(expected.isEmpty());
+            assertTrue(expected.isEmpty());
         }
 
         // can get full backlog if read compacted disabled
@@ -265,20 +269,20 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             while (true) {
                 Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS);
                 Pair<String, String> expectedMessage = all.remove(0);
-                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
-                Assert.assertEquals(expectedMessage.getRight(), new 
String(m.getData()));
+                assertEquals(expectedMessage.getLeft(), m.getKey());
+                assertEquals(expectedMessage.getRight(), new 
String(m.getData()));
                 if (all.isEmpty()) {
                     break;
                 }
             }
-            Assert.assertTrue(all.isEmpty());
+            assertTrue(all.isEmpty());
         }
     }
 
 
     @Test
     public void testReadCompactedBeforeCompaction() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -294,16 +298,16 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content0".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content0".getBytes());
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content1".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content1".getBytes());
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
         }
 
         compact(topic);
@@ -311,14 +315,14 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
         }
     }
 
     @Test
     public void testReadEntriesAfterCompaction() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -338,18 +342,18 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content3".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content3".getBytes());
         }
     }
 
     @Test
     public void testSeekEarliestAfterCompaction() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -366,8 +370,8 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .readCompacted(true).subscribe()) {
             consumer.seek(MessageId.earliest);
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
         }
 
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -375,22 +379,22 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             consumer.seek(MessageId.earliest);
 
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content0".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content0".getBytes());
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content1".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content1".getBytes());
 
             m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
         }
     }
 
     @Test
     public void testBrokerRestartAfterCompaction() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -408,15 +412,15 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
         }
 
         stopBroker();
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             consumer.receive();
-            Assert.fail("Shouldn't have been able to receive anything");
+            fail("Shouldn't have been able to receive anything");
         } catch (PulsarClientException e) {
             // correct behaviour
         }
@@ -425,14 +429,14 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content2".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content2".getBytes());
         }
     }
 
     @Test
     public void testCompactEmptyTopic() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -448,14 +452,14 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), "key0");
-            Assert.assertEquals(m.getData(), "content0".getBytes());
+            assertEquals(m.getKey(), "key0");
+            assertEquals(m.getData(), "content0".getBytes());
         }
     }
 
     @Test
     public void testFirstMessageRetained() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -483,20 +487,20 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
-            Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
+            assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
-            Assert.assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
+            assertEquals(message2.getKey(), "key2");
+            assertEquals(new String(message2.getData()), "my-message-3");
+            assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
         }
     }
 
     @Test
     public void testBatchMessageIdsDontChange() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -525,13 +529,13 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
 
         // Ensure all messages are in same batch
-        
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+        
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
                             
((BatchMessageIdImpl)messages.get(1).getMessageId()).getLedgerId());
-        
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+        
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
                             
((BatchMessageIdImpl)messages.get(2).getMessageId()).getLedgerId());
-        
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+        
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
                             
((BatchMessageIdImpl)messages.get(1).getMessageId()).getEntryId());
-        
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+        
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
                             
((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
 
         // compact the topic
@@ -541,25 +545,25 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            assertEquals(message2.getKey(), "key2");
+            assertEquals(new String(message2.getData()), "my-message-3");
             if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
-                Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
-                Assert.assertEquals(message2.getMessageId(), 
messages.get(1).getMessageId());
+                assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
+                assertEquals(message2.getMessageId(), 
messages.get(1).getMessageId());
             } else {
-                Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
-                Assert.assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
+                assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
+                assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
             }
         }
     }
 
     @Test
     public void testBatchMessageWithNullValue() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .receiverQueueSize(1).readCompacted(true).subscribe().close();
@@ -613,7 +617,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testWholeBatchCompactedOut() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -642,8 +646,8 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message = consumer.receive();
-            Assert.assertEquals(message.getKey(), "key1");
-            Assert.assertEquals(new String(message.getData()), "my-message-4");
+            assertEquals(message.getKey(), "key1");
+            assertEquals(new String(message.getData()), "my-message-4");
         }
     }
 
@@ -658,7 +662,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         restartBroker();
         FieldUtils.writeField(compactor, "topicCompactionRetainNullKey", 
retainNullKey, true);
 
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -688,12 +692,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key1");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-4");
+                assertEquals(message3.getKey(), "key1");
+                assertEquals(new String(message3.getData()), "my-message-4");
 
                 Message<byte[]> message4 = consumer.receive();
-                Assert.assertEquals(message4.getKey(), "key2");
-                Assert.assertEquals(new String(message4.getData()), 
"my-message-6");
+                assertEquals(message4.getKey(), "key2");
+                assertEquals(new String(message4.getData()), "my-message-6");
 
                 Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                 assertNull(m);
@@ -716,7 +720,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 } else {
                     expectList = List.of(Pair.of("key1", "my-message-4"), 
Pair.of("key2", "my-message-6"));
                 }
-                Assert.assertEquals(result, expectList);
+                assertEquals(result, expectList);
             }
         }
     }
@@ -724,7 +728,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testEmptyPayloadDeletes() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -795,18 +799,18 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key0");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+            assertEquals(message1.getKey(), "key0");
+            assertEquals(new String(message1.getData()), "my-message-0");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key4");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-4");
+            assertEquals(message2.getKey(), "key4");
+            assertEquals(new String(message2.getData()), "my-message-4");
         }
     }
 
     @Test
     public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -874,12 +878,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key0");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+            assertEquals(message1.getKey(), "key0");
+            assertEquals(new String(message1.getData()), "my-message-0");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key4");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-4");
+            assertEquals(message2.getKey(), "key4");
+            assertEquals(new String(message2.getData()), "my-message-4");
         }
     }
 
@@ -887,13 +891,13 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactorReadsCompacted() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // capture opened ledgers
         Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
         
when(pulsarTestContext.getBookKeeperClient().newOpenLedgerOp()).thenAnswer(
                 (invocation) -> {
-                    OpenBuilder builder = 
(OpenBuilder)spy(invocation.callRealMethod());
+                    OpenBuilder builder = 
(OpenBuilder)spyWithoutRecordingInvocations(invocation.callRealMethod());
                     when(builder.withLedgerId(anyLong())).thenAnswer(
                             (invocation2) -> {
                                 
ledgersOpened.add((Long)invocation2.getArguments()[0]);
@@ -928,15 +932,15 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         String managedLedgerName = 
((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get())
             .getManagedLedger().getName();
         ManagedLedgerInfo info = 
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
-        Assert.assertEquals(info.ledgers.size(), 2);
-        Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have 
been opened
+        assertEquals(info.ledgers.size(), 2);
+        assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been 
opened
 
         // compact the topic
         compact(topic);
 
         // should have opened all except last to read
-        
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
-        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
         ledgersOpened.clear();
 
         // force broker to close resources for topic
@@ -951,12 +955,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
 
         info = 
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
-        Assert.assertEquals(info.ledgers.size(), 3);
+        assertEquals(info.ledgers.size(), 3);
 
         // should only have opened the penultimate ledger to get stat
-        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
-        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
-        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+        assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
         ledgersOpened.clear();
 
         // compact the topic again
@@ -964,30 +968,30 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         // shouldn't have opened first ledger (already compacted), penultimate 
would have some uncompacted data.
         // last ledger already open for writing
-        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
-        
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
-        
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+        assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+        assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+        assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
 
         // all three messages should be there when we read compacted
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key0");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+            assertEquals(message1.getKey(), "key0");
+            assertEquals(new String(message1.getData()), "my-message-0");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key1");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-1");
+            assertEquals(message2.getKey(), "key1");
+            assertEquals(new String(message2.getData()), "my-message-1");
 
             Message<byte[]> message3 = consumer.receive();
-            Assert.assertEquals(message3.getKey(), "key2");
-            Assert.assertEquals(new String(message3.getData()), 
"my-message-2");
+            assertEquals(message3.getKey(), "key2");
+            assertEquals(new String(message3.getData()), "my-message-2");
         }
     }
 
     @Test
     public void testCompactCompressedNoBatch() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1015,18 +1019,18 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            assertEquals(message2.getKey(), "key2");
+            assertEquals(new String(message2.getData()), "my-message-3");
         }
     }
 
     @Test
     public void testCompactCompressedBatching() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1058,12 +1062,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            assertEquals(message2.getKey(), "key2");
+            assertEquals(new String(message2.getData()), "my-message-3");
         }
     }
 
@@ -1078,10 +1082,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                     
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                     return keyInfo;
                 } catch (IOException e) {
-                    Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    fail("Failed to read certificate from " + CERT_FILE_PATH);
                 }
             } else {
-                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not 
present or not readable.");
+                fail("Certificate file " + CERT_FILE_PATH + " is not present 
or not readable.");
             }
             return null;
         }
@@ -1094,10 +1098,10 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                     
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
                     return keyInfo;
                 } catch (IOException e) {
-                    Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    fail("Failed to read certificate from " + CERT_FILE_PATH);
                 }
             } else {
-                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not 
present or not readable.");
+                fail("Certificate file " + CERT_FILE_PATH + " is not present 
or not readable.");
             }
             return null;
         }
@@ -1105,7 +1109,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactEncryptedNoBatch() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1136,18 +1140,18 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
                 .readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            assertEquals(message2.getKey(), "key2");
+            assertEquals(new String(message2.getData()), "my-message-3");
         }
     }
 
     @Test
     public void testCompactEncryptedBatching() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1180,30 +1184,30 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
                 .readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
             if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key2");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-3");
+                assertEquals(message3.getKey(), "key2");
+                assertEquals(new String(message3.getData()), "my-message-3");
             } else {
                 // with encryption, all messages are passed through compaction 
as it doesn't
                 // have the keys to decrypt the batch payload
                 Message<byte[]> message2 = consumer.receive();
-                Assert.assertEquals(message2.getKey(), "key2");
-                Assert.assertEquals(new String(message2.getData()), 
"my-message-2");
+                assertEquals(message2.getKey(), "key2");
+                assertEquals(new String(message2.getData()), "my-message-2");
 
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key2");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-3");
+                assertEquals(message3.getKey(), "key2");
+                assertEquals(new String(message3.getData()), "my-message-3");
             }
         }
     }
 
     @Test
     public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1235,18 +1239,18 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
                 .readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
             Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            assertEquals(message2.getKey(), "key2");
+            assertEquals(new String(message2.getData()), "my-message-3");
         }
     }
 
     @Test
     public void testCompactEncryptedAndCompressedBatching() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1282,29 +1286,29 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
                 .readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key1");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            assertEquals(message1.getKey(), "key1");
+            assertEquals(new String(message1.getData()), "my-message-1");
 
 
             if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key2");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-3");
+                assertEquals(message3.getKey(), "key2");
+                assertEquals(new String(message3.getData()), "my-message-3");
             } else {
                 Message<byte[]> message2 = consumer.receive();
-                Assert.assertEquals(message2.getKey(), "key2");
-                Assert.assertEquals(new String(message2.getData()), 
"my-message-2");
+                assertEquals(message2.getKey(), "key2");
+                assertEquals(new String(message2.getData()), "my-message-2");
 
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key2");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-3");
+                assertEquals(message3.getKey(), "key2");
+                assertEquals(new String(message3.getData()), "my-message-3");
             }
         }
     }
 
     @Test
     public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1361,34 +1365,34 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .cryptoKeyReader(new EncKeyReader())
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
             Message<byte[]> message1 = consumer.receive();
-            Assert.assertEquals(message1.getKey(), "key0");
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-0");
+            assertEquals(message1.getKey(), "key0");
+            assertEquals(new String(message1.getData()), "my-message-0");
 
             if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key3");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-3");
+                assertEquals(message3.getKey(), "key3");
+                assertEquals(new String(message3.getData()), "my-message-3");
 
                 Message<byte[]> message5 = consumer.receive();
-                Assert.assertEquals(message5.getKey(), "key4");
-                Assert.assertEquals(new String(message5.getData()), 
"my-message-4");
+                assertEquals(message5.getKey(), "key4");
+                assertEquals(new String(message5.getData()), "my-message-4");
             } else {
                 // see all messages from batch
                 Message<byte[]> message2 = consumer.receive();
-                Assert.assertEquals(message2.getKey(), "key2");
-                Assert.assertEquals(new String(message2.getData()), 
"my-message-2");
+                assertEquals(message2.getKey(), "key2");
+                assertEquals(new String(message2.getData()), "my-message-2");
 
                 Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key3");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-3");
+                assertEquals(message3.getKey(), "key3");
+                assertEquals(new String(message3.getData()), "my-message-3");
 
                 Message<byte[]> message4 = consumer.receive();
-                Assert.assertEquals(message4.getKey(), "key2");
+                assertEquals(message4.getKey(), "key2");
                 assertNull(message4.getData());
 
                 Message<byte[]> message5 = consumer.receive();
-                Assert.assertEquals(message5.getKey(), "key4");
-                Assert.assertEquals(new String(message5.getData()), 
"my-message-4");
+                assertEquals(message5.getKey(), "key4");
+                assertEquals(new String(message5.getData()), "my-message-4");
             }
         }
     }
@@ -1400,7 +1404,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
     public void testCompactionWithLastDeletedKey(boolean batching) throws 
Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(batching)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -1426,7 +1430,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
     public void testEmptyCompactionLedger(boolean batching) throws Exception {
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/my-topic1";
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(batching)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -1450,7 +1454,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
     public void testAllEmptyCompactionLedger(boolean batchEnabled) throws 
Exception {
-        final String topic = 
"persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + 
UUID.randomUUID().toString();
+        final String topic = 
"persistent://my-tenant/my-ns/testAllEmptyCompactionLedger" + 
UUID.randomUUID().toString();
 
         final int messages = 10;
 
@@ -1484,7 +1488,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testBatchAndNonBatchWithoutEmptyPayload() throws 
PulsarClientException, ExecutionException, InterruptedException {
-        final String topic = 
"persistent://my-property/use/my-ns/testBatchAndNonBatchWithoutEmptyPayload" + 
UUID.randomUUID().toString();
+        final String topic = 
"persistent://my-tenant/my-ns/testBatchAndNonBatchWithoutEmptyPayload" + 
UUID.randomUUID().toString();
 
         // 1.create producer and publish message to the topic.
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1535,7 +1539,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     }
     @Test(timeOut = 20000)
     public void testBatchAndNonBatchWithEmptyPayload() throws 
PulsarClientException, ExecutionException, InterruptedException {
-        final String topic = 
"persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + 
UUID.randomUUID().toString();
+        final String topic = 
"persistent://my-tenant/my-ns/testBatchAndNonBatchWithEmptyPayload" + 
UUID.randomUUID().toString();
 
         // 1.create producer and publish message to the topic.
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1591,7 +1595,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testBatchAndNonBatchEndOfEmptyPayload() throws 
PulsarClientException, ExecutionException, InterruptedException {
-        final String topic = 
"persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + 
UUID.randomUUID().toString();
+        final String topic = 
"persistent://my-tenant/my-ns/testBatchAndNonBatchWithEmptyPayload" + 
UUID.randomUUID().toString();
 
         // 1.create producer and publish message to the topic.
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1640,7 +1644,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
     public void testCompactMultipleTimesWithoutEmptyMessage(boolean 
batchEnabled) throws PulsarClientException, ExecutionException, 
InterruptedException {
-        final String topic = 
"persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage"
 + UUID.randomUUID().toString();
+        final String topic = 
"persistent://my-tenant/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + 
UUID.randomUUID().toString();
 
         final int messages = 10;
         final String key = "1";
@@ -1688,7 +1692,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 2000000, dataProvider = "lastDeletedBatching")
     public void testReadUnCompacted(boolean batchEnabled) throws 
PulsarClientException, ExecutionException, InterruptedException {
-        final String topic = 
"persistent://my-property/use/my-ns/testReadUnCompacted" + 
UUID.randomUUID().toString();
+        final String topic = 
"persistent://my-tenant/my-ns/testReadUnCompacted" + 
UUID.randomUUID().toString();
 
         final int messages = 10;
         final String key = "1";
@@ -1794,9 +1798,9 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 60000)
     public void testCompactionWithMarker() throws Exception {
-        String namespace = "my-property/use/my-ns";
+        String namespace = "my-tenant/my-ns";
         final TopicName dest = TopicName.get(
-                BrokerTestUtil.newUniqueName("persistent://" + namespace + 
"/testWriteMarker"));
+                newUniqueName("persistent://" + namespace + 
"/testWriteMarker"));
         admin.topics().createNonPartitionedTopic(dest.toString());
         @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -1814,7 +1818,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .create();
         producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
         Optional<Topic> topic = 
pulsar.getBrokerService().getTopic(dest.toString(), true).join();
-        Assert.assertTrue(topic.isPresent());
+        assertTrue(topic.isPresent());
         PersistentTopic persistentTopic = (PersistentTopic) topic.get();
         Random random = new Random();
         for (int i = 0; i < 100; i++) {
@@ -1847,13 +1851,13 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .pollInterval(1, TimeUnit.SECONDS)
                 .untilAsserted(() -> {
                     long ledgerId = 
admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
-                    Assert.assertNotEquals(ledgerId, -1L);
+                    assertNotEquals(ledgerId, -1L);
                 });
     }
 
     @Test(timeOut = 100000)
     public void testReceiverQueueSize() throws Exception {
-        final String topicName = 
"persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID();
+        final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/testReceiverQueueSize");
         final String subName = "my-sub";
         final int receiveQueueSize = 1;
         @Cleanup
@@ -1880,7 +1884,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         //Give some time to consume
         Awaitility.await()
-                .untilAsserted(() -> 
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
+                .untilAsserted(() -> 
assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
                         receiveQueueSize));
         consumer.close();
         producer.close();
@@ -1889,7 +1893,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testDispatcherMaxReadSizeBytes() throws Exception {
         final String topicName =
-                
"persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + 
UUID.randomUUID();
+                
newUniqueName("persistent://my-tenant/my-ns/testDispatcherMaxReadSizeBytes");
         final String subName = "my-sub";
         final int receiveQueueSize = 1;
         @Cleanup
@@ -1914,7 +1918,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         PersistentTopic topic =
                 (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get();
-        TopicCompactionService topicCompactionService = 
Mockito.spy(topic.getTopicCompactionService());
+        TopicCompactionService topicCompactionService = 
spy(topic.getTopicCompactionService());
         FieldUtils.writeDeclaredField(topic, "topicCompactionService", 
topicCompactionService, true);
 
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
client.newConsumer(Schema.BYTES)
@@ -1935,7 +1939,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactionDuplicate() throws Exception {
-        String topic = 
"persistent://my-property/use/my-ns/testCompactionDuplicate";
+        String topic = "persistent://my-tenant/my-ns/testCompactionDuplicate";
         final int numMessages = 1000;
         final int maxKeys = 800;
 
@@ -1976,7 +1980,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         Thread.sleep(500);
 
         Optional<Topic> previousTopicRef = 
pulsar.getBrokerService().getTopicIfExists(topic).get();
-        Assert.assertTrue(previousTopicRef.isPresent());
+        assertTrue(previousTopicRef.isPresent());
         PersistentTopic previousPersistentTopic = (PersistentTopic) 
previousTopicRef.get();
 
         // Unload topic make reader of compaction reconnect
@@ -1986,7 +1990,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             LongRunningProcessStatus previousLongRunningProcessStatus = 
previousPersistentTopic.compactionStatus();
 
             Optional<Topic> currentTopicReference = 
pulsar.getBrokerService().getTopicReference(topic);
-            Assert.assertTrue(currentTopicReference.isPresent());
+            assertTrue(currentTopicReference.isPresent());
             PersistentTopic currentPersistentTopic = (PersistentTopic) 
currentTopicReference.get();
             LongRunningProcessStatus currentLongRunningProcessStatus = 
currentPersistentTopic.compactionStatus();
 
@@ -1995,18 +1999,18 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                     || currentLongRunningProcessStatus.status == 
LongRunningProcessStatus.Status.ERROR)) {
                 // trigger compaction again
                 admin.topics().triggerCompaction(topic);
-                Assert.assertEquals(currentLongRunningProcessStatus.status, 
LongRunningProcessStatus.Status.SUCCESS);
+                assertEquals(currentLongRunningProcessStatus.status, 
LongRunningProcessStatus.Status.SUCCESS);
             } else if (previousLongRunningProcessStatus.status == 
LongRunningProcessStatus.Status.RUNNING) {
-                Assert.assertEquals(previousLongRunningProcessStatus.status, 
LongRunningProcessStatus.Status.SUCCESS);
+                assertEquals(previousLongRunningProcessStatus.status, 
LongRunningProcessStatus.Status.SUCCESS);
             }
         });
 
         Awaitility.await().untilAsserted(() -> {
             PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topic, false);
             // Compacted topic ledger should have same number of entry equals 
to number of unique key.
-            Assert.assertEquals(internalStats.compactedLedger.entries, 
expected.size());
-            Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
-            Assert.assertFalse(internalStats.compactedLedger.offloaded);
+            assertEquals(internalStats.compactedLedger.entries, 
expected.size());
+            assertTrue(internalStats.compactedLedger.ledgerId > -1);
+            assertFalse(internalStats.compactedLedger.offloaded);
         });
 
         // consumer with readCompacted enabled only get compacted entries
@@ -2014,7 +2018,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .readCompacted(true).subscribe()) {
             while (true) {
                 Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
-                Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+                assertEquals(expected.remove(m.getKey()), m.getData());
                 if (expected.isEmpty()) {
                     break;
                 }
@@ -2024,7 +2028,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testDeleteCompactedLedger() throws Exception {
-        String topicName = 
"persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+        String topicName = 
"persistent://my-tenant/my-ns/testDeleteCompactedLedger";
 
         final String subName = "my-sub";
         @Cleanup
@@ -2043,9 +2047,9 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         MutableLong compactedLedgerId = new MutableLong(-1);
         Awaitility.await().untilAsserted(() -> {
             PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
-            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            assertNotEquals(stats.compactedLedger.ledgerId, -1L);
             compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
-            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+            assertEquals(stats.compactedLedger.entries, 2L);
         });
 
         // delete compacted ledger
@@ -2053,8 +2057,8 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         Awaitility.await().untilAsserted(() -> {
             PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
-            Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
-            Assert.assertEquals(stats.compactedLedger.entries, -1L);
+            assertEquals(stats.compactedLedger.ledgerId, -1L);
+            assertEquals(stats.compactedLedger.entries, -1L);
             assertThrows(BKException.BKNoSuchLedgerExistsException.class, () 
-> pulsarTestContext.getBookKeeperClient()
                         .openLedger(compactedLedgerId.getValue(), 
BookKeeper.DigestType.CRC32C, new byte[]{}));
         });
@@ -2064,9 +2068,9 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         MutableLong compactedLedgerId2 = new MutableLong(-1);
         Awaitility.await().untilAsserted(() -> {
             PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
-            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            assertNotEquals(stats.compactedLedger.ledgerId, -1L);
             compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
-            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+            assertEquals(stats.compactedLedger.entries, 2L);
         });
 
         producer.close();
@@ -2077,77 +2081,85 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                         compactedLedgerId2.getValue(), 
BookKeeper.DigestType.CRC32, new byte[]{})));
     }
 
-    @Test
+    @Test(timeOut = 10000)
     public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
-        // Disable topic level policies, since block ack thread may also block 
thread of delete topic policies.
-        conf.setTopicLevelPoliciesEnabled(false);
-        restartBroker();
+        String topicName = 
newUniqueName("persistent://my-tenant/my-ns/testDeleteCompactedLedgerWithSlowAck");
+        admin.topics().createNonPartitionedTopic(topicName);
+        // minimum compaction threshold
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        // infinite retention
+        admin.topicPolicies().setRetention(topicName, new 
RetentionPolicies(-1, -1));
 
-        String topicName = 
"persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
         @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .enableBatching(false).topic(topicName).create();
 
-        
pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
-                .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
-                .close();
+        // send a single message
+        producer.newMessage().key(String.valueOf(0)).value("0").send();
+        // trigger compaction once to create __compaction subscription
+        triggerCompactionAndWait(topicName);
 
-        for (int i = 0; i < 10; i++) {
+        int numberOfMessages = 10;
+        for (int i = 0; i < numberOfMessages; i++) {
             producer.newMessage().key(String.valueOf(i % 
2)).value(String.valueOf(i)).sendAsync();
         }
         producer.flush();
 
+        // replace the PersistentSubscription with a spy
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-        PersistentSubscription subscription = 
spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+        PersistentSubscription subscription =
+                
spyWithoutRecordingInvocations(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
         topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, 
subscription);
 
+        // delay the ack of compaction
+        CountDownLatch compactionAckedLatch = new CountDownLatch(1);
         AtomicLong compactedLedgerId = new AtomicLong(-1);
-        AtomicBoolean pauseAck = new AtomicBoolean();
         Mockito.doAnswer(invocationOnMock -> {
-            Map<String, Long> properties = (Map<String, Long>) 
invocationOnMock.getArguments()[2];
-            log.info("acknowledgeMessage properties: {}", properties);
+            List<Position> positions = invocationOnMock.getArgument(0);
+            Map<String, Long> properties = invocationOnMock.getArgument(2);
+            log.info("acknowledgeMessage positions: {} properties: {}", 
positions, properties);
             
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
-            pauseAck.set(true);
-            while (pauseAck.get()) {
-                Thread.sleep(200);
+            try {
+                return invocationOnMock.callRealMethod();
+            } finally {
+                log.info("acknowledgeMessage completed {}", positions);
+                compactionAckedLatch.countDown();
+                // add delay here to introduce possible races with deletion
+                Thread.sleep(500);
             }
-            return invocationOnMock.callRealMethod();
         }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
                 CommandAck.AckType.Cumulative), Mockito.any());
 
+        // trigger compaction
         admin.topics().triggerCompaction(topicName);
 
-        while (!pauseAck.get()) {
-            Thread.sleep(100);
-        }
+        // wait for compaction to acknowledge
+        compactionAckedLatch.await(9, TimeUnit.SECONDS);
 
-        CompletableFuture<Long> currentCompaction =
-                (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, 
"currentCompaction", true);
-        CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
-        FieldUtils.writeDeclaredField(topic, "currentCompaction", 
spyCurrentCompaction, true);
-        currentCompaction.whenComplete((obj, throwable) -> {
-            if (throwable != null) {
-                spyCurrentCompaction.completeExceptionally(throwable);
-            } else {
-                spyCurrentCompaction.complete(obj);
-            }
-        });
-        Mockito.doAnswer(invocationOnMock -> {
-            pauseAck.set(false);
-            return invocationOnMock.callRealMethod();
-        }).when(spyCurrentCompaction).handle(Mockito.any());
+        // close the producer
+        producer.close();
 
+        // delete compacted ledger
         admin.topics().delete(topicName, true);
 
+        // ensure that the compacted ledger is deleted
         Awaitility.await().untilAsserted(() -> 
assertThrows(BKException.BKNoSuchLedgerExistsException.class,
                 () -> pulsarTestContext.getBookKeeperClient().openLedger(
                         compactedLedgerId.get(), BookKeeper.DigestType.CRC32, 
new byte[]{})));
     }
 
+    private void triggerCompactionAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        persistentTopic.triggerCompaction();
+        CompletableFuture<Long> currentCompaction =
+                (CompletableFuture<Long>) 
FieldUtils.readDeclaredField(persistentTopic, "currentCompaction", true);
+        currentCompaction.get(10, TimeUnit.SECONDS);
+    }
+
     @Test
     public void testCompactionWithTTL() throws Exception {
-        String topicName = 
"persistent://my-property/use/my-ns/testCompactionWithTTL";
+        String topicName = 
"persistent://my-tenant/my-ns/testCompactionWithTTL";
         String subName = "sub";
         
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
                 .subscribe().close();
@@ -2208,12 +2220,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             result.add(receive.getValue());
         }
 
-        Assert.assertEquals(result, List.of("V3", "V4", "V5"));
+        assertEquals(result, List.of("V3", "V4", "V5"));
     }
 
     @Test
     public void testAcknowledgeWithReconnection() throws Exception {
-        final String topicName = 
"persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+        final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/testAcknowledge");
         final String subName = "my-sub";
         @Cleanup
         PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
@@ -2268,7 +2280,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         // Wait for consumer to reconnect and clear incomingMessages
         consumer.pause();
         Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(consumer.numMessagesInQueue(), 0);
+            assertEquals(consumer.numMessagesInQueue(), 0);
         });
         consumer.resume();
 
@@ -2285,22 +2297,22 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 assertEquals(admin.topics().getStats(topicName, 
true).getSubscriptions().get(subName).getMsgBacklog(),
                         0));
 
-        Assert.assertEquals(results, expected);
+        assertEquals(results, expected);
 
         Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
-        Assert.assertNull(message);
+        assertNull(message);
 
         // Make consumer reconnect to broker
         admin.topics().unload(topicName);
 
         producer.newMessage().key("K").value("V").send();
         Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
-        Assert.assertEquals(message2.getValue(), "V");
+        assertEquals(message2.getValue(), "V");
         consumer.acknowledge(message2);
 
         Awaitility.await().untilAsserted(() -> {
             PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicName);
-            Assert.assertEquals(internalStats.lastConfirmedEntry,
+            assertEquals(internalStats.lastConfirmedEntry,
                     internalStats.cursors.get(subName).markDeletePosition);
         });
 
@@ -2310,7 +2322,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testEarliestSubsAfterRollover() throws Exception {
-        final String topicName = 
"persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" + 
UUID.randomUUID();
+        final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/testEarliestSubsAfterRollover");
         final String subName = "my-sub";
         @Cleanup
         PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
@@ -2362,6 +2374,6 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             consumer.acknowledge(message);
         }
 
-        Assert.assertEquals(results, expected);
+        assertEquals(results, expected);
     }
 }

Reply via email to