This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new fe5f3b8c485 [fix][broker]Delete compacted ledger when topic is deleted (#21745) (#21850)
fe5f3b8c485 is described below

commit fe5f3b8c485d0ddc21b17293ede46b936c83d249
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Jan 10 22:27:55 2024 +0800

    [fix][broker]Delete compacted ledger when topic is deleted (#21745) (#21850)
---
 .../service/persistent/CompactorSubscription.java  |  17 +++
 .../broker/service/persistent/PersistentTopic.java |  59 ++++++++--
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  11 ++
 .../apache/pulsar/compaction/CompactionTest.java   | 131 +++++++++++++++++++++
 5 files changed, 212 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index f7279968c51..f6f8d0b9e0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static com.google.common.base.Preconditions.checkArgument;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -28,6 +29,8 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,5 +109,19 @@ public class CompactorSubscription extends PersistentSubscription {
         }
     }
 
+    CompletableFuture<Void> cleanCompactedLedger() {
+        final CompletableFuture<CompactedTopicContext> 
compactedTopicContextFuture =
+                ((CompactedTopicImpl) 
compactedTopic).getCompactedTopicContextFuture();
+        if (compactedTopicContextFuture != null) {
+            return compactedTopicContextFuture.thenCompose(context -> {
+                long compactedLedgerId = context.getLedger().getId();
+                ((CompactedTopicImpl) compactedTopic).reset();
+                return compactedTopic.deleteCompactedLedger(compactedLedgerId);
+            });
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(CompactorSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d57ed91ab81..8e3682af1df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -201,7 +201,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     protected final MessageDeduplication messageDeduplication;
 
     private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
-    private CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+    private volatile CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(
+            COMPACTION_NEVER_RUN);
     private final CompactedTopic compactedTopic;
 
     private CompletableFuture<MessageIdImpl> currentOffload = 
CompletableFuture.completedFuture(
@@ -1031,13 +1032,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     new AsyncCallbacks.DeleteLedgerCallback() {
                         @Override
                         public void deleteLedgerComplete(Object ctx) {
-                            asyncDeleteCursor(subscriptionName, 
unsubscribeFuture);
+                            
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
                         }
 
                         @Override
                         public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             if (exception instanceof 
MetadataNotFoundException) {
-                                asyncDeleteCursor(subscriptionName, 
unsubscribeFuture);
+                                
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
                                 return;
                             }
 
@@ -1047,12 +1048,41 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         }
                     }, null);
         } else {
-            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, 
unsubscribeFuture);
         }
 
         return unsubscribeFuture;
     }
 
