heesung-sn opened a new issue, #18099: URL: https://github.com/apache/pulsar/issues/18099
### Motivation

Currently, the topic compaction logic implemented in `TwoPhaseCompactor` only compacts messages down to the last message for each key. We want to make topic compaction configurable with different strategies. For example, to support conflict state resolution (race conditions) in PIP-192 (https://github.com/apache/pulsar/issues/16691), we need to compact messages to `the first valid states`.

### Goal

- Create another topic compactor, `StrategicTwoPhaseCompactor`, that accepts a configurable compaction strategy, `TopicCompactionStrategy`.
- Update `TableViewConfigurationData` so that `TableView` loads the `TopicCompactionStrategy` and applies it when updating its internal key-value map.
- Add `TopicCompactionStrategy` to the topic-level policy so that compaction runs `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor`.

### API Changes

```
public interface TopicCompactionStrategy<T> {

    /**
     * Returns the schema object for this strategy.
     *
     * @return the schema used to decode message values for this strategy
     */
    Schema<T> getSchema();

    /**
     * Tests if the current message is valid compared to the previous message for the same key.
     *
     * @param prev previous message
     * @param cur current message
     * @return true if the transition from prev to cur is valid; otherwise, false.
     */
    boolean isValid(T prev, T cur);

    /**
     * Returns true if this strategy supports merge.
     * Warning: `merge` makes topic compaction more complex (requiring more CPU and memory),
     * as the compaction process needs to cache the previous messages to merge.
     */
    boolean isMergeEnabled();

    /**
     * Merges the prev and cur messages for the same key, if isValid() and isMergeEnabled() return true.
     *
     * @param prev previous message
     * @param cur current message
     * @return merged message
     */
    T merge(T prev, T cur);

    @SuppressWarnings("unchecked")
    static <T> TopicCompactionStrategy<T> load(String topicCompactionStrategy) {
        if (topicCompactionStrategy == null) {
            return null;
        }
        try {
            // Instantiate the strategy from its fully qualified class name via the no-arg constructor.
            Class<?> clazz = Class.forName(topicCompactionStrategy);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return (TopicCompactionStrategy<T>) instance;
        } catch (Exception e) {
            throw new IllegalArgumentException("Error when loading topic compaction strategy.", e);
        }
    }
}
```

```
public class TableViewConfigurationData implements Serializable, Cloneable {
    ...
    // Fully qualified class name of the TopicCompactionStrategy implementation (null means none).
    private String topicCompactionStrategy;
    ...
}
```

```
public class TableViewImpl<T> implements TableView<T> {
    private final TableViewConfigurationData conf;
    ...
    private TopicCompactionStrategy<T> compactionStrategy;

    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        ...
        this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
        ...
    }
    ...
}
```

```
pulsar-admin topicPolicies set-compaction-strategy options
pulsar-admin topicPolicies get-compaction-strategy options
```

### Implementation

#### Goal 1

- Create another topic compactor, `StrategicTwoPhaseCompactor`, that accepts a configurable compaction strategy, `TopicCompactionStrategy`.

`StrategicTwoPhaseCompactor` will have two phases:

- First phase: using `Reader<T>` instead of `RawReader`, it iterates over the messages and compacts messages with the same key by applying `isValid()` and, if `isMergeEnabled()` returns true, `merge()` from the `TopicCompactionStrategy`.
- Second phase: the compacted messages are written to a ledger.

#### Goal 2

- Update `TableViewConfigurationData` so that `TableView` loads the `TopicCompactionStrategy` and applies it when updating its internal key-value map.

When updating its internal key-value map, `TableView` will follow the same compaction logic defined in `TopicCompactionStrategy`.
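The first-phase compaction loop described above, which `TableView` would reuse for its map updates, can be sketched roughly as follows. This is a minimal, hedged sketch, not the proposed implementation: `CompactionStrategy`, `Msg`, `State`, `FORWARD_ONLY`, and `compact` are hypothetical names. The interface omits `getSchema()` so the example runs without Pulsar. Messages are in-memory key/value pairs rather than values read through `Reader<T>`. The first message for a key is assumed to always be valid, and tombstones (null values) are not handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StrategicCompactionSketch {

    /** Hypothetical, schema-free stand-in for the proposed TopicCompactionStrategy<T>. */
    interface CompactionStrategy<T> {
        boolean isValid(T prev, T cur);
        boolean isMergeEnabled();
        T merge(T prev, T cur);
    }

    /** A message reduced to its key and decoded value (assumption: no null/tombstone values). */
    record Msg<T>(String key, T value) {}

    /** Hypothetical example states; a transition is valid only if it moves strictly forward. */
    enum State { ASSIGNED, OWNED, RELEASED }

    static final CompactionStrategy<State> FORWARD_ONLY = new CompactionStrategy<>() {
        public boolean isValid(State prev, State cur) { return cur.ordinal() > prev.ordinal(); }
        public boolean isMergeEnabled() { return false; }
        public State merge(State prev, State cur) { return cur; }
    };

    /**
     * First-phase-style pass: for each key, keeps the value produced by applying only
     * valid transitions, merging when the strategy enables it.
     */
    static <T> Map<String, T> compact(List<Msg<T>> messages, CompactionStrategy<T> strategy) {
        Map<String, T> latest = new LinkedHashMap<>();
        for (Msg<T> m : messages) {
            T prev = latest.get(m.key());
            if (prev == null) {
                // First message for a key is accepted as-is (assumption).
                latest.put(m.key(), m.value());
            } else if (strategy.isValid(prev, m.value())) {
                T next = strategy.isMergeEnabled() ? strategy.merge(prev, m.value()) : m.value();
                latest.put(m.key(), next);
            }
            // Invalid transitions are dropped, so the earlier valid state survives.
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Msg<State>> msgs = List.of(
                new Msg<>("bundle-1", State.ASSIGNED),
                new Msg<>("bundle-1", State.OWNED),
                new Msg<>("bundle-1", State.ASSIGNED), // backward transition: rejected
                new Msg<>("bundle-2", State.OWNED));
        System.out.println(compact(msgs, FORWARD_ONLY));
    }
}
```

Running `main` prints `{bundle-1=OWNED, bundle-2=OWNED}`. The backward `OWNED` to `ASSIGNED` transition is rejected, so the first valid state is kept. Keep-last compaction would instead retain `ASSIGNED` for `bundle-1`.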
#### Goal 3

- Add `TopicCompactionStrategy` to the topic-level policy so that compaction runs `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor`.

When running compaction, Pulsar will look up the `TopicCompactionStrategy` in the topic-level policy and, if one is configured, run `StrategicTwoPhaseCompactor`. By default, it runs `TwoPhaseCompactor`.

### Alternatives

N/A

### Anything else?

_No response_
