horizonzy opened a new issue, #16569:
URL: https://github.com/apache/pulsar/issues/16569
## Motivation
Original issue: #13238
The current ledger deletion is split into two separate steps, which happen in
ManagedLedger and ManagedCursor:
* Step 1: Remove all ledgers waiting for deletion from the ledger list and
write the updated ledger list to the meta store.
* Step 2: In the meta store update callback, delete those ledgers from the
storage system, such as BookKeeper or tiered storage.
Because the steps are separate, ledger deletion is not transactional. If the
first step succeeds and the second step fails, the ledgers can never be deleted
from the storage system. The second step may fail because of a broker restart
or a failed deletion in the storage system.
In our customer's environment, we have found many orphan ledgers caused by
this.
## Design
Based on the above, we introduce LedgerDeletionService to support two-phase
deletion. We hope it provides a general solution for two-phase deletion. It
will cover the problems we have already found in managed-ledger, managed-cursor
and schema-storage.
In this design, we use system topics to store the pending delete ledgers.
* pulsar/system/persistent/__ledger_deletion : stores the pending delete
ledgers
* pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the above
### The first phase:
```
client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        .topic("pulsar/system/persistent/__ledger_deletion")
        .enableBatching(false)
        .createAsync();
```
When LedgerDeletionService starts, it creates a producer to send pending
delete ledgers.
When deleting a ledger, the service sends a PendingDeleteLedgerInfo msg with a
1 min delay. The delay is for the consumer side: if the msg were delivered
immediately, the metadata might not have changed yet when the consumer receives
it. Only after the send operation succeeds do we operate on the metadata. If
sending the msg fails, we consider the deletion operation failed and do not
operate on the metadata.
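The ordering described above (send first, touch the metadata only after the send succeeds) can be sketched as follows. This is a minimal illustration, not the proposed implementation: `FirstPhaseSketch`, `sendPendingDelete` and `updateMetadata` are hypothetical names standing in for the real producer send and the meta store update.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class FirstPhaseSketch {

    /**
     * Runs the metadata update only after the pending-delete message was sent.
     * Both suppliers are hypothetical stand-ins for the producer send and the
     * meta store update; the future completes with true when both succeeded.
     */
    static CompletableFuture<Boolean> deleteLedger(
            Supplier<CompletableFuture<Void>> sendPendingDelete,
            Supplier<CompletableFuture<Void>> updateMetadata) {
        return sendPendingDelete.get()
                // Send succeeded: only now is the metadata update attempted.
                .thenCompose(ignore -> updateMetadata.get())
                .thenApply(ignore -> true)
                // Send or metadata update failed: report the deletion as failed.
                // When the send fails, updateMetadata is never invoked.
                .exceptionally(error -> false);
    }
}
```

If the send fails, the `thenCompose` stage is skipped, so the metadata stays untouched and the caller sees a failed deletion that it can retry later.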
**PendingDeleteLedgerInfo**
```
public class PendingDeleteLedgerInfo {
    /**
     * Partitioned topic name without domain. Like
     * public/default/test-topic-partition-1 or
     * public/default/test-topic
     */
    private String topicName;

    /**
     * The ledger component: managed-ledger, managed-cursor or schema-storage.
     */
    private LedgerComponent ledgerComponent;

    /**
     * The ledger type: ledger or offload-ledger.
     */
    private LedgerType ledgerType;

    /**
     * LedgerId.
     */
    private Long ledgerId;

    /**
     * Context, holds offload info. For a bk ledger, the context is null.
     */
    private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

    /**
     * When the consumer receives a pending delete ledger, the ledger may still
     * be in use, so we need to check whether it is in use.
     * In some cases, this check is not needed.
     */
    private boolean checkLedgerStillInUse;

    /**
     * Extended properties.
     */
    private Map<String, String> properties = new HashMap<>();
}
```
### The second phase
```
client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        .topic("pulsar/system/persistent/__ledger_deletion")
        .subscriptionName("ledger-deletion-worker")
        .subscriptionType(SubscriptionType.Shared)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
                .maxRedeliverCount(10).build())
        .subscribeAsync();
```
When LedgerDeletionService starts, it also starts a consumer to consume
pending delete ledgers.
### Check whether the ledger is still in use
When a pending delete ledger is received, we check whether it still exists in
the metadata. If it exists, we give up deleting the ledger this time: the
consumer does not ack the message and calls reconsumeLater with a 10 min delay.
If it does not exist, the consumer tries to delete the ledger or offload-ledger
according to the ledger type. If the deletion succeeds, it acks the message; if
the deletion fails, it calls reconsumeLater with a 10 min delay. When a
PendingDeleteLedgerInfo msg has been reconsumed 10 times, it is transferred to
the DLQ pulsar/system/persistent/__ledger_deletion_archive.
_Tips: The DLQ maxRedeliverCount is 10 and the reconsumeLater delay is 10 min,
so if the storage system is down, deletion of a pending delete ledger is
retried 10 times over 100 min. A short storage system outage therefore does not
leave the ledger undeleted._
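The 100 min figure is simply the redelivery count multiplied by the reconsume delay. A small sketch of that arithmetic, where `RetryBudgetSketch` and `retryWindow` are hypothetical names used only for illustration:

```java
import java.time.Duration;

final class RetryBudgetSketch {

    /**
     * Total time a message keeps being retried before it is routed to the DLQ,
     * assuming every attempt fails and is followed by the same reconsume delay.
     */
    static Duration retryWindow(int maxRedeliverCount, Duration reconsumeLaterDelay) {
        return reconsumeLaterDelay.multipliedBy(maxRedeliverCount);
    }
}
```

With the values from this proposal, `retryWindow(10, Duration.ofMinutes(10))` yields 100 minutes, so an outage longer than that would move pending deletions to the DLQ.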
### How to get the existing ledgers when the consumer receives a pending delete ledger
The admin API topics getInternalStats {topic-name} already returns the three
pieces of information we need:
* managed-ledger ledgerIds
* managed-cursor ledgerIds
* schema-storage ledgerIds
So when a pending delete ledger is received, we fetch the topic internal stats.
To avoid fetching them too frequently, we keep the result in ledgersCache.
```
private final LoadingCache<String, TreeSet<Long>> ledgersCache = Caffeine.newBuilder()
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build(new CacheLoader<>() {
            @Override
            public @Nullable TreeSet<Long> load(@NonNull String key) throws Exception {
                return fetchInUseLedgerIds(key);
            }
        });

private TreeSet<Long> fetchInUseLedgerIds(String topicName) throws PulsarAdminException {
    TreeSet<Long> ledgerIds = new TreeSet<>();
    PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(topicName);
    ledgerIds.addAll(internalStats.ledgers.stream()
            .map(ele1 -> ele1.ledgerId).collect(Collectors.toSet()));
    ledgerIds.addAll(internalStats.cursors.values().stream()
            .map(ele -> ele.cursorLedger).collect(Collectors.toSet()));
    ledgerIds.addAll(internalStats.schemaLedgers.stream()
            .map(ele -> ele.ledgerId).collect(Collectors.toSet()));
    return ledgerIds;
}
```
The key is **topicName**, the value is the **ledgerId set**.
Entries expire 10 min after write. This has two benefits:
* Reduce memory: a topic that no longer deletes ledgers does not keep occupying
memory.
* Decrease the frequency of metadata fetches.
When a pending delete ledger is received, we get the ledgerId set from
ledgersCache and compare the ledgerId with the max ledgerId in the set. If the
ledgerId is greater than the max ledgerId, the cache is out of date and we
shouldn't trust it, so we reconsume the msg 10 min later. By then the cache
entry has expired and fresh ledgerIds are fetched. If the ledgerId is not
greater than the max ledgerId, we check whether the set contains it. If it
does, the ledger is still in use and we reconsume the msg 10 min later. If it
does not, we delete the ledger.
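The decision rule above can be written as a small pure function. This is a sketch under stated assumptions: `InUseCheckSketch`, `Decision` and `decide` are hypothetical names, and the handling of an empty set is an assumption because the text does not cover that case.

```java
import java.util.TreeSet;

final class InUseCheckSketch {

    enum Decision { RECONSUME_LATER, DELETE }

    /** Decides what to do with a pending delete ledger given the cached in-use ledger ids. */
    static Decision decide(TreeSet<Long> cachedLedgerIds, long ledgerId) {
        // Assumption: an empty set is treated like a stale cache and retried later.
        if (cachedLedgerIds.isEmpty()) {
            return Decision.RECONSUME_LATER;
        }
        // An id above the cached maximum means the cache is older than this ledger.
        if (ledgerId > cachedLedgerIds.last()) {
            return Decision.RECONSUME_LATER;
        }
        // The ledger is still referenced by the topic metadata.
        if (cachedLedgerIds.contains(ledgerId)) {
            return Decision.RECONSUME_LATER;
        }
        // Not referenced and the cache is recent enough: safe to delete.
        return Decision.DELETE;
    }
}
```

A `TreeSet` makes the max lookup (`last()`) cheap, which is presumably why the cache stores the ids in sorted form.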
### Possible intermediate states
* Case1: Sending the pending delete ledger succeeds, but the broker goes down
before the metadata is updated.
The consumer receives the pending delete ledger msg, but the ledger id still
exists in the metadata, so the consumer does not delete the ledger.
* Case2: Sending the pending delete ledger succeeds, but the broker goes down
before the metadata is updated. After a while, the pending delete ledger is
sent again and the metadata update succeeds.
The consumer receives two pending delete ledger msgs with the same ledgerId.
Ledger deletion is idempotent, so both the first and the second deletion
succeed and both msgs are acked.
* Case3: The consumer receives the pending delete ledger msg and deletes the
ledger successfully, but the broker shuts down before the msg is acked.
The msg is redelivered and the second consumption deletes the ledger again.
Because ledger deletion is idempotent, the msg is eventually acked.
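Case2 and Case3 both rely on deletion being idempotent: deleting a ledger that is already gone must count as success so the duplicate msg can be acked. A minimal sketch of that property, using a hypothetical in-memory `IdempotentDeleteSketch` in place of the real storage system:

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotentDeleteSketch {

    /** Hypothetical in-memory stand-in for the ledgers held by the storage system. */
    private final Set<Long> storedLedgers = new HashSet<>();

    IdempotentDeleteSketch(Set<Long> initialLedgers) {
        storedLedgers.addAll(initialLedgers);
    }

    /**
     * Deletes the ledger if present. Returns true when the ledger is gone after
     * the call, whether or not this particular call removed it, so a duplicate
     * or redelivered message can be acked as well.
     */
    boolean deleteLedger(long ledgerId) {
        storedLedgers.remove(ledgerId);
        return !storedLedgers.contains(ledgerId);
    }
}
```

In a real storage client this would typically mean treating a "ledger does not exist" error as a successful deletion; the exact error handling is an assumption here, not something the proposal spells out.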
### How to create the system topic
When LedgerDeletionService starts, it uses pulsarAdmin to create the
partitioned system topic
pulsar/system/persistent/__ledger_deletion-partition-x. We should ensure the
system topic is partitioned; otherwise, if the user changes the metadata, the
old system topic pulsar/system/persistent/__ledger_deletion will be lost.
## Process flow
### The first deletion phase:

### The second deletion phase:

## API Changes
Introduce LedgerDeletionService interface
```
public interface LedgerDeletionService {

    void start() throws PulsarClientException, PulsarAdminException;

    CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
                                                   LedgerComponent component, LedgerType type,
                                                   boolean checkLedgerStillInUse);

    void close() throws Exception;

    CompletableFuture<?> asyncClose();
}
```
## Configuration Changes
Add the following properties to ServiceConfiguration
```
public class ServiceConfiguration {

    @FieldContext(
            dynamic = true,
            category = CATEGORY_SERVER,
            doc = "Use two-phase deletion when deleting ledgers. If true, "
                    + "LedgerDeletionService will take over ledger deletion. (Default false)"
    )
    private boolean twoPhaseDeletionEnabled;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "Ledger deletion parallelism. The partitioned system topic is created with "
                    + "this number of partitions when twoPhaseDeletionEnabled is true."
    )
    private int ledgerDeletionParallelismOfTwoPhaseDeletion = 4;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "With two-phase deletion, a PendingDeleteLedgerInfo is sent to the system topic"
                    + " when a ledger is deleted. The msg is delayed by this value. (Default 60s)"
    )
    private int sendDelaySecondsOfTwoPhaseDeletion = 60;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "With two-phase deletion, a consumer subscribes to the system topic."
                    + " When consuming a PendingDeleteLedgerInfo fails, the msg is"
                    + " reconsumed later according to this value. (Default 600s)"
    )
    private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
}
```
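As an illustration, enabling the feature with the default values could look like the following fragment. It assumes the proposed fields are exposed in broker.conf under the same names, as is usual for ServiceConfiguration fields; the proposal itself does not show a configuration file.

```properties
# Assumed broker.conf keys, named after the proposed ServiceConfiguration fields.
twoPhaseDeletionEnabled=true
ledgerDeletionParallelismOfTwoPhaseDeletion=4
sendDelaySecondsOfTwoPhaseDeletion=60
reconsumeLaterSecondsOfTwoPhaseDeletion=600
```

Only twoPhaseDeletionEnabled is marked dynamic in the proposal, so the other three values would presumably require a broker restart to change.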
## Documentation Changes
We should add documentation for this new feature.
## Compatibility
If a user upgrades and enables two-phase deletion, the ledger deletion msgs are
stored in the system topic. If the user then rolls back to an old version
before all msgs in the system topic have been consumed, some ledgers may not be
deleted.