+    private void asyncDeleteCursorWithCleanCompactionLedger(String 
subscriptionName,
+                                                            
CompletableFuture<Void> unsubscribeFuture) {
+        PersistentSubscription subscription = 
subscriptions.get(subscriptionName);
+        if (subscription == null) {
+            log.warn("[{}][{}] Can't find subscription, skip delete cursor", 
topic, subscriptionName);
+            unsubscribeFuture.complete(null);
+            return;
+        }
+
+        if ((!isCompactionSubscription(subscriptionName)) || !(subscription 
instanceof CompactorSubscription)) {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            return;
+        }
+
+        currentCompaction.handle((__, e) -> {
+            if (e != null) {
+                log.warn("[{}][{}] Last compaction task failed", topic, 
subscriptionName);
+            }
+            return ((CompactorSubscription) 
subscription).cleanCompactedLedger();
+        }).whenComplete((__, ex) -> {
+            if (ex != null) {
+                log.error("[{}][{}] Error cleaning compacted ledger", topic, 
subscriptionName, ex);
+                unsubscribeFuture.completeExceptionally(ex);
+            } else {
+                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            }
+        });
+    }
+
     private void asyncDeleteCursor(String subscriptionName, 
CompletableFuture<Void> unsubscribeFuture) {
         ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new 
DeleteCursorCallback() {
             @Override
@@ -2844,11 +2874,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public synchronized void triggerCompaction()
             throws PulsarServerException, AlreadyRunningException {
         if (currentCompaction.isDone()) {
-            currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+            if (!lock.readLock().tryLock()) {
+                log.info("[{}] Conflict topic-close, topic-delete, skip 
triggering compaction", topic);
+                return;
+            }
+            try {
+                if (isClosingOrDeleting) {
+                    log.info("[{}] Topic is closing or deleting, skip 
triggering compaction", topic);
+                    return;
+                }
+
+                currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+            } finally {
+                lock.readLock().unlock();
+            }
             currentCompaction.whenComplete((ignore, ex) -> {
-               if (ex != null){
-                   log.warn("[{}] Compaction failure.", topic, ex);
-               }
+                if (ex != null) {
+                    log.warn("[{}] Compaction failure.", topic, ex);
+                }
             });
         } else {
             throw new AlreadyRunningException("Compaction already in 
progress");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 8faf02c81b3..8cc771959f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -59,6 +59,7 @@ public class RawReaderImpl implements RawReader {
         
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
         consumerConfiguration.setReadCompacted(true);
         
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        consumerConfiguration.setAckReceiptEnabled(true);
 
         consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);
@@ -122,7 +123,7 @@ public class RawReaderImpl implements RawReader {
                     MessageId.earliest,
                     0 /* startMessageRollbackDurationInSec */,
                     Schema.BYTES, null,
-                    true
+                    false
             );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 9b25b895ba8..f4767fc6533 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -330,6 +331,16 @@ public class CompactedTopicImpl implements CompactedTopic {
     public synchronized Optional<Position> getCompactionHorizon() {
         return Optional.ofNullable(this.compactionHorizon);
     }
+
+    public void reset() {
+        this.compactionHorizon = null;
+        this.compactedTopicContext = null;
+    }
+
+    @Nullable
+    public CompletableFuture<CompactedTopicContext> 
getCompactedTopicContextFuture() {
+        return compactedTopicContext;
+    }
     private static final Logger log = 
LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6792d4a4e2c..c1fb370ae6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,15 +48,19 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -64,6 +69,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -84,6 +90,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1969,4 +1976,128 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testDeleteCompactedLedger() throws Exception {
+        String topicName = 
"persistent://my-property/use/my-ns/testDeleteCompactedLedger";
+
+        final String subName = "my-sub";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i % 
2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topicName).get();
+
+        MutableLong compactedLedgerId = new MutableLong(-1);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
+            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+        });
+
+        // delete compacted ledger
+        admin.topics().deleteSubscription(topicName, "__compaction");
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
+            Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
+            Assert.assertEquals(stats.compactedLedger.entries, -1L);
+            assertThrows(BKException.BKNoSuchLedgerExistsException.class, () 
-> pulsar.getBookKeeperClient()
+                        .openLedger(compactedLedgerId.getValue(), 
BookKeeper.DigestType.CRC32C, new byte[]{}));
+        });
+
+        compactor.compact(topicName).get();
+
+        MutableLong compactedLedgerId2 = new MutableLong(-1);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topicName);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
+            compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
+            Assert.assertEquals(stats.compactedLedger.entries, 2L);
+        });
+
+        producer.close();
+        admin.topics().delete(topicName);
+
+        Awaitility.await().untilAsserted(() -> 
assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+                () -> pulsar.getBookKeeperClient().openLedger(
+                        compactedLedgerId2.getValue(), 
BookKeeper.DigestType.CRC32, new byte[]{})));
+    }
+
+    @Test
+    public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
+        // Disable topic level policies, since block ack thread may also block 
thread of delete topic policies.
+        conf.setTopicLevelPoliciesEnabled(false);
+        restartBroker();
+
+        String topicName = 
"persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        
pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
+                .close();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i % 
2)).value(String.valueOf(i)).sendAsync();
+        }
+        producer.flush();
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentSubscription subscription = 
spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
+        topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, 
subscription);
+
+        AtomicLong compactedLedgerId = new AtomicLong(-1);
+        AtomicBoolean pauseAck = new AtomicBoolean();
+        Mockito.doAnswer(invocationOnMock -> {
+            Map<String, Long> properties = (Map<String, Long>) 
invocationOnMock.getArguments()[2];
+            log.info("acknowledgeMessage properties: {}", properties);
+            
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
+            pauseAck.set(true);
+            while (pauseAck.get()) {
+                Thread.sleep(200);
+            }
+            return invocationOnMock.callRealMethod();
+        }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
+                CommandAck.AckType.Cumulative), Mockito.any());
+
+        admin.topics().triggerCompaction(topicName);
+
+        while (!pauseAck.get()) {
+            Thread.sleep(100);
+        }
+
+        CompletableFuture<Long> currentCompaction =
+                (CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, 
"currentCompaction", true);
+        CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
+        FieldUtils.writeDeclaredField(topic, "currentCompaction", 
spyCurrentCompaction, true);
+        currentCompaction.whenComplete((obj, throwable) -> {
+            if (throwable != null) {
+                spyCurrentCompaction.completeExceptionally(throwable);
+            } else {
+                spyCurrentCompaction.complete(obj);
+            }
+        });
+        Mockito.doAnswer(invocationOnMock -> {
+            pauseAck.set(false);
+            return invocationOnMock.callRealMethod();
+        }).when(spyCurrentCompaction).handle(Mockito.any());
+
+        admin.topics().delete(topicName, true);
+
+        Awaitility.await().untilAsserted(() -> 
assertThrows(BKException.BKNoSuchLedgerExistsException.class,
+                () -> pulsar.getBookKeeperClient().openLedger(
+                        compactedLedgerId.get(), BookKeeper.DigestType.CRC32, 
new byte[]{})));
+    }
 }

Reply via email to