asafm commented on code in PR #20493:
URL: https://github.com/apache/pulsar/pull/20493#discussion_r1221313072
##########
pip/pip-274.md:
##########
@@ -0,0 +1,126 @@
+# Background knowledge
+
+Apache Pulsar is a distributed messaging system that supports multiple messaging protocols and storage methods. Among them, Pulsar Topic Compaction is a mechanism to clean up duplicate messages in topics to reduce storage space and improve system efficiency.
+More topic compaction details can be found in [Pulsar Topic Compaction](https://pulsar.apache.org/docs/en/concepts-topic-compaction/).
+
+# Motivation
+
+Currently, the implementation of Pulsar Topic Compaction is fixed and does not support custom strategy, which limits users from using more Compactor policies in their applications.
+
+
+For example, we need to parse the Kafka format then compact message in Kop, but the current implementation of Pulsar topic compaction does not support this feature.
+Another the topic compaction logic implemented in `TwoPhaseCompactor` only compacts messages to the last one, but sometimes we need to keep the first valid message e.g [`StrategicTwoPhaseCompactor`](https://github.com/coderzc/pulsar/blob/0e9935c493060b13b322a84c5418146423992369/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java).
+
+So we need to make the topic compactor pluggable to support more compaction strategy.
+
+# Goals
+
+## In Scope
+
+<!--
+What this PIP intend to achieve once It's integrated into Pulsar.
+Why does it benefit Pulsar.
+-->
+
+Make the compactor pluggable.
+
+## Out of Scope
+
+<!--
+Describe what you have decided to keep out of scope, perhaps left for a different PIP/s.
+-->
+
+
+# High Level Design
+
+<!--
+Describe the design of your solution in *high level*.
+Describe the solution end to end, from a birds-eye view.
+Don't go into implementation details in this section.
+
+I should be able to finish reading from beginning of the PIP to here (including) and understand the feature and
+how you intend to solve it, end to end.
+
+DON'T
+* Avoid code snippets, unless it's essential to explain your intent.
+-->
+
+Make the topic compactor pluggable, users can customize the compactor implementation according to their own special scenarios.
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+<!--
+This is the section where you dive into the details. It can be:
+* Concrete class names and their roles and responsibility, including methods.
+* Code snippets of existing code.
+* Interface names and its methods.
+* ...
+-->
+* Define a standard Compactor interface that specifies the methods and properties that the Compactor implementation needs to implement. This interface should include methods for Compactor initialization, Compactor execution, and getting Compactor stats.
+```java
+public interface Compactor {
+
+    void initialize(ServiceConfiguration conf,
+                    PulsarClient pulsar,
+                    BookKeeper bk,
+                    ScheduledExecutorService scheduler);
+
+    CompletableFuture<Long> compact(String topic);
+
+    CompactorMXBean getStats();

Review Comment:
I think `CompactorMXBean` is too specific an abstraction for metrics. It doesn't let a compactor add its own metrics, and it will make future updates to the metrics hard.

First, some metrics are going to be common to any compactor used:

* `CompactionSucceedCount` - how many compactions have been successful
* `CompactionFailedCount` - ... failed
* `CompactionDurationInMillis` - a counter of how much time was spent in compactions so far

Those metrics should be maintained by the service that calls the compactor's `compact(topic)` method. The service only needs to know, from the return value, whether the compaction succeeded or not.
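The following is a minimal sketch of that split, not a definitive implementation. It makes two assumptions. First, `Compactor` below is a simplified stand-in for the proposed interface, keeping only `compact(String)`. Second, `CompactionService` and `CompactionServiceSketch` are hypothetical names, not existing Pulsar classes. The wrapper derives the three common metrics purely from how the returned `CompletableFuture<Long>` completes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Sketch only: CompactionServiceSketch, CompactionService and the simplified
// Compactor below are hypothetical names, not Pulsar APIs.
public class CompactionServiceSketch {

    /** Simplified stand-in for the proposed Compactor interface (initialize/getStats omitted). */
    interface Compactor {
        CompletableFuture<Long> compact(String topic);
    }

    /** Broker-side wrapper that owns the metrics shared by every compactor implementation. */
    static final class CompactionService {
        private final Compactor compactor;
        private final LongAdder succeeded = new LongAdder();
        private final LongAdder failed = new LongAdder();
        private final LongAdder durationNanos = new LongAdder();

        CompactionService(Compactor compactor) {
            this.compactor = compactor;
        }

        /** Delegates to the plugin and records the outcome from the returned future alone. */
        CompletableFuture<Long> compact(String topic) {
            final long start = System.nanoTime();
            CompletableFuture<Long> future;
            try {
                future = compactor.compact(topic);
            } catch (RuntimeException e) {
                // A plugin that throws instead of returning a failed future still counts as a failure.
                future = CompletableFuture.failedFuture(e);
            }
            // The action runs before the returned future completes, so callers that
            // join() on it observe up-to-date counters.
            return future.whenComplete((result, error) -> {
                durationNanos.add(System.nanoTime() - start);
                if (error == null) {
                    succeeded.increment();
                } else {
                    failed.increment();
                }
            });
        }

        long succeededCount() { return succeeded.sum(); }

        long failedCount() { return failed.sum(); }

        long durationMillis() { return TimeUnit.NANOSECONDS.toMillis(durationNanos.sum()); }
    }

    public static void main(String[] args) {
        CompactionService service =
                new CompactionService(topic -> CompletableFuture.completedFuture(42L));
        service.compact("persistent://public/default/my-topic").join();
        // Prints "succeeded=1 failed=0".
        System.out.println("succeeded=" + service.succeededCount() + " failed=" + service.failedCount());
    }
}
```

With the counters kept in the caller, every plugin gets them automatically, and a compactor implementation never has to remember to update them.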
I suggest using two interfaces, which would be passed to `initialize()`:

```java
import java.util.concurrent.TimeUnit;

interface CompactionMetricsRecorder {
    TopicCompactionMetricsRecorder provideTopicCompactionMetricsRecorder(String topic);
}

interface TopicCompactionMetricsRecorder {
    void recordMessageRemoved();
    void recordMessagesWritten(long numOfMessages, long messagesSizeInBytes, long duration, TimeUnit durationUnit);
    void recordMessagesRead(long numOfMessages, long messagesSizeInBytes, long duration, TimeUnit durationUnit);
    // ...
}
```

There are some metrics I still need to figure out where to fit, since there is something I need help understanding. The metrics are:

* `compactedEntriesCount` - how many entries were written in the last compaction
* `compactedEntriesSize` - the total size of the entries written in the last compaction

Once `compact()` has ended, how does the compactor hand over the details of its output, e.g. the ledger ID, the new compaction horizon, etc.? I don't see that in the interface. I'm asking because other compactors may do incremental compaction, which changes what to expect from them.

Now, what's left is a way to record custom metrics specific to your compactor. I need to think about it.

##########
pip/pip-274.md:
##########
+```java
+public interface Compactor {
+
+    void initialize(ServiceConfiguration conf,
+                    PulsarClient pulsar,
+                    BookKeeper bk,
+                    ScheduledExecutorService scheduler);
+
+    CompletableFuture<Long> compact(String topic);
+
+    CompactorMXBean getStats();
+}
+```
+
+* Rename `org.apache.pulsar.compaction.Compactor` to `org.apache.pulsar.compaction.AbstractCompactor` and make it implement `Compactor` interface.
+
+* Load custom compactor based on configuration in `org.apache.pulsar.broker.PulsarService.newCompactor` and `CompactorTool`.
+
+## Public-facing Changes
+
+<!--

Review Comment:
I think it's OK to remove all the template comments once you're done writing.

##########
pip/pip-274.md:
##########
+## Public-facing Changes
+
+<!--
+Describe the additions you plan to make for each public facing component.
+Remove the sections you are not changing.
+Clearly mark any changes which are BREAKING backward compatability.
+-->
+
+
+
+### Configuration

Review Comment:
Did you change the interface to support all functions of `TwoPhaseCompactor`?
