This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cde35efac6b [fix][broker] Delete compacted ledger when topic is 
deleted (#21745)
cde35efac6b is described below

commit cde35efac6bdfd5be16dc49b64c264abc6adb198
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Dec 29 14:39:45 2023 +0800

    [fix][broker] Delete compacted ledger when topic is deleted (#21745)
    
    (cherry picked from commit 23391d3c3db6b8db6200dc74adc6f4c4fd453fc4)
---
 .../service/persistent/CompactorSubscription.java  |  17 +++
 .../broker/service/persistent/PersistentTopic.java |  61 ++++++++--
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  11 ++
 .../apache/pulsar/compaction/CompactionTest.java   | 130 +++++++++++++++++++++
 5 files changed, 209 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index ec34aeffbec..7921c25b071 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -22,12 +22,15 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,5 +109,19 @@ public class CompactorSubscription extends 
PersistentSubscription {
         }
     }
 
+    CompletableFuture<Void> cleanCompactedLedger() {
+        final CompletableFuture<CompactedTopicContext> 
compactedTopicContextFuture =
+                ((CompactedTopicImpl) 
compactedTopic).getCompactedTopicContextFuture();
+        if (compactedTopicContextFuture != null) {
+            return compactedTopicContextFuture.thenCompose(context -> {
+                long compactedLedgerId = context.getLedger().getId();
+                ((CompactedTopicImpl) compactedTopic).reset();
+                return compactedTopic.deleteCompactedLedger(compactedLedgerId);
+            });
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(CompactorSubscription.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d521f6609a8..dabfd0379ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -217,7 +217,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     protected final MessageDeduplication messageDeduplication;
 
     private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
-    private CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+    private volatile CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(
+            COMPACTION_NEVER_RUN);
     private final CompactedTopic compactedTopic;
 
     // TODO: Create compaction strategy from topic policy when exposing 
strategic compaction to users.
@@ -1164,13 +1165,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                           
CompletableFuture<Void> unsubscribeFuture) {
         PersistentSubscription persistentSubscription = 
subscriptions.get(subscriptionName);
         if (persistentSubscription == null) {
-            log.warn("[{}][{}] Can't find subscription, skip clear delayed 
message", topic, subscriptionName);
+            log.warn("[{}][{}] Can't find subscription, skip delete cursor", 
topic, subscriptionName);
             unsubscribeFuture.complete(null);
             return;
         }
+
         if (!isDelayedDeliveryEnabled()
                 || !(brokerService.getDelayedDeliveryTrackerFactory() 
instanceof BucketDelayedDeliveryTrackerFactory)) {
-            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, 
unsubscribeFuture);
             return;
         }
 
@@ -1185,7 +1187,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     if (ex != null) {
                         unsubscribeFuture.completeExceptionally(ex);
                     } else {
-                        asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                        
asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, 
unsubscribeFuture);
                     }
                 });
             }
