lhotari commented on code in PR #23980:
URL: https://github.com/apache/pulsar/pull/23980#discussion_r1955096813
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1340,18 +1341,42 @@ private void
asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription s
return;
}
- currentCompaction.handle((__, e) -> {
- if (e != null) {
- log.warn("[{}][{}] Last compaction task failed", topic,
subscriptionName);
+ // Avoid concurrently execute compaction and unsubscribing.
+ synchronized (this) {
Review Comment:
Is there really a need for a synchronized block here?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -245,6 +245,7 @@ public static boolean isDedupCursorName(String name) {
private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
private volatile CompletableFuture<Long> currentCompaction =
CompletableFuture.completedFuture(
COMPACTION_NEVER_RUN);
+ private volatile AtomicBoolean disablingCompaction = new
AtomicBoolean(false);
Review Comment:
```suggestion
private final AtomicBoolean disablingCompaction = new
AtomicBoolean(false);
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
admin.topics().delete(topicName, false);
}
+ @DataProvider
+ public Object[][] isInjectedCursorDeleteError() {
+ return new Object[][] {
+ {false},
+ {true}
+ };
+ }
+
+ @Test(dataProvider = "isInjectedCursorDeleteError")
+ public void testReadMsgsAfterDisableCompaction(boolean
isInjectedCursorDeleteError) throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ producer.newMessage().key("k1").value("v1").send();
+ producer.newMessage().key("k2").value("v2").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+
+ // Disable compaction.
+ // Inject a failure that the first time to delete cursor will fail.
+ if (isInjectedCursorDeleteError) {
+ AtomicInteger times = new AtomicInteger();
+ String cursorPath =
String.format("/managed-ledgers/%s/__compaction",
+ TopicName.get(topicName).getPersistenceNamingEncoding());
+ admin.topicPolicies().removeCompactionThreshold(topicName);
+ mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED,
(op, path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op,
path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+ try {
+ admin.topics().deleteSubscription(topicName, "__compaction");
+ fail("Should fail");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof
PulsarAdminException.ServerSideErrorException);
+ }
+ }
+
+ // Create a reader with start at earliest.
+ // Verify: the reader will receive 3 messages.
+ admin.topics().unload(topicName);
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+ .startMessageId(MessageId.earliest).create();
+ producer.newMessage().key("k3").value("v3").send();
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m0.getValue(), "v0");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m1.getValue(), "v1");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m2.getValue(), "v2");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m3.getValue(), "v3");
+
+ // cleanup.
+ producer.close();
+ reader.close();
+ admin.topics().delete(topicName, false);
+ }
+
+ @Test
+ public void testDisableCompactionConcurrently() throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ AtomicBoolean disablingCompaction =
+ WhiteboxImpl.getInternalState(persistentTopic,
"disablingCompaction");
Review Comment:
Please avoid using reflection/`WhiteboxImpl`. A better approach is to add a
package-private (default access) method that is annotated with `@VisibleForTesting`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1340,18 +1341,42 @@ private void
asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription s
return;
}
- currentCompaction.handle((__, e) -> {
- if (e != null) {
- log.warn("[{}][{}] Last compaction task failed", topic,
subscriptionName);
+ // Avoid concurrently execute compaction and unsubscribing.
+ synchronized (this) {
+ if (!disablingCompaction.compareAndSet(false, true)) {
+ unsubscribeFuture.completeExceptionally(
Review Comment:
This should be guarded with `if (!unsubscribeFuture.isDone()) {` so that
creating the exception can be avoided when the future is already completed.
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
admin.topics().delete(topicName, false);
}
+ @DataProvider
+ public Object[][] isInjectedCursorDeleteError() {
+ return new Object[][] {
+ {false},
+ {true}
+ };
+ }
+
+ @Test(dataProvider = "isInjectedCursorDeleteError")
+ public void testReadMsgsAfterDisableCompaction(boolean
isInjectedCursorDeleteError) throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ producer.newMessage().key("k1").value("v1").send();
+ producer.newMessage().key("k2").value("v2").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+
+ // Disable compaction.
+ // Inject a failure that the first time to delete cursor will fail.
+ if (isInjectedCursorDeleteError) {
+ AtomicInteger times = new AtomicInteger();
+ String cursorPath =
String.format("/managed-ledgers/%s/__compaction",
+ TopicName.get(topicName).getPersistenceNamingEncoding());
+ admin.topicPolicies().removeCompactionThreshold(topicName);
+ mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED,
(op, path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op,
path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+ try {
+ admin.topics().deleteSubscription(topicName, "__compaction");
+ fail("Should fail");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof
PulsarAdminException.ServerSideErrorException);
+ }
+ }
+
+ // Create a reader with start at earliest.
+ // Verify: the reader will receive 3 messages.
+ admin.topics().unload(topicName);
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+ .startMessageId(MessageId.earliest).create();
+ producer.newMessage().key("k3").value("v3").send();
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m0.getValue(), "v0");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m1.getValue(), "v1");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m2.getValue(), "v2");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m3.getValue(), "v3");
+
+ // cleanup.
+ producer.close();
+ reader.close();
+ admin.topics().delete(topicName, false);
+ }
+
+ @Test
+ public void testDisableCompactionConcurrently() throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ AtomicBoolean disablingCompaction =
+ WhiteboxImpl.getInternalState(persistentTopic,
"disablingCompaction");
+
+ // Disable compaction.
+ // Inject a delay when the first time of deleting cursor.
+ AtomicInteger times = new AtomicInteger();
+ String cursorPath = String.format("/managed-ledgers/%s/__compaction",
+ TopicName.get(topicName).getPersistenceNamingEncoding());
+ admin.topicPolicies().removeCompactionThreshold(topicName);
+ mockZooKeeper.delay(5000, (op, path) -> {
+ return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) &&
times.incrementAndGet() == 1;
+ });
+ mockZooKeeperGlobal.delay(5000, (op, path) -> {
+ return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) &&
times.incrementAndGet() == 1;
+ });
+ AtomicReference<CompletableFuture<Void>> f1 = new
AtomicReference<CompletableFuture<Void>>();
+ AtomicReference<CompletableFuture<Void>> f2 = new
AtomicReference<CompletableFuture<Void>>();
+ new Thread(() -> {
+ f1.set(admin.topics().deleteSubscriptionAsync(topicName,
"__compaction"));
+ }).start();
+ new Thread(() -> {
+ f2.set(admin.topics().deleteSubscriptionAsync(topicName,
"__compaction"));
+ }).start();
+
+ // Verify: the next compaction will be skipped.
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(disablingCompaction.get());
+ });
+ producer.newMessage().key("k1").value("v1").send();
+ producer.newMessage().key("k2").value("v2").send();
+ CompletableFuture<Long> currentCompaction1 =
+ WhiteboxImpl.getInternalState(persistentTopic,
"currentCompaction");
+ persistentTopic.triggerCompaction();
+ CompletableFuture<Long> currentCompaction2 =
+ WhiteboxImpl.getInternalState(persistentTopic,
"currentCompaction");
Review Comment:
Again, avoid reflection/`WhiteboxImpl`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]