This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new fe5f3b8c485 [fix][broker]Delete compacted ledger when topic is deleted
(#21745) (#21850)
fe5f3b8c485 is described below
commit fe5f3b8c485d0ddc21b17293ede46b936c83d249
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Jan 10 22:27:55 2024 +0800
[fix][broker]Delete compacted ledger when topic is deleted (#21745) (#21850)
---
.../service/persistent/CompactorSubscription.java | 17 +++
.../broker/service/persistent/PersistentTopic.java | 59 ++++++++--
.../apache/pulsar/client/impl/RawReaderImpl.java | 3 +-
.../pulsar/compaction/CompactedTopicImpl.java | 11 ++
.../apache/pulsar/compaction/CompactionTest.java | 131 +++++++++++++++++++++
5 files changed, 212 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index f7279968c51..f6f8d0b9e0f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -28,6 +29,8 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,5 +109,19 @@ public class CompactorSubscription extends
PersistentSubscription {
}
}
+ /**
+  * Deletes the ledger referenced by the current compacted topic context.
+  *
+  * <p>When a context future is present, this waits for it, resets the in-memory state of the compacted
+  * topic and then asks the compacted topic to delete the ledger the context pointed at. When no
+  * context future is present, an already completed future is returned.
+  *
+  * @return a future that completes when the deletion requested from the compacted topic completes
+  */
+ CompletableFuture<Void> cleanCompactedLedger() {
+     final CompactedTopicImpl compactedTopicImpl = (CompactedTopicImpl) compactedTopic;
+     final CompletableFuture<CompactedTopicContext> contextFuture =
+             compactedTopicImpl.getCompactedTopicContextFuture();
+     if (contextFuture == null) {
+         // No context future is set, so there is nothing to wait for or delete.
+         return CompletableFuture.completedFuture(null);
+     }
+     return contextFuture.thenCompose(ctx -> {
+         // Read the ledger id first, then clear the in-memory compaction state before deleting the ledger.
+         final long ledgerIdToDelete = ctx.getLedger().getId();
+         compactedTopicImpl.reset();
+         return compactedTopic.deleteCompactedLedger(ledgerIdToDelete);
+     });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(CompactorSubscription.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d57ed91ab81..8e3682af1df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -201,7 +201,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
protected final MessageDeduplication messageDeduplication;
private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
- private CompletableFuture<Long> currentCompaction =
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+ private volatile CompletableFuture<Long> currentCompaction =
CompletableFuture.completedFuture(
+ COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;
private CompletableFuture<MessageIdImpl> currentOffload =
CompletableFuture.completedFuture(
@@ -1031,13 +1032,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
- asyncDeleteCursor(subscriptionName,
unsubscribeFuture);
+
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
}
@Override
public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
if (exception instanceof
MetadataNotFoundException) {
- asyncDeleteCursor(subscriptionName,
unsubscribeFuture);
+
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
return;
}
@@ -1047,12 +1048,41 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}, null);
} else {
- asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ asyncDeleteCursorWithCleanCompactionLedger(subscriptionName,
unsubscribeFuture);
}
return unsubscribeFuture;
}
+ /**
+  * Deletes the cursor of a subscription, first removing the compacted ledger when the subscription is
+  * the compaction subscription.
+  *
+  * <p>For the compaction subscription this waits for {@code currentCompaction} to finish (successfully
+  * or not), then waits for {@link CompactorSubscription#cleanCompactedLedger()} to complete, and only
+  * then deletes the cursor. If cleaning the compacted ledger fails, {@code unsubscribeFuture} is
+  * completed exceptionally and the cursor is not deleted.
+  *
+  * @param subscriptionName name of the subscription whose cursor is deleted
+  * @param unsubscribeFuture future that is completed here on early exit or failure, and otherwise
+  *                          handed on to {@code asyncDeleteCursor}
+  */
+ private void asyncDeleteCursorWithCleanCompactionLedger(String subscriptionName,
+                                                         CompletableFuture<Void> unsubscribeFuture) {
+     PersistentSubscription subscription = subscriptions.get(subscriptionName);
+     if (subscription == null) {
+         log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName);
+         unsubscribeFuture.complete(null);
+         return;
+     }
+
+     if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof CompactorSubscription)) {
+         asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+         return;
+     }
+
+     // handle() wraps the future returned by cleanCompactedLedger() in another future. Flatten it with
+     // thenCompose() so the final stage runs only after the compacted ledger deletion has finished and
+     // so that a deletion failure reaches the error branch below.
+     currentCompaction.handle((__, e) -> {
+         if (e != null) {
+             // A failed compaction must not block the unsubscribe; log the cause and continue.
+             log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName, e);
+         }
+         return ((CompactorSubscription) subscription).cleanCompactedLedger();
+     }).thenCompose(cleanFuture -> cleanFuture).whenComplete((__, ex) -> {
+         if (ex != null) {
+             log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
+             unsubscribeFuture.completeExceptionally(ex);
+         } else {
+             asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+         }
+     });
+ }
+
private void asyncDeleteCursor(String subscriptionName,
CompletableFuture<Void> unsubscribeFuture) {
ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new
DeleteCursorCallback() {
@Override
@@ -2844,11 +2874,24 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
- currentCompaction =
brokerService.pulsar().getCompactor().compact(topic);
+ if (!lock.readLock().tryLock()) {
+ log.info("[{}] Conflict topic-close, topic-delete, skip
triggering compaction", topic);
+ return;
+ }
+ try {
+ if (isClosingOrDeleting) {
+ log.info("[{}] Topic is closing or deleting, skip
triggering compaction", topic);
+ return;
+ }
+
+ currentCompaction =
brokerService.pulsar().getCompactor().compact(topic);
+ } finally {
+ lock.readLock().unlock();
+ }
currentCompaction.whenComplete((ignore, ex) -> {
- if (ex != null){
- log.warn("[{}] Compaction failure.", topic, ex);
- }
+ if (ex != null) {
+ log.warn("[{}] Compaction failure.", topic, ex);
+ }
});
} else {
throw new AlreadyRunningException("Compaction already in
progress");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 8faf02c81b3..8cc771959f1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -59,6 +59,7 @@ public class RawReaderImpl implements RawReader {
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ consumerConfiguration.setAckReceiptEnabled(true);
consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
@@ -122,7 +123,7 @@ public class RawReaderImpl implements RawReader {
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
- true
+ false
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 9b25b895ba8..f4767fc6533 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -330,6 +331,16 @@ public class CompactedTopicImpl implements CompactedTopic {
public synchronized Optional<Position> getCompactionHorizon() {
return Optional.ofNullable(this.compactionHorizon);
}
+
+ /**
+  * Clears the in-memory compaction state by dropping both the compaction horizon and the compacted
+  * topic context. This only forgets the references; it does not delete any ledger.
+  *
+  * <p>Synchronized like {@link #getCompactionHorizon()} so that both fields are cleared as a single
+  * step with respect to the other synchronized members of this class.
+  */
+ public synchronized void reset() {
+     this.compactionHorizon = null;
+     this.compactedTopicContext = null;
+ }
+
+ /**
+  * Returns the future that holds the current compacted topic context.
+  *
+  * @return the context future, or {@code null} when none is set (for example after {@link #reset()})
+  */
+ @Nullable
+ public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
+     return this.compactedTopicContext;
+ }
private static final Logger log =
LoggerFactory.getLogger(CompactedTopicImpl.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6792d4a4e2c..c1fb370ae6f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,15 +48,19 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -64,6 +69,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -84,6 +90,7 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1969,4 +1976,128 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
}
}
+
+ /**
+  * Verifies that the compacted ledger can no longer be opened in BookKeeper after the compaction
+  * subscription is deleted, and again after the whole topic is deleted.
+  */
+ @Test
+ public void testDeleteCompactedLedger() throws Exception {
+     String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+
+     final String subName = "my-sub";
+     @Cleanup
+     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+             .enableBatching(false).topic(topicName).create();
+
+     pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+     // Two distinct keys, so a compaction is expected to leave exactly two entries.
+     for (int i = 0; i < 10; i++) {
+         producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+     }
+     producer.flush();
+
+     Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+     compactor.compact(topicName).get();
+
+     MutableLong compactedLedgerId = new MutableLong(-1);
+     Awaitility.await().untilAsserted(() -> {
+         PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+         Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+         compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
+         Assert.assertEquals(stats.compactedLedger.entries, 2L);
+     });
+
+     // Deleting the compaction subscription must also delete the compacted ledger.
+     admin.topics().deleteSubscription(topicName, Compactor.COMPACTION_SUBSCRIPTION);
+
+     Awaitility.await().untilAsserted(() -> {
+         PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+         Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
+         Assert.assertEquals(stats.compactedLedger.entries, -1L);
+         // Same digest type as the other ledger-existence checks in this class.
+         assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsar.getBookKeeperClient()
+                 .openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32, new byte[]{}));
+     });
+
+     // Compact again so that a new compacted ledger exists when the topic itself is deleted.
+     compactor.compact(topicName).get();
+
+     MutableLong compactedLedgerId2 = new MutableLong(-1);
+     Awaitility.await().untilAsserted(() -> {
+         PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+         Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+         compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
+         Assert.assertEquals(stats.compactedLedger.entries, 2L);
+     });
+
+     producer.close();
+     admin.topics().delete(topicName);
+
+     Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+             () -> pulsar.getBookKeeperClient().openLedger(
+                     compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{})));
+ }
+
+ /**
+  * Verifies that force-deleting a topic while the compaction's cumulative ack is being held back
+  * still ends with the compacted ledger removed from BookKeeper.
+  */
+ @Test
+ public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
+     // Disable topic-level policies: the thread blocked by the paused ack below may also be the thread
+     // that deletes topic policies.
+     conf.setTopicLevelPoliciesEnabled(false);
+     restartBroker();
+
+     String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
+     @Cleanup
+     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+             .enableBatching(false).topic(topicName).create();
+
+     pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
+             .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
+             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
+             .close();
+
+     for (int i = 0; i < 10; i++) {
+         producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
+     }
+     producer.flush();
+
+     // Swap the compaction subscription for a spy so its cumulative ack can be intercepted.
+     PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+     PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+     topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription);
+
+     AtomicLong compactedLedgerId = new AtomicLong(-1);
+     AtomicBoolean pauseAck = new AtomicBoolean();
+     Mockito.doAnswer(invocationOnMock -> {
+         Map<String, Long> properties = invocationOnMock.getArgument(2);
+         log.info("acknowledgeMessage properties: {}", properties);
+         compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
+         // Hold the ack until the test releases it, keeping the compaction task unfinished.
+         pauseAck.set(true);
+         while (pauseAck.get()) {
+             Thread.sleep(200);
+         }
+         return invocationOnMock.callRealMethod();
+     }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
+             CommandAck.AckType.Cumulative), Mockito.any());
+
+     admin.topics().triggerCompaction(topicName);
+
+     // Wait, with a bound, until the compaction has reached the paused ack; an unbounded loop would
+     // hang the test forever if the ack were never issued.
+     Awaitility.await().atMost(30, TimeUnit.SECONDS).untilTrue(pauseAck);
+
+     // Replace the in-flight compaction future with a spy that mirrors its outcome, so the moment
+     // topic deletion calls handle() on it can be observed.
+     CompletableFuture<Long> currentCompaction =
+             (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, "currentCompaction", true);
+     CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
+     FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true);
+     currentCompaction.whenComplete((obj, throwable) -> {
+         if (throwable != null) {
+             spyCurrentCompaction.completeExceptionally(throwable);
+         } else {
+             spyCurrentCompaction.complete(obj);
+         }
+     });
+     // Release the paused ack only once the delete path starts waiting on the compaction future.
+     Mockito.doAnswer(invocationOnMock -> {
+         pauseAck.set(false);
+         return invocationOnMock.callRealMethod();
+     }).when(spyCurrentCompaction).handle(Mockito.any());
+
+     admin.topics().delete(topicName, true);
+
+     Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+             () -> pulsar.getBookKeeperClient().openLedger(
+                     compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
+ }
}