horizonzy opened a new issue, #16569:
URL: https://github.com/apache/pulsar/issues/16569

   ## Motivation
   Original issue: #13238
   The current ledger deletion, which happens in ManagedLedger and
   ManagedCursor, is divided into two separate steps:
   * Step 1: remove all ledgers that are waiting to be deleted from the ledger
     list and write the updated ledger list to the meta store.
   * Step 2: in the meta store update callback, delete those ledgers from the
     storage system, such as BookKeeper or tiered storage.
   
   Because the two steps are separate, the ledger deletion is not
   transactional. If the first step succeeds and the second step fails, the
   ledgers can never be deleted from the storage system. The second step may
   fail because of a broker restart or a deletion failure in the storage system.
   
   In our customer’s environment, we have found many orphan ledgers caused by
   this problem.
   ## Design
   Based on the above, we introduce LedgerDeletionService to support two phase
   deletion. We hope it provides a general solution for two phase deletion. It
   will cover the problems we have already found in managed-ledger,
   managed-cursor and schema-storage.
   
   In this design, we use system topics to store the pending delete ledgers.
   * pulsar/system/persistent/__ledger_deletion : stores the pending delete
     ledgers
   * pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the topic
     above
   
   
   ### The first phase:
   ```
   client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
    .topic("pulsar/system/persistent/__ledger_deletion")
    .enableBatching(false)
    .createAsync();
   ```
   When LedgerDeletionService starts, it creates a producer to send pending
   delete ledgers.
   When a ledger is deleted, a PendingDeleteLedgerInfo msg is sent with a 1 min
   delay. The delay is for the consumer side: if the msg were delivered
   immediately, the metadata might not have changed yet when the consumer
   receives it. The metadata is updated only after the send operation succeeds.
   If sending the msg fails, the deletion operation is considered failed and the
   metadata is not updated.
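
The ordering of the first phase can be sketched as below. This is only an illustration under assumptions: `FirstPhaseSketch`, `deleteLedger` and the two suppliers are hypothetical names, not part of the proposal, and the suppliers stand in for the real producer send and the real meta store update.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: publish the pending delete msg before touching the metadata. */
final class FirstPhaseSketch {

    /**
     * Runs the metadata update only if the pending delete msg was sent.
     * If the send fails, the returned future fails and the metadata update is never started.
     */
    static CompletableFuture<Void> deleteLedger(
            Supplier<CompletableFuture<Void>> sendPendingDelete,
            Supplier<CompletableFuture<Void>> updateMetadata) {
        // thenCompose invokes updateMetadata only when the send future completes normally.
        return sendPendingDelete.get().thenCompose(ignored -> updateMetadata.get());
    }
}
```

With a real producer, the first supplier would probably wrap something like `producer.newMessage().value(info).deliverAfter(1, TimeUnit.MINUTES).sendAsync()`, which is how the Pulsar client expresses a delayed message.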
   
   **PendingDeleteLedgerInfo**
   ```
   public class PendingDeleteLedgerInfo {
       /**
        * Partitioned topic name without domain, for example
        * public/default/test-topic-partition-1 or public/default/test-topic.
        */
       private String topicName;

       /**
        * The ledger component: managed-ledger, managed-cursor or schema-storage.
        */
       private LedgerComponent ledgerComponent;

       /**
        * The ledger type: ledger or offload-ledger.
        */
       private LedgerType ledgerType;

       /**
        * The ledger id.
        */
       private Long ledgerId;

       /**
        * Context that holds the offload info. Null for a BookKeeper ledger.
        */
       private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

       /**
        * When the consumer receives a pending delete ledger, the ledger may
        * still be in use, so the consumer needs to check it.
        * In some cases this check is not needed.
        */
       private boolean checkLedgerStillInUse;

       /**
        * Extended properties.
        */
       private Map<String, String> properties = new HashMap<>();
   }
   ```
   ### The second phase
   ```
   client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
       .topic("pulsar/system/persistent/__ledger_deletion")
       .subscriptionName("ledger-deletion-worker")
       .subscriptionType(SubscriptionType.Shared)
       .deadLetterPolicy(DeadLetterPolicy.builder()
           .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
           .maxRedeliverCount(10)
           .build())
       .subscribeAsync();
   ```
   When LedgerDeletionService starts, it also starts a consumer to consume the
   pending delete ledgers.
   
   ### Check the ledger is still in use
   When the consumer receives a pending delete ledger, it checks whether the
   ledger still exists in the metadata. If it does, the consumer gives up
   deleting the ledger this time: it does not ack the message and calls
   reconsumeLater with a 10 min delay. If it does not, the consumer tries to
   delete the ledger or offload-ledger according to the ledger type. If the
   deletion succeeds, the message is acked; if it fails, the consumer calls
   reconsumeLater with a 10 min delay. Once a PendingDeleteLedgerInfo msg has
   been reconsumed 10 times, it is moved to the DLQ
   pulsar/system/persistent/__ledger_deletion_archive.
   
   _Tips: The DLQ maxRedeliverCount is 10 and the reconsumeLater delay is 10
   min, so if the storage system is shut down, the deletion of a pending delete
   ledger is retried 10 times over 100 min. A short outage of the storage
   system therefore does not prevent the ledger from being deleted._
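
The handling rules above can be condensed into a small decision function. This is a sketch, not the proposed implementation: `SecondPhaseSketch`, `Action` and `handle` are hypothetical names, and the `BooleanSupplier` stands in for the real storage deletion. The two constants are the values stated in the proposal.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of how the consumer could handle one pending delete msg. */
final class SecondPhaseSketch {

    enum Action { ACK, RECONSUME_LATER }

    /** Values from the proposal: 10 redeliveries, 10 min between them. */
    static final int MAX_REDELIVER_COUNT = 10;
    static final int RECONSUME_LATER_MINUTES = 10;

    /**
     * @param stillInMetadata   whether the ledger id is still listed in the topic metadata
     * @param deleteFromStorage performs the actual deletion and returns true on success
     */
    static Action handle(boolean stillInMetadata, BooleanSupplier deleteFromStorage) {
        if (stillInMetadata) {
            // The ledger may still be in use: skip the deletion and retry later.
            return Action.RECONSUME_LATER;
        }
        return deleteFromStorage.getAsBoolean() ? Action.ACK : Action.RECONSUME_LATER;
    }

    /** Time span over which a msg is retried before it is moved to the DLQ. */
    static int retryWindowMinutes() {
        return MAX_REDELIVER_COUNT * RECONSUME_LATER_MINUTES;
    }
}
```

The deletion is attempted only when the metadata no longer references the ledger, which is what makes a duplicate or early msg harmless.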
   
   ### How to get the existing ledgers when the consumer receives a pending delete ledger
   The admin API already provides topics getInternalStats {topic-name}. Its
   response contains the three pieces of information we need:
   * managed-ledger ledgerIds
   * managed-cursor ledgerIds
   * schema-storage ledgerIds
   
   So when the consumer receives a pending delete ledger, it fetches the topic
   internal stats. To avoid fetching the topic internal stats too frequently,
   we define ledgersCache for them.
   ```
       private final LoadingCache<String, TreeSet<Long>> ledgersCache = Caffeine.newBuilder()
               .expireAfterWrite(10, TimeUnit.MINUTES)
               .build(new CacheLoader<>() {
                   @Override
                   public @Nullable TreeSet<Long> load(@NonNull String key) throws Exception {
                       return fetchInUseLedgerIds(key);
                   }
               });

       private TreeSet<Long> fetchInUseLedgerIds(String topicName) throws PulsarAdminException {
           TreeSet<Long> ledgerIds = new TreeSet<>();

           PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(topicName);

           ledgerIds.addAll(internalStats.ledgers.stream()
                   .map(ele1 -> ele1.ledgerId).collect(Collectors.toSet()));
           ledgerIds.addAll(internalStats.cursors.values().stream()
                   .map(ele -> ele.cursorLedger).collect(Collectors.toSet()));
           ledgerIds.addAll(internalStats.schemaLedgers.stream()
                   .map(ele -> ele.ledgerId).collect(Collectors.toSet()));

           return ledgerIds;
       }
   ```
   
   The key is **topicName**, the value is the **ledgerId set**.
   An entry expires 10 min after it is written. This has two benefits:
    * It reduces memory usage: if a topic no longer deletes ledgers, it does
      not keep occupying memory.
    * It decreases the frequency of fetching metadata.
   
   When the consumer receives a pending delete ledger, it gets the ledgerId set
   from ledgersCache and first checks whether the ledgerId is greater than the
   max ledgerId in the set. If it is greater, the cache is out of date and
   cannot be trusted, so the msg is reconsumed 10 min later. By then the cache
   entry has expired and a new ledgerId set is fetched. If the ledgerId is not
   greater than the max ledgerId, the consumer checks whether the set contains
   the ledgerId. If it does, the msg is reconsumed 10 min later. If it does
   not, the ledger is deleted.
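
The cache check described above can be written as a pure function over the cached set. This is a minimal sketch: `LedgerCacheCheckSketch`, `Decision` and `decide` are hypothetical names, and treating an empty cached set as out of date is an assumption, since the proposal does not cover that case.

```java
import java.util.TreeSet;

/** Hypothetical sketch of the check against the cached ledgerId set. */
final class LedgerCacheCheckSketch {

    enum Decision { DELETE, RECONSUME_LATER }

    static Decision decide(TreeSet<Long> cachedLedgerIds, long ledgerId) {
        // Assumption: an empty set gives no max ledgerId to compare with, so do not trust it.
        if (cachedLedgerIds.isEmpty()) {
            return Decision.RECONSUME_LATER;
        }
        // A ledgerId above the cached max means the cache is out of date.
        if (ledgerId > cachedLedgerIds.last()) {
            return Decision.RECONSUME_LATER;
        }
        // The cache is recent enough: delete only if the ledger is no longer referenced.
        return cachedLedgerIds.contains(ledgerId) ? Decision.RECONSUME_LATER : Decision.DELETE;
    }
}
```

A TreeSet keeps the ids sorted, so reading the max ledgerId with `last()` does not require scanning the set.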
   
   ### Possible intermediate states
   * Case1: Sending the pending delete ledger succeeds, but the broker goes
     down before the metadata is updated.
   The consumer receives the pending delete ledger msg, but the ledger id
   still exists in the metadata, so the consumer does not delete the ledger.
   * Case2: Sending the pending delete ledger succeeds, but the broker goes
     down before the metadata is updated. After a while, the pending delete
     ledger is sent again and the metadata update succeeds.
   The consumer receives two pending delete ledger msgs with the same
   ledgerId. Ledger deletion is idempotent, so both the first and the second
   deletion succeed and both msgs are acked.
   * Case3: The consumer receives the pending delete ledger msg and deletes the
     ledger successfully, but the broker shuts down before the msg is acked.
   The pending delete ledger msg is redelivered and the second consumption
   deletes the ledger again. Because ledger deletion is idempotent, the msg is
   eventually acked.
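
Cases 2 and 3 rely on deletion being idempotent. The in-memory model below only illustrates that property: `IdempotentDeletionSketch` is a hypothetical stand-in for the storage system, and returning success for a ledger that is already gone is the assumption taken from the text above.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory stand-in for the storage system. */
final class IdempotentDeletionSketch {

    private final Set<Long> storedLedgers = new HashSet<>();

    IdempotentDeletionSketch(Set<Long> initialLedgers) {
        storedLedgers.addAll(initialLedgers);
    }

    /** Deleting a ledger that is already gone is treated as success (assumption). */
    boolean delete(long ledgerId) {
        storedLedgers.remove(ledgerId);
        return true;
    }

    boolean exists(long ledgerId) {
        return storedLedgers.contains(ledgerId);
    }
}
```

Processing the same pending delete msg twice then leaves the storage in the same state and both msgs can be acked.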
   
   ### How to create the system topic
   When LedgerDeletionService starts, it uses pulsarAdmin to create the
   partitioned system topic
   pulsar/system/persistent/__ledger_deletion-partition-x. We should ensure
   the system topic is partitioned from the start. Otherwise, if a user later
   changes the topic metadata, the old system topic
   pulsar/system/persistent/__ledger_deletion will be lost.
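
For illustration, the partition names that result from a given parallelism can be listed as below. `SystemTopicPartitionsSketch` and `partitionNames` are hypothetical names; the topic path follows the notation used in this proposal, and partitions are numbered from 0 as in Pulsar.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the partitions of the ledger deletion system topic. */
final class SystemTopicPartitionsSketch {

    static final String LEDGER_DELETION_TOPIC = "pulsar/system/persistent/__ledger_deletion";

    /** Returns one name per partition, using the "-partition-x" suffix. */
    static List<String> partitionNames(int parallelism) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            names.add(LEDGER_DELETION_TOPIC + "-partition-" + i);
        }
        return names;
    }
}
```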
   
   ## Process flow
   ### The first deletion phase:
   
![lwdBzs4upOufb9DAWkVHMpUq](https://user-images.githubusercontent.com/22524871/178689500-281e9ff3-a213-468e-a1ff-d999e6a8be20.png)
   
   
   ### The second deletion phase:
   
![Q1_-vAy4yLtv5PSBo5b_YTxI](https://user-images.githubusercontent.com/22524871/178689513-fce476de-7dd3-4f05-9528-dc3c427fffe4.png)
   
   ## API Changes
   Introduce LedgerDeletionService interface
   ```
   public interface LedgerDeletionService {

       void start() throws PulsarClientException, PulsarAdminException;

       CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
                                                      LedgerComponent component, LedgerType type,
                                                      boolean checkLedgerStillInUse);

       void close() throws Exception;

       CompletableFuture<?> asyncClose();
   }
   ```
   
   ## Configuration Changes
   Add the following properties to ServiceConfiguration
   ```
   public class ServiceConfiguration {
       @FieldContext(
               dynamic = true,
               category = CATEGORY_SERVER,
               doc = "Whether to use two phase deletion when deleting ledgers. If true, "
                       + "LedgerDeletionService takes over ledger deletion. (Default false)"
       )
       private boolean twoPhaseDeletionEnabled;

       @FieldContext(
               category = CATEGORY_SERVER,
               doc = "Ledger deletion parallelism. The partitioned system topic is created with "
                       + "this number of partitions when twoPhaseDeletionEnabled is true."
       )
       private int ledgerDeletionParallelismOfTwoPhaseDeletion = 4;

       @FieldContext(
               category = CATEGORY_SERVER,
               doc = "With two phase deletion, the PendingDeleteLedgerInfo is sent to the system topic"
                       + " with a delay given by this value. (Default 60s)"
       )
       private int sendDelaySecondsOfTwoPhaseDeletion = 60;

       @FieldContext(
               category = CATEGORY_SERVER,
               doc = "With two phase deletion, a consumer subscribes to the system topic."
                       + " When consuming a PendingDeleteLedgerInfo fails, it is reconsumed later"
                       + " according to this value. (Default 600s)"
       )
       private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
   }
   ```
   
   ## Documentation Changes
   We should add documentation for this new feature.
   
   ## Compatibility
   If a user upgrades and enables two phase deletion, the ledger deletion msgs
   are stored in the system topic. If the user then rolls back to an old
   version before all msgs in the system topic have been consumed, some ledgers
   may not be deleted.

