This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1a0f4ba20ef [fix][test] Fix invalid test
CompactionTest.testDeleteCompactedLedgerWithSlowAck (#24166)
1a0f4ba20ef is described below
commit 1a0f4ba20ef30b07b98aa63dc421c484bc078fb2
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Apr 10 13:22:12 2025 +0300
[fix][test] Fix invalid test
CompactionTest.testDeleteCompactedLedgerWithSlowAck (#24166)
---
.../apache/pulsar/compaction/CompactionTest.java | 508 +++++++++++----------
1 file changed, 260 insertions(+), 248 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index d75ccce7ff3..ff9026dbba6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -18,15 +18,19 @@
*/
package org.apache.pulsar.compaction;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
@@ -43,11 +47,11 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.SneakyThrows;
@@ -62,7 +66,6 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
@@ -98,7 +101,6 @@ import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -116,10 +118,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
public void setup() throws Exception {
super.internalSetup();
- admin.clusters().createCluster("use",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("my-property",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("use")));
- admin.namespaces().createNamespace("my-property/use/my-ns");
+ admin.clusters().createCluster(configClusterName,
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
+ admin.tenants().createTenant("my-tenant",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Set.of(configClusterName)));
+ admin.namespaces().createNamespace("my-tenant/my-ns");
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -153,7 +157,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompaction() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
final int numMessages = 20;
final int maxKeys = 10;
@@ -182,21 +186,21 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic, false);
// Compacted topic ledger should have same number of entry equals to
number of unique key.
- Assert.assertEquals(expected.size(),
internalStats.compactedLedger.entries);
- Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
- Assert.assertFalse(internalStats.compactedLedger.offloaded);
+ assertEquals(expected.size(), internalStats.compactedLedger.entries);
+ assertTrue(internalStats.compactedLedger.ledgerId > -1);
+ assertFalse(internalStats.compactedLedger.offloaded);
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
while (true) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+ assertEquals(expected.remove(m.getKey()), m.getData());
if (expected.isEmpty()) {
break;
}
}
- Assert.assertTrue(expected.isEmpty());
+ assertTrue(expected.isEmpty());
}
// can get full backlog if read compacted disabled
@@ -205,24 +209,24 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
while (true) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
Pair<String, byte[]> expectedMessage = all.remove(0);
- Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
- Assert.assertEquals(expectedMessage.getRight(), m.getData());
+ assertEquals(expectedMessage.getLeft(), m.getKey());
+ assertEquals(expectedMessage.getRight(), m.getData());
if (all.isEmpty()) {
break;
}
}
- Assert.assertTrue(all.isEmpty());
+ assertTrue(all.isEmpty());
}
}
@Test
public void testCompactionWithReader() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
final int numMessages = 20;
final int maxKeys = 10;
// Configure retention to ensue data is retained for reader
- admin.namespaces().setRetention("my-property/use/my-ns", new
RetentionPolicies(-1, -1));
+ admin.namespaces().setRetention("my-tenant/my-ns", new
RetentionPolicies(-1, -1));
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -251,12 +255,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.startMessageId(MessageId.earliest).create()) {
while (true) {
Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS);
- Assert.assertEquals(expected.remove(m.getKey()), new
String(m.getData()));
+ assertEquals(expected.remove(m.getKey()), new
String(m.getData()));
if (expected.isEmpty()) {
break;
}
}
- Assert.assertTrue(expected.isEmpty());
+ assertTrue(expected.isEmpty());
}
// can get full backlog if read compacted disabled
@@ -265,20 +269,20 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
while (true) {
Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS);
Pair<String, String> expectedMessage = all.remove(0);
- Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
- Assert.assertEquals(expectedMessage.getRight(), new
String(m.getData()));
+ assertEquals(expectedMessage.getLeft(), m.getKey());
+ assertEquals(expectedMessage.getRight(), new
String(m.getData()));
if (all.isEmpty()) {
break;
}
}
- Assert.assertTrue(all.isEmpty());
+ assertTrue(all.isEmpty());
}
}
@Test
public void testReadCompactedBeforeCompaction() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -294,16 +298,16 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content0".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content1".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
}
compact(topic);
@@ -311,14 +315,14 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
}
}
@Test
public void testReadEntriesAfterCompaction() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -338,18 +342,18 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content3".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content3".getBytes());
}
}
@Test
public void testSeekEarliestAfterCompaction() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -366,8 +370,8 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.readCompacted(true).subscribe()) {
consumer.seek(MessageId.earliest);
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
}
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -375,22 +379,22 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
consumer.seek(MessageId.earliest);
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content0".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content1".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
}
}
@Test
public void testBrokerRestartAfterCompaction() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -408,15 +412,15 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
}
stopBroker();
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
consumer.receive();
- Assert.fail("Shouldn't have been able to receive anything");
+ fail("Shouldn't have been able to receive anything");
} catch (PulsarClientException e) {
// correct behaviour
}
@@ -425,14 +429,14 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content2".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content2".getBytes());
}
}
@Test
public void testCompactEmptyTopic() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -448,14 +452,14 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content0".getBytes());
}
}
@Test
public void testFirstMessageRetained() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -483,20 +487,20 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
- Assert.assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
+ assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-3");
- Assert.assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-3");
+ assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
}
}
@Test
public void testBatchMessageIdsDontChange() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -525,13 +529,13 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
// Ensure all messages are in same batch
-
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
((BatchMessageIdImpl)messages.get(1).getMessageId()).getLedgerId());
-
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
((BatchMessageIdImpl)messages.get(2).getMessageId()).getLedgerId());
-
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
((BatchMessageIdImpl)messages.get(1).getMessageId()).getEntryId());
-
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+
assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
// compact the topic
@@ -541,25 +545,25 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-3");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-3");
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
- Assert.assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
- Assert.assertEquals(message2.getMessageId(),
messages.get(1).getMessageId());
+ assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
+ assertEquals(message2.getMessageId(),
messages.get(1).getMessageId());
} else {
- Assert.assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
- Assert.assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
+ assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
+ assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
}
}
}
@Test
public void testBatchMessageWithNullValue() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.receiverQueueSize(1).readCompacted(true).subscribe().close();
@@ -613,7 +617,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testWholeBatchCompactedOut() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -642,8 +646,8 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message = consumer.receive();
- Assert.assertEquals(message.getKey(), "key1");
- Assert.assertEquals(new String(message.getData()), "my-message-4");
+ assertEquals(message.getKey(), "key1");
+ assertEquals(new String(message.getData()), "my-message-4");
}
}
@@ -658,7 +662,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
restartBroker();
FieldUtils.writeField(compactor, "topicCompactionRetainNullKey",
retainNullKey, true);
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -688,12 +692,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.subscriptionName("sub1").readCompacted(true).subscribe()){
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()),
"my-message-4");
+ assertEquals(message3.getKey(), "key1");
+ assertEquals(new String(message3.getData()), "my-message-4");
Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()),
"my-message-6");
+ assertEquals(message4.getKey(), "key2");
+ assertEquals(new String(message4.getData()), "my-message-6");
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
@@ -716,7 +720,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
} else {
expectList = List.of(Pair.of("key1", "my-message-4"),
Pair.of("key2", "my-message-6"));
}
- Assert.assertEquals(result, expectList);
+ assertEquals(result, expectList);
}
}
}
@@ -724,7 +728,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testEmptyPayloadDeletes() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -795,18 +799,18 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key0");
- Assert.assertEquals(new String(message1.getData()),
"my-message-0");
+ assertEquals(message1.getKey(), "key0");
+ assertEquals(new String(message1.getData()), "my-message-0");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key4");
- Assert.assertEquals(new String(message2.getData()),
"my-message-4");
+ assertEquals(message2.getKey(), "key4");
+ assertEquals(new String(message2.getData()), "my-message-4");
}
}
@Test
public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -874,12 +878,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key0");
- Assert.assertEquals(new String(message1.getData()),
"my-message-0");
+ assertEquals(message1.getKey(), "key0");
+ assertEquals(new String(message1.getData()), "my-message-0");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key4");
- Assert.assertEquals(new String(message2.getData()),
"my-message-4");
+ assertEquals(message2.getKey(), "key4");
+ assertEquals(new String(message2.getData()), "my-message-4");
}
}
@@ -887,13 +891,13 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactorReadsCompacted() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// capture opened ledgers
Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
when(pulsarTestContext.getBookKeeperClient().newOpenLedgerOp()).thenAnswer(
(invocation) -> {
- OpenBuilder builder =
(OpenBuilder)spy(invocation.callRealMethod());
+ OpenBuilder builder =
(OpenBuilder)spyWithoutRecordingInvocations(invocation.callRealMethod());
when(builder.withLedgerId(anyLong())).thenAnswer(
(invocation2) -> {
ledgersOpened.add((Long)invocation2.getArguments()[0]);
@@ -928,15 +932,15 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
String managedLedgerName =
((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get())
.getManagedLedger().getName();
ManagedLedgerInfo info =
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
- Assert.assertEquals(info.ledgers.size(), 2);
- Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have
been opened
+ assertEquals(info.ledgers.size(), 2);
+ assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been
opened
// compact the topic
compact(topic);
// should have opened all except last to read
-
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
-
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+ assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+ assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
ledgersOpened.clear();
// force broker to close resources for topic
@@ -951,12 +955,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
info =
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
- Assert.assertEquals(info.ledgers.size(), 3);
+ assertEquals(info.ledgers.size(), 3);
// should only have opened the penultimate ledger to get stat
-
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
-
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
-
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+ assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+ assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+ assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
ledgersOpened.clear();
// compact the topic again
@@ -964,30 +968,30 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
// shouldn't have opened first ledger (already compacted), penultimate
would have some uncompacted data.
// last ledger already open for writing
-
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
-
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
-
Assert.assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
+ assertFalse(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
+ assertTrue(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
+ assertFalse(ledgersOpened.contains(info.ledgers.get(2).ledgerId));
// all three messages should be there when we read compacted
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key0");
- Assert.assertEquals(new String(message1.getData()),
"my-message-0");
+ assertEquals(message1.getKey(), "key0");
+ assertEquals(new String(message1.getData()), "my-message-0");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key1");
- Assert.assertEquals(new String(message2.getData()),
"my-message-1");
+ assertEquals(message2.getKey(), "key1");
+ assertEquals(new String(message2.getData()), "my-message-1");
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()),
"my-message-2");
+ assertEquals(message3.getKey(), "key2");
+ assertEquals(new String(message3.getData()), "my-message-2");
}
}
@Test
public void testCompactCompressedNoBatch() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1015,18 +1019,18 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-3");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-3");
}
}
@Test
public void testCompactCompressedBatching() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1058,12 +1062,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-3");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-3");
}
}
@@ -1078,10 +1082,10 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
- Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
- Assert.fail("Certificate file " + CERT_FILE_PATH + " is not
present or not readable.");
+ fail("Certificate file " + CERT_FILE_PATH + " is not present
or not readable.");
}
return null;
}
@@ -1094,10 +1098,10 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
- Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
- Assert.fail("Certificate file " + CERT_FILE_PATH + " is not
present or not readable.");
+ fail("Certificate file " + CERT_FILE_PATH + " is not present
or not readable.");
}
return null;
}
@@ -1105,7 +1109,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactEncryptedNoBatch() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1136,18 +1140,18 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
.readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-3");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-3");
}
}
@Test
public void testCompactEncryptedBatching() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1180,30 +1184,30 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
.readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()),
"my-message-3");
+ assertEquals(message3.getKey(), "key2");
+ assertEquals(new String(message3.getData()), "my-message-3");
} else {
// with encryption, all messages are passed through compaction
as it doesn't
// have the keys to decrypt the batch payload
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-2");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-2");
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()),
"my-message-3");
+ assertEquals(message3.getKey(), "key2");
+ assertEquals(new String(message3.getData()), "my-message-3");
}
}
}
@Test
public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1235,18 +1239,18 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
.readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-3");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-3");
}
}
@Test
public void testCompactEncryptedAndCompressedBatching() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1282,29 +1286,29 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
.readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key1");
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ assertEquals(message1.getKey(), "key1");
+ assertEquals(new String(message1.getData()), "my-message-1");
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()),
"my-message-3");
+ assertEquals(message3.getKey(), "key2");
+ assertEquals(new String(message3.getData()), "my-message-3");
} else {
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-2");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-2");
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()),
"my-message-3");
+ assertEquals(message3.getKey(), "key2");
+ assertEquals(new String(message3.getData()), "my-message-3");
}
}
}
@Test
public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1361,34 +1365,34 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.cryptoKeyReader(new EncKeyReader())
.subscriptionName("sub1").readCompacted(true).subscribe()){
Message<byte[]> message1 = consumer.receive();
- Assert.assertEquals(message1.getKey(), "key0");
- Assert.assertEquals(new String(message1.getData()),
"my-message-0");
+ assertEquals(message1.getKey(), "key0");
+ assertEquals(new String(message1.getData()), "my-message-0");
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key3");
- Assert.assertEquals(new String(message3.getData()),
"my-message-3");
+ assertEquals(message3.getKey(), "key3");
+ assertEquals(new String(message3.getData()), "my-message-3");
Message<byte[]> message5 = consumer.receive();
- Assert.assertEquals(message5.getKey(), "key4");
- Assert.assertEquals(new String(message5.getData()),
"my-message-4");
+ assertEquals(message5.getKey(), "key4");
+ assertEquals(new String(message5.getData()), "my-message-4");
} else {
// see all messages from batch
Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()),
"my-message-2");
+ assertEquals(message2.getKey(), "key2");
+ assertEquals(new String(message2.getData()), "my-message-2");
Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key3");
- Assert.assertEquals(new String(message3.getData()),
"my-message-3");
+ assertEquals(message3.getKey(), "key3");
+ assertEquals(new String(message3.getData()), "my-message-3");
Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
+ assertEquals(message4.getKey(), "key2");
assertNull(message4.getData());
Message<byte[]> message5 = consumer.receive();
- Assert.assertEquals(message5.getKey(), "key4");
- Assert.assertEquals(new String(message5.getData()),
"my-message-4");
+ assertEquals(message5.getKey(), "key4");
+ assertEquals(new String(message5.getData()), "my-message-4");
}
}
}
@@ -1400,7 +1404,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactionWithLastDeletedKey(boolean batching) throws
Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -1426,7 +1430,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testEmptyCompactionLedger(boolean batching) throws Exception {
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/my-topic1";
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -1450,7 +1454,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testAllEmptyCompactionLedger(boolean batchEnabled) throws
Exception {
- final String topic =
"persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" +
UUID.randomUUID().toString();
+ final String topic =
"persistent://my-tenant/my-ns/testAllEmptyCompactionLedger" +
UUID.randomUUID().toString();
final int messages = 10;
@@ -1484,7 +1488,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testBatchAndNonBatchWithoutEmptyPayload() throws
PulsarClientException, ExecutionException, InterruptedException {
- final String topic =
"persistent://my-property/use/my-ns/testBatchAndNonBatchWithoutEmptyPayload" +
UUID.randomUUID().toString();
+ final String topic =
"persistent://my-tenant/my-ns/testBatchAndNonBatchWithoutEmptyPayload" +
UUID.randomUUID().toString();
// 1.create producer and publish message to the topic.
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1535,7 +1539,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
@Test(timeOut = 20000)
public void testBatchAndNonBatchWithEmptyPayload() throws
PulsarClientException, ExecutionException, InterruptedException {
- final String topic =
"persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" +
UUID.randomUUID().toString();
+ final String topic =
"persistent://my-tenant/my-ns/testBatchAndNonBatchWithEmptyPayload" +
UUID.randomUUID().toString();
// 1.create producer and publish message to the topic.
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1591,7 +1595,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testBatchAndNonBatchEndOfEmptyPayload() throws
PulsarClientException, ExecutionException, InterruptedException {
- final String topic =
"persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" +
UUID.randomUUID().toString();
+ final String topic =
"persistent://my-tenant/my-ns/testBatchAndNonBatchWithEmptyPayload" +
UUID.randomUUID().toString();
// 1.create producer and publish message to the topic.
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1640,7 +1644,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactMultipleTimesWithoutEmptyMessage(boolean
batchEnabled) throws PulsarClientException, ExecutionException,
InterruptedException {
- final String topic =
"persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage"
+ UUID.randomUUID().toString();
+ final String topic =
"persistent://my-tenant/my-ns/testCompactMultipleTimesWithoutEmptyMessage" +
UUID.randomUUID().toString();
final int messages = 10;
final String key = "1";
@@ -1688,7 +1692,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 2000000, dataProvider = "lastDeletedBatching")
public void testReadUnCompacted(boolean batchEnabled) throws
PulsarClientException, ExecutionException, InterruptedException {
- final String topic =
"persistent://my-property/use/my-ns/testReadUnCompacted" +
UUID.randomUUID().toString();
+ final String topic =
"persistent://my-tenant/my-ns/testReadUnCompacted" +
UUID.randomUUID().toString();
final int messages = 10;
final String key = "1";
@@ -1794,9 +1798,9 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 60000)
public void testCompactionWithMarker() throws Exception {
- String namespace = "my-property/use/my-ns";
+ String namespace = "my-tenant/my-ns";
final TopicName dest = TopicName.get(
- BrokerTestUtil.newUniqueName("persistent://" + namespace +
"/testWriteMarker"));
+ newUniqueName("persistent://" + namespace +
"/testWriteMarker"));
admin.topics().createNonPartitionedTopic(dest.toString());
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -1814,7 +1818,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.create();
producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
Optional<Topic> topic =
pulsar.getBrokerService().getTopic(dest.toString(), true).join();
- Assert.assertTrue(topic.isPresent());
+ assertTrue(topic.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topic.get();
Random random = new Random();
for (int i = 0; i < 100; i++) {
@@ -1847,13 +1851,13 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(() -> {
long ledgerId =
admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
- Assert.assertNotEquals(ledgerId, -1L);
+ assertNotEquals(ledgerId, -1L);
});
}
@Test(timeOut = 100000)
public void testReceiverQueueSize() throws Exception {
- final String topicName =
"persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID();
+ final String topicName =
newUniqueName("persistent://my-tenant/my-ns/testReceiverQueueSize");
final String subName = "my-sub";
final int receiveQueueSize = 1;
@Cleanup
@@ -1880,7 +1884,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
//Give some time to consume
Awaitility.await()
- .untilAsserted(() ->
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
+ .untilAsserted(() ->
assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
receiveQueueSize));
consumer.close();
producer.close();
@@ -1889,7 +1893,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testDispatcherMaxReadSizeBytes() throws Exception {
final String topicName =
-
"persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" +
UUID.randomUUID();
+
newUniqueName("persistent://my-tenant/my-ns/testDispatcherMaxReadSizeBytes");
final String subName = "my-sub";
final int receiveQueueSize = 1;
@Cleanup
@@ -1914,7 +1918,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
PersistentTopic topic =
(PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get();
- TopicCompactionService topicCompactionService =
Mockito.spy(topic.getTopicCompactionService());
+ TopicCompactionService topicCompactionService =
spy(topic.getTopicCompactionService());
FieldUtils.writeDeclaredField(topic, "topicCompactionService",
topicCompactionService, true);
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
client.newConsumer(Schema.BYTES)
@@ -1935,7 +1939,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactionDuplicate() throws Exception {
- String topic =
"persistent://my-property/use/my-ns/testCompactionDuplicate";
+ String topic = "persistent://my-tenant/my-ns/testCompactionDuplicate";
final int numMessages = 1000;
final int maxKeys = 800;
@@ -1976,7 +1980,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Thread.sleep(500);
Optional<Topic> previousTopicRef =
pulsar.getBrokerService().getTopicIfExists(topic).get();
- Assert.assertTrue(previousTopicRef.isPresent());
+ assertTrue(previousTopicRef.isPresent());
PersistentTopic previousPersistentTopic = (PersistentTopic)
previousTopicRef.get();
// Unload topic make reader of compaction reconnect
@@ -1986,7 +1990,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
LongRunningProcessStatus previousLongRunningProcessStatus =
previousPersistentTopic.compactionStatus();
Optional<Topic> currentTopicReference =
pulsar.getBrokerService().getTopicReference(topic);
- Assert.assertTrue(currentTopicReference.isPresent());
+ assertTrue(currentTopicReference.isPresent());
PersistentTopic currentPersistentTopic = (PersistentTopic)
currentTopicReference.get();
LongRunningProcessStatus currentLongRunningProcessStatus =
currentPersistentTopic.compactionStatus();
@@ -1995,18 +1999,18 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
|| currentLongRunningProcessStatus.status ==
LongRunningProcessStatus.Status.ERROR)) {
// trigger compaction again
admin.topics().triggerCompaction(topic);
- Assert.assertEquals(currentLongRunningProcessStatus.status,
LongRunningProcessStatus.Status.SUCCESS);
+ assertEquals(currentLongRunningProcessStatus.status,
LongRunningProcessStatus.Status.SUCCESS);
} else if (previousLongRunningProcessStatus.status ==
LongRunningProcessStatus.Status.RUNNING) {
- Assert.assertEquals(previousLongRunningProcessStatus.status,
LongRunningProcessStatus.Status.SUCCESS);
+ assertEquals(previousLongRunningProcessStatus.status,
LongRunningProcessStatus.Status.SUCCESS);
}
});
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic, false);
// Compacted topic ledger should have same number of entry equals
to number of unique key.
- Assert.assertEquals(internalStats.compactedLedger.entries,
expected.size());
- Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
- Assert.assertFalse(internalStats.compactedLedger.offloaded);
+ assertEquals(internalStats.compactedLedger.entries,
expected.size());
+ assertTrue(internalStats.compactedLedger.ledgerId > -1);
+ assertFalse(internalStats.compactedLedger.offloaded);
});
// consumer with readCompacted enabled only get compacted entries
@@ -2014,7 +2018,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.readCompacted(true).subscribe()) {
while (true) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertEquals(expected.remove(m.getKey()), m.getData());
+ assertEquals(expected.remove(m.getKey()), m.getData());
if (expected.isEmpty()) {
break;
}
@@ -2024,7 +2028,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testDeleteCompactedLedger() throws Exception {
- String topicName =
"persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+ String topicName =
"persistent://my-tenant/my-ns/testDeleteCompactedLedger";
final String subName = "my-sub";
@Cleanup
@@ -2043,9 +2047,9 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
MutableLong compactedLedgerId = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topicName);
- Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+ assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
- Assert.assertEquals(stats.compactedLedger.entries, 2L);
+ assertEquals(stats.compactedLedger.entries, 2L);
});
// delete compacted ledger
@@ -2053,8 +2057,8 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topicName);
- Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
- Assert.assertEquals(stats.compactedLedger.entries, -1L);
+ assertEquals(stats.compactedLedger.ledgerId, -1L);
+ assertEquals(stats.compactedLedger.entries, -1L);
assertThrows(BKException.BKNoSuchLedgerExistsException.class, ()
-> pulsarTestContext.getBookKeeperClient()
.openLedger(compactedLedgerId.getValue(),
BookKeeper.DigestType.CRC32C, new byte[]{}));
});
@@ -2064,9 +2068,9 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
MutableLong compactedLedgerId2 = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topicName);
- Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+ assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
- Assert.assertEquals(stats.compactedLedger.entries, 2L);
+ assertEquals(stats.compactedLedger.entries, 2L);
});
producer.close();
@@ -2077,77 +2081,85 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
compactedLedgerId2.getValue(),
BookKeeper.DigestType.CRC32, new byte[]{})));
}
- @Test
+ @Test(timeOut = 10000)
public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
- // Disable topic level policies, since block ack thread may also block
thread of delete topic policies.
- conf.setTopicLevelPoliciesEnabled(false);
- restartBroker();
+ String topicName =
newUniqueName("persistent://my-tenant/my-ns/testDeleteCompactedLedgerWithSlowAck");
+ admin.topics().createNonPartitionedTopic(topicName);
+ // minimum compaction threshold
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ // infinite retention
+ admin.topicPolicies().setRetention(topicName, new
RetentionPolicies(-1, -1));
- String topicName =
"persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();
-
pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
- .close();
+ // send a single message
+ producer.newMessage().key(String.valueOf(0)).value("0").send();
+ // trigger compaction once to create __compaction subscription
+ triggerCompactionAndWait(topicName);
- for (int i = 0; i < 10; i++) {
+ int numberOfMessages = 10;
+ for (int i = 0; i < numberOfMessages; i++) {
producer.newMessage().key(String.valueOf(i %
2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();
+ // replace the PersistentSubscription with a spy
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
- PersistentSubscription subscription =
spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+ PersistentSubscription subscription =
+
spyWithoutRecordingInvocations(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION,
subscription);
+ // delay the ack of compaction
+ CountDownLatch compactionAckedLatch = new CountDownLatch(1);
AtomicLong compactedLedgerId = new AtomicLong(-1);
- AtomicBoolean pauseAck = new AtomicBoolean();
Mockito.doAnswer(invocationOnMock -> {
- Map<String, Long> properties = (Map<String, Long>)
invocationOnMock.getArguments()[2];
- log.info("acknowledgeMessage properties: {}", properties);
+ List<Position> positions = invocationOnMock.getArgument(0);
+ Map<String, Long> properties = invocationOnMock.getArgument(2);
+ log.info("acknowledgeMessage positions: {} properties: {}",
positions, properties);
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
- pauseAck.set(true);
- while (pauseAck.get()) {
- Thread.sleep(200);
+ try {
+ return invocationOnMock.callRealMethod();
+ } finally {
+ log.info("acknowledgeMessage completed {}", positions);
+ compactionAckedLatch.countDown();
+ // add delay here to introduce possible races with deletion
+ Thread.sleep(500);
}
- return invocationOnMock.callRealMethod();
}).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
CommandAck.AckType.Cumulative), Mockito.any());
+ // trigger compaction
admin.topics().triggerCompaction(topicName);
- while (!pauseAck.get()) {
- Thread.sleep(100);
- }
+ // wait for compaction to acknowledge
+ compactionAckedLatch.await(9, TimeUnit.SECONDS);
- CompletableFuture<Long> currentCompaction =
- (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic,
"currentCompaction", true);
- CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
- FieldUtils.writeDeclaredField(topic, "currentCompaction",
spyCurrentCompaction, true);
- currentCompaction.whenComplete((obj, throwable) -> {
- if (throwable != null) {
- spyCurrentCompaction.completeExceptionally(throwable);
- } else {
- spyCurrentCompaction.complete(obj);
- }
- });
- Mockito.doAnswer(invocationOnMock -> {
- pauseAck.set(false);
- return invocationOnMock.callRealMethod();
- }).when(spyCurrentCompaction).handle(Mockito.any());
+ // close the producer
+ producer.close();
+ // delete compacted ledger
admin.topics().delete(topicName, true);
+ // ensure that the compacted ledger is deleted
Awaitility.await().untilAsserted(() ->
assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsarTestContext.getBookKeeperClient().openLedger(
compactedLedgerId.get(), BookKeeper.DigestType.CRC32,
new byte[]{})));
}
+ private void triggerCompactionAndWait(String topicName) throws Exception {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ persistentTopic.triggerCompaction();
+ CompletableFuture<Long> currentCompaction =
+ (CompletableFuture<Long>)
FieldUtils.readDeclaredField(persistentTopic, "currentCompaction", true);
+ currentCompaction.get(10, TimeUnit.SECONDS);
+ }
+
@Test
public void testCompactionWithTTL() throws Exception {
- String topicName =
"persistent://my-property/use/my-ns/testCompactionWithTTL";
+ String topicName =
"persistent://my-tenant/my-ns/testCompactionWithTTL";
String subName = "sub";
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
.subscribe().close();
@@ -2208,12 +2220,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
result.add(receive.getValue());
}
- Assert.assertEquals(result, List.of("V3", "V4", "V5"));
+ assertEquals(result, List.of("V3", "V4", "V5"));
}
@Test
public void testAcknowledgeWithReconnection() throws Exception {
- final String topicName =
"persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+ final String topicName =
newUniqueName("persistent://my-tenant/my-ns/testAcknowledge");
final String subName = "my-sub";
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
@@ -2268,7 +2280,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
// Wait for consumer to reconnect and clear incomingMessages
consumer.pause();
Awaitility.await().untilAsserted(() -> {
- Assert.assertEquals(consumer.numMessagesInQueue(), 0);
+ assertEquals(consumer.numMessagesInQueue(), 0);
});
consumer.resume();
@@ -2285,22 +2297,22 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getStats(topicName,
true).getSubscriptions().get(subName).getMsgBacklog(),
0));
- Assert.assertEquals(results, expected);
+ assertEquals(results, expected);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
- Assert.assertNull(message);
+ assertNull(message);
// Make consumer reconnect to broker
admin.topics().unload(topicName);
producer.newMessage().key("K").value("V").send();
Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
- Assert.assertEquals(message2.getValue(), "V");
+ assertEquals(message2.getValue(), "V");
consumer.acknowledge(message2);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
- Assert.assertEquals(internalStats.lastConfirmedEntry,
+ assertEquals(internalStats.lastConfirmedEntry,
internalStats.cursors.get(subName).markDeletePosition);
});
@@ -2310,7 +2322,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testEarliestSubsAfterRollover() throws Exception {
- final String topicName =
"persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" +
UUID.randomUUID();
+ final String topicName =
newUniqueName("persistent://my-tenant/my-ns/testEarliestSubsAfterRollover");
final String subName = "my-sub";
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
@@ -2362,6 +2374,6 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
consumer.acknowledge(message);
}
- Assert.assertEquals(results, expected);
+ assertEquals(results, expected);
}
}