poorbarcode commented on code in PR #23980:
URL: https://github.com/apache/pulsar/pull/23980#discussion_r1955483840
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
admin.topics().delete(topicName, false);
}
+ @DataProvider
+ public Object[][] isInjectedCursorDeleteError() {
+ return new Object[][] {
+ {false},
+ {true}
+ };
+ }
+
+ @Test(dataProvider = "isInjectedCursorDeleteError")
+ public void testReadMsgsAfterDisableCompaction(boolean
isInjectedCursorDeleteError) throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ producer.newMessage().key("k1").value("v1").send();
+ producer.newMessage().key("k2").value("v2").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+
+ // Disable compaction.
+ // Inject a failure so that the first attempt to delete the cursor fails.
+ if (isInjectedCursorDeleteError) {
+ AtomicInteger times = new AtomicInteger();
+ String cursorPath =
String.format("/managed-ledgers/%s/__compaction",
+ TopicName.get(topicName).getPersistenceNamingEncoding());
+ admin.topicPolicies().removeCompactionThreshold(topicName);
+ mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED,
(op, path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op,
path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+ try {
+ admin.topics().deleteSubscription(topicName, "__compaction");
+ fail("Should fail");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof
PulsarAdminException.ServerSideErrorException);
+ }
+ }
+
+ // Create a reader that starts at the earliest message.
+ // Verify: the reader receives the 3 earlier messages plus the one produced after it was created.
+ admin.topics().unload(topicName);
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+ .startMessageId(MessageId.earliest).create();
+ producer.newMessage().key("k3").value("v3").send();
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m0.getValue(), "v0");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m1.getValue(), "v1");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m2.getValue(), "v2");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m3.getValue(), "v3");
+
+ // cleanup.
+ producer.close();
+ reader.close();
+ admin.topics().delete(topicName, false);
+ }
+
+ @Test
+ public void testDisableCompactionConcurrently() throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ AtomicBoolean disablingCompaction =
+ WhiteboxImpl.getInternalState(persistentTopic,
"disablingCompaction");
+
+ // Disable compaction.
+ // Inject a delay on the first attempt to delete the cursor.
+ AtomicInteger times = new AtomicInteger();
+ String cursorPath = String.format("/managed-ledgers/%s/__compaction",
+ TopicName.get(topicName).getPersistenceNamingEncoding());
+ admin.topicPolicies().removeCompactionThreshold(topicName);
+ mockZooKeeper.delay(5000, (op, path) -> {
+ return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) &&
times.incrementAndGet() == 1;
+ });
+ mockZooKeeperGlobal.delay(5000, (op, path) -> {
+ return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) &&
times.incrementAndGet() == 1;
+ });
+ AtomicReference<CompletableFuture<Void>> f1 = new
AtomicReference<CompletableFuture<Void>>();
+ AtomicReference<CompletableFuture<Void>> f2 = new
AtomicReference<CompletableFuture<Void>>();
+ new Thread(() -> {
+ f1.set(admin.topics().deleteSubscriptionAsync(topicName,
"__compaction"));
+ }).start();
+ new Thread(() -> {
+ f2.set(admin.topics().deleteSubscriptionAsync(topicName,
"__compaction"));
+ }).start();
+
+ // Verify: the next compaction will be skipped.
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(disablingCompaction.get());
+ });
+ producer.newMessage().key("k1").value("v1").send();
+ producer.newMessage().key("k2").value("v2").send();
+ CompletableFuture<Long> currentCompaction1 =
+ WhiteboxImpl.getInternalState(persistentTopic,
"currentCompaction");
+ persistentTopic.triggerCompaction();
+ CompletableFuture<Long> currentCompaction2 =
+ WhiteboxImpl.getInternalState(persistentTopic,
"currentCompaction");
Review Comment:
Changed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]