@@ -1195,6 +1197,29 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
             if (ex != null) {
                 unsubscribeFuture.completeExceptionally(ex);
+            } else {
+                
asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, 
unsubscribeFuture);
+            }
+        });
+    }
+
+    private void 
asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription subscription,
+                                                            
CompletableFuture<Void> unsubscribeFuture) {
+        final String subscriptionName = subscription.getName();
+        if ((!isCompactionSubscription(subscriptionName)) || !(subscription 
instanceof CompactorSubscription)) {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            return;
+        }
+
+        currentCompaction.handle((__, e) -> {
+            if (e != null) {
+                log.warn("[{}][{}] Last compaction task failed", topic, 
subscriptionName);
+            }
+            return ((CompactorSubscription) 
subscription).cleanCompactedLedger();
+        }).whenComplete((__, ex) -> {
+            if (ex != null) {
+                log.error("[{}][{}] Error cleaning compacted ledger", topic, 
subscriptionName, ex);
+                unsubscribeFuture.completeExceptionally(ex);
             } else {
                 asyncDeleteCursor(subscriptionName, unsubscribeFuture);
             }
@@ -3205,17 +3230,29 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public synchronized void triggerCompaction()
             throws PulsarServerException, AlreadyRunningException {
         if (currentCompaction.isDone()) {
+            if (!lock.readLock().tryLock()) {
+                log.info("[{}] Conflict topic-close, topic-delete, skip 
triggering compaction", topic);
+                return;
+            }
+            try {
+                if (isClosingOrDeleting) {
+                    log.info("[{}] Topic is closing or deleting, skip 
triggering compaction", topic);
+                    return;
+                }
 
-            if (strategicCompactionMap.containsKey(topic)) {
-                currentCompaction = 
brokerService.pulsar().getStrategicCompactor()
-                        .compact(topic, strategicCompactionMap.get(topic));
-            } else {
-                currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+                if (strategicCompactionMap.containsKey(topic)) {
+                    currentCompaction = 
brokerService.pulsar().getStrategicCompactor()
+                            .compact(topic, strategicCompactionMap.get(topic));
+                } else {
+                    currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+                }
+            } finally {
+                lock.readLock().unlock();
             }
             currentCompaction.whenComplete((ignore, ex) -> {
-               if (ex != null){
-                   log.warn("[{}] Compaction failure.", topic, ex);
-               }
+                if (ex != null) {
+                    log.warn("[{}] Compaction failure.", topic, ex);
+                }
             });
         } else {
             throw new AlreadyRunningException("Compaction already in 
progress");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 70bda888bf7..f6523241399 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -59,6 +59,7 @@ public class RawReaderImpl implements RawReader {
         
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
         consumerConfiguration.setReadCompacted(true);
         
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        consumerConfiguration.setAckReceiptEnabled(true);
 
         consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);
@@ -122,7 +123,7 @@ public class RawReaderImpl implements RawReader {
                     MessageId.earliest,
                     0 /* startMessageRollbackDurationInSec */,
                     Schema.BYTES, null,
-                    true
+                    false
             );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 75d8fcffd3f..22520654135 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -332,6 +333,16 @@ public class CompactedTopicImpl implements CompactedTopic {
     public synchronized Optional<Position> getCompactionHorizon() {
         return Optional.ofNullable(this.compactionHorizon);
     }
+
+    public void reset() {
+        this.compactionHorizon = null;
+        this.compactedTopicContext = null;
+    }
+
+    @Nullable
+    public CompletableFuture<CompactedTopicContext> 
getCompactedTopicContextFuture() {
+        return compactedTopicContext;
+    }
     private static final Logger log = 
LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index add1b3e9252..58646105f31 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,7 +26,9 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
@@ -47,15 +49,19 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -84,6 +90,7 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1997,4 +2004,127 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testDeleteCompactedLedger() throws Exception {
+        String topicName = 
"persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+
+        final String subName = "my-sub";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i % 
2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        compact(topicName);
+
+        MutableLong compactedLedgerId = new MutableLong(-1);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
+            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+        });
+
+        // delete compacted ledger
+        admin.topics().deleteSubscription(topicName, "__compaction");
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
+            Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
+            Assert.assertEquals(stats.compactedLedger.entries, -1L);
+            assertThrows(BKException.BKNoSuchLedgerExistsException.class, () 
-> pulsarTestContext.getBookKeeperClient()
+                        .openLedger(compactedLedgerId.getValue(), 
BookKeeper.DigestType.CRC32C, new byte[]{}));
+        });
+
+        compact(topicName);
+
+        MutableLong compactedLedgerId2 = new MutableLong(-1);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
+            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+        });
+
+        producer.close();
+        admin.topics().delete(topicName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+                () -> pulsarTestContext.getBookKeeperClient().openLedger(
+                        compactedLedgerId2.getValue(), 
BookKeeper.DigestType.CRC32, new byte[]{})));
+    }
+
+    @Test
+    public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
+        // Disable topic level policies, since block ack thread may also block 
thread of delete topic policies.
+        conf.setTopicLevelPoliciesEnabled(false);
+        restartBroker();
+
+        String topicName = 
"persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        
pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
+                .close();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i % 
2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription subscription = 
spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+        topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, 
subscription);
+
+        AtomicLong compactedLedgerId = new AtomicLong(-1);
+        AtomicBoolean pauseAck = new AtomicBoolean();
+        Mockito.doAnswer(invocationOnMock -> {
+            Map<String, Long> properties = (Map<String, Long>) 
invocationOnMock.getArguments()[2];
+            log.info("acknowledgeMessage properties: {}", properties);
+            
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
+            pauseAck.set(true);
+            while (pauseAck.get()) {
+                Thread.sleep(200);
+            }
+            return invocationOnMock.callRealMethod();
+        }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
+                CommandAck.AckType.Cumulative), Mockito.any());
+
+        admin.topics().triggerCompaction(topicName);
+
+        while (!pauseAck.get()) {
+            Thread.sleep(100);
+        }
+
+        CompletableFuture<Long> currentCompaction =
+                (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, 
"currentCompaction", true);
+        CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
+        FieldUtils.writeDeclaredField(topic, "currentCompaction", 
spyCurrentCompaction, true);
+        currentCompaction.whenComplete((obj, throwable) -> {
+            if (throwable != null) {
+                spyCurrentCompaction.completeExceptionally(throwable);
+            } else {
+                spyCurrentCompaction.complete(obj);
+            }
+        });
+        Mockito.doAnswer(invocationOnMock -> {
+            pauseAck.set(false);
+            return invocationOnMock.callRealMethod();
+        }).when(spyCurrentCompaction).handle(Mockito.any());
+
+        admin.topics().delete(topicName, true);
+
+        Awaitility.await().untilAsserted(() -> 
assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+                () -> pulsarTestContext.getBookKeeperClient().openLedger(
+                        compactedLedgerId.get(), BookKeeper.DigestType.CRC32, 
new byte[]{})));
+    }
 }

Reply via email to