teabot opened a new issue, #19665:
URL: https://github.com/apache/pulsar/issues/19665

   ### Motivation
   
   In the current compact topic implementation, there are no time or 
capacity-based retention policies on the compacted ledger. This means that 
unless a user intervenes manually, the cardinality of the compacted topic can 
only increase over time. This makes it challenging to manage compact topic 
resource usage as the cluster has no automatic mechanism to do so. It also 
makes compacted topics unsuitable for a number of use cases such as windowing, 
limited retention FIFO cache, etc.
   
   It should be noted that compact topics with retention are [supported by 
Kafka](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_cleanup.policy)
 (`cleanup.policy = [delete, compact]`), and are heavily used within KStreams 
to create look-up tables with short-lived records. Reference: [KIP-71: Enable 
log compaction and deletion to 
co-exist](https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist)
   
   ### Goal
   
   The scope of this proposal is to add a user-configurable, automatic 
retention capability to the compacted ledger component of compact topics. Once 
configured, the brokers would be responsible for expunging messages from the 
compacted ledger that violate the user-declared retention policy.
   
   To adhere to the principle of least surprise, the behaviour of the retention 
mechanism should, wherever possible, be identical to that of a regular topic 
(or indeed the uncompacted ledger of the compact topic).
   
   ### API Changes
   
   - `org.apache.pulsar.client.admin.NamespacePolicies`
       - `RetentionPolicies getCompactedRetention(String namespace);`
       - `RetentionPolicies getCompactedRetention(String namespace, boolean 
applied);`
       - `CompletableFuture<RetentionPolicies> 
getCompactedRetentionAsync(String namespace);`
       - `CompletableFuture<RetentionPolicies> 
getCompactedRetentionAsync(String namespace, boolean applied);`
       - `RetentionPolicies removeCompactedRetention(String namespace);`
       - `CompletableFuture<RetentionPolicies> 
removeCompactedRetentionAsync(String namespace);`
       - `void setCompactedRetention(String namespace, RetentionPolicies 
policies);`
       - `CompletableFuture<Void> setCompactedRetentionAsync(String 
namespace, RetentionPolicies policies);`
   - `org.apache.pulsar.client.admin.TopicPolicies`
       - `RetentionPolicies getCompactedRetention(String topic);`
       - `RetentionPolicies getCompactedRetention(String topic, boolean 
applied);`
       - `CompletableFuture<RetentionPolicies> 
getCompactedRetentionAsync(String topic);`
       - `CompletableFuture<RetentionPolicies> 
getCompactedRetentionAsync(String topic, boolean applied);`
       - `RetentionPolicies removeCompactedRetention(String topic);`
       - `CompletableFuture<RetentionPolicies> 
removeCompactedRetentionAsync(String topic);`
       - `void setCompactedRetention(String topic, RetentionPolicies 
policies);`
       - `CompletableFuture<Void> setCompactedRetentionAsync(String topic, 
RetentionPolicies policies);`
   
   ### Implementation
   
   #### Terminology
   
   For this discussion, we will use the term ‘marked message’ to refer to a 
message that a retention policy would target for deletion. In practice, this 
means that the message is older than the retention interval 
(`RetentionPolicies.retentionTimeInMinutes`) or falls outside the most recent 
region of storage whose size equals the retention size 
(`RetentionPolicies.retentionSizeInMB`).
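
   The definition above can be sketched as a small in-memory check. This is 
only an illustration under stated assumptions: `Entry` and `markedKeys` are 
hypothetical names that are not part of Pulsar, entries are assumed to be 
ordered oldest first, and the special values a real policy may use for 
unlimited retention are ignored.

```java
import java.util.ArrayList;
import java.util.List;

final class MarkedMessageSketch {

    /** Hypothetical, simplified view of one entry in the compacted ledger. */
    static final class Entry {
        final String key;
        final long publishTimeMillis;
        final long sizeBytes;

        Entry(String key, long publishTimeMillis, long sizeBytes) {
            this.key = key;
            this.publishTimeMillis = publishTimeMillis;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Returns the keys of the entries that the retention policy would mark. */
    static List<String> markedKeys(List<Entry> entries, long nowMillis,
                                   long retentionTimeInMinutes,
                                   long retentionSizeInMB) {
        long cutoffMillis = nowMillis - retentionTimeInMinutes * 60_000L;
        long sizeBudgetBytes = retentionSizeInMB * 1024L * 1024L;

        // Walk from newest to oldest, summing the storage used so far.
        boolean[] marked = new boolean[entries.size()];
        long bytesFromTail = 0;
        for (int i = entries.size() - 1; i >= 0; i--) {
            Entry e = entries.get(i);
            bytesFromTail += e.sizeBytes;
            boolean tooOld = e.publishTimeMillis < cutoffMillis;
            boolean outsideSizeWindow = bytesFromTail > sizeBudgetBytes;
            marked[i] = tooOld || outsideSizeWindow;
        }

        // Report the marked keys in their original (oldest first) order.
        List<String> result = new ArrayList<>();
        for (int i = 0; i < marked.length; i++) {
            if (marked[i]) {
                result.add(entries.get(i).key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long oneMB = 1024L * 1024L;
        List<Entry> entries = List.of(
            new Entry("a", 0, oneMB),
            new Entry("b", 50 * minute, oneMB),
            new Entry("c", 59 * minute, oneMB));
        // 30-minute window, 2 MB budget, evaluated at minute 60.
        System.out.println(markedKeys(entries, 60 * minute, 30, 2)); // prints [a]
    }
}
```

   In this sketch a message is marked as soon as either limit is exceeded, 
which follows the "or" in the definition above.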
   
   #### Admin client
   
   ```java
   PulsarAdmin admin = ...;
   // Proposed API: apply a retention policy to the topic's compacted ledger.
   admin.topicPolicies().setCompactedRetention(
     "topic-name",
     new RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB)
   );
   ```
   
   #### Compactor
   
   Uncompacted topic data comprises multiple ledgers. These ledgers are closed 
and new ones are created as messages are produced into the topic. Ledgers are 
immutable and it is not possible to remove marked messages in an existing 
ledger. However, we can eventually free storage resources when all messages in 
the ledger have been marked by the retention policy. Therefore we release 
storage on a per-ledger basis.
   
   Compacted topic data comprises a single ledger, created by the compactor 
process when compaction begins and then closed when it ends. While a retention 
policy can identify marked messages, there is only one ledger, so we cannot 
release storage incrementally, ledger by ledger, as we do for uncompacted data.
   
   Instead, the compactor (`org.apache.pulsar.compaction.TwoPhaseCompactor`) 
can exclude marked messages as it writes a new compacted ledger. This applies 
the retention window to the new ledger, and deleting the old ledger then frees 
the resources. An implication of this is that retention enforcement is wholly 
dependent on compactor execution.
   
   
![image](https://user-images.githubusercontent.com/228950/221940863-4d168227-73dd-41cd-a22d-588334c61753.png)
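
   A minimal sketch of this idea, assuming a simplified in-memory model rather 
than the real `TwoPhaseCompactor`: `Message` and `compact` are hypothetical 
names, the topic is assumed to be ordered oldest first, and only the time 
limit is applied to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetentionAwareCompactionSketch {

    /** Hypothetical, simplified message; a null value acts as a tombstone. */
    static final class Message {
        final long id;
        final String key;
        final String value;
        final long publishTimeMillis;

        Message(long id, String key, String value, long publishTimeMillis) {
            this.id = id;
            this.key = key;
            this.value = value;
            this.publishTimeMillis = publishTimeMillis;
        }
    }

    /** Builds the contents of a new compacted ledger from the topic's messages. */
    static List<Message> compact(List<Message> topic, long nowMillis,
                                 long retentionTimeInMinutes) {
        // Phase one: remember the id of the latest message for every key.
        Map<String, Long> latestIdByKey = new HashMap<>();
        for (Message m : topic) {
            latestIdByKey.put(m.key, m.id);
        }

        long cutoffMillis = nowMillis - retentionTimeInMinutes * 60_000L;

        // Phase two: write the latest message per key to the new ledger,
        // skipping tombstones and messages marked by the retention policy.
        List<Message> newCompactedLedger = new ArrayList<>();
        for (Message m : topic) {
            boolean isLatestForKey = latestIdByKey.get(m.key) == m.id;
            boolean isTombstone = m.value == null;
            boolean isMarked = m.publishTimeMillis < cutoffMillis;
            if (isLatestForKey && !isTombstone && !isMarked) {
                newCompactedLedger.add(m);
            }
        }
        return newCompactedLedger;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Message> topic = List.of(
            new Message(1, "k1", "a", 0),
            new Message(2, "k2", "b", 10 * minute),
            new Message(3, "k1", "c", 50 * minute),
            new Message(4, "k3", "d", 55 * minute));
        // With a 30-minute window at minute 60, k2 is marked and left out.
        for (Message m : compact(topic, 60 * minute, 30)) {
            System.out.println(m.key + "=" + m.value);
        }
    }
}
```

   Because the filter only runs while a new ledger is being written, a marked 
message stays readable until the next compaction, which is the dependency on 
compactor execution noted above.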
   
   #### Broker
   
   The API residing in `org.apache.pulsar.broker.admin` will need to be 
extended to support querying and persisting compacted retention policies at 
both the topic and namespace level.
   
   #### Metadata persistence
   
   In the case of topic-scoped compaction retention policies, we must add 
additional metadata entities/attributes to 
`org.apache.pulsar.broker.service.TopicPoliciesService` and associated classes 
so that the compact retention policy can be persisted and queried. We must 
perform a similar set of changes to 
`org.apache.pulsar.broker.resources.NamespaceResources` for the 
namespace-scoped compaction retention policies.
   
   #### Documentation
   
   The following documentation artefacts will need to be updated:
   
   - [Topic 
compaction](https://pulsar.apache.org/docs/next/concepts-topic-compaction)
   - [Compaction cookbook](https://pulsar.apache.org/docs/cookbooks-compaction)
   
   
   ### Alternatives
   
   1. Users can iterate over messages manually and then send tombstones based 
on observed message timestamps and/or cumulative message size. These messages 
will then be removed from the next compacted ledger during the next compaction.
   2. Topic compaction implemented as a retention policy using the existing 
ledger set.
   
   ### Anything else?
   
   There are two configurations that currently influence the logical compacted 
event log: the topic’s retention policy and the compactor’s time of execution. 
There are therefore some subtle interactions in the regions where these 
configurations overlap. What if a recent key update is removed from the 
uncompacted ledgers by a retention policy before the update is merged into the 
compacted ledger by a compaction event? If this behaviour is not desirable, 
compaction must run at an interval smaller than that of any possible retention 
window. This change introduces a third influence: the compacted ledger’s 
retention policy. This will make the overall log behaviour harder to reason 
about in these small, overlapping regions.
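
   The overlap described above can be illustrated with a small worked check. 
It is a sketch of the scenario exactly as posed in this section, ignoring any 
safeguards the broker may apply; `reachesCompactedLedger` is a hypothetical 
helper, and all times are minutes on a common clock.

```java
final class RetentionOverlapSketch {

    /**
     * True if an update is still in the uncompacted ledgers when the next
     * compaction runs, so that it can be merged into the compacted ledger.
     */
    static boolean reachesCompactedLedger(long publishMinute,
                                          long topicRetentionMinutes,
                                          long nextCompactionMinute) {
        // Assumption: the update is removed once the retention window elapses.
        long removedAtMinute = publishMinute + topicRetentionMinutes;
        return nextCompactionMinute < removedAtMinute;
    }

    public static void main(String[] args) {
        // Update published at minute 0 with a 30-minute topic retention.
        System.out.println(reachesCompactedLedger(0, 30, 20)); // prints true
        System.out.println(reachesCompactedLedger(0, 30, 45)); // prints false
    }
}
```

   In the second call the update is gone before compaction runs, which is the 
case that a compaction interval shorter than the retention window avoids.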

