heesung-sn opened a new issue, #18099:
URL: https://github.com/apache/pulsar/issues/18099

   ### Motivation
   
   Currently, the topic compaction logic implemented in `TwoPhaseCompactor` only keeps the last message for each key.
   
   We want to make topic compaction configurable with different strategies. For example, to support Conflict State Resolution (Race Conditions) in PIP-192 (https://github.com/apache/pulsar/issues/16691), we need to compact messages by keeping the first valid state for each key.
   
   
   ### Goal
   
   - Create another topic compactor, `StrategicTwoPhaseCompactor`, that can be configured with a compaction strategy, `TopicCompactionStrategy`.
   
   - Update `TableViewConfigurationData` so that `TableView` loads the configured `TopicCompactionStrategy` and applies it when updating its internal key-value map.
   
   - Add `TopicCompactionStrategy` to the topic-level policies so that compaction runs `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor` when a strategy is configured.
   
   ### API Changes
   
   
   
   
   ```
   import org.apache.pulsar.client.api.Schema;
   
   public interface TopicCompactionStrategy<T> {
   
       /**
        * Returns the schema object for this strategy.
        *
        * @return the schema of the message values this strategy operates on
        */
       Schema<T> getSchema();
   
       /**
        * Tests whether the current message is valid compared to the previous message for the same key.
        *
        * @param prev previous message
        * @param cur current message
        * @return true if the transition from the prev message to the cur message is valid; otherwise, false
        */
       boolean isValid(T prev, T cur);
   
       /**
        * Returns true if this strategy supports merge.
        * Warning: `merge` makes topic compaction more expensive (requiring more CPU and memory),
        * as the compaction process needs to cache the previous messages to merge.
        */
       boolean isMergeEnabled();
   
       /**
        * Merges the prev and cur messages for the same key, if isValid() and isMergeEnabled() return true.
        *
        * @param prev previous message
        * @param cur current message
        * @return merged message
        */
       T merge(T prev, T cur);
   
       /**
        * Loads a strategy from its fully qualified class name.
        *
        * @return the loaded strategy, or null if topicCompactionStrategy is null
        */
       @SuppressWarnings("unchecked")
       static <T> TopicCompactionStrategy<T> load(String topicCompactionStrategy) {
           if (topicCompactionStrategy == null) {
               return null;
           }
           try {
               // Instantiate the configured class reflectively through its no-arg constructor.
               Class<?> clazz = Class.forName(topicCompactionStrategy);
               Object instance = clazz.getDeclaredConstructor().newInstance();
               return (TopicCompactionStrategy<T>) instance;
           } catch (Exception e) {
               throw new IllegalArgumentException("Error when loading topic compaction strategy.", e);
           }
       }
   }
   ```
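   To illustrate the kind of strategy the PIP-192 use case needs, here is a minimal sketch of a "first valid state wins" strategy. It is not part of the proposal. It mirrors the proposed interface as a local `CompactionStrategy`, omitting `getSchema()` and `load()` so that it compiles without a Pulsar dependency. The `State` lifecycle, the `Ownership` record, and the transition table are hypothetical and are not the actual PIP-192 state model. The sketch assumes Java 16+ for records.

   ```java
   import java.util.EnumSet;
   import java.util.Map;

   public class FirstValidStateStrategyExample {

       // Local mirror of the proposed TopicCompactionStrategy<T>; getSchema() and load()
       // are omitted so this sketch compiles without a Pulsar dependency.
       interface CompactionStrategy<T> {
           boolean isValid(T prev, T cur);

           boolean isMergeEnabled();

           T merge(T prev, T cur);
       }

       // Hypothetical lifecycle states for a key; not the actual PIP-192 state model.
       enum State { ASSIGNING, OWNED, RELEASED }

       // Hypothetical message value: the proposed state and the broker that proposed it.
       record Ownership(State state, String owner) { }

       // Allowed transitions: ASSIGNING -> OWNED -> RELEASED -> ASSIGNING.
       static final Map<State, EnumSet<State>> TRANSITIONS = Map.of(
               State.ASSIGNING, EnumSet.of(State.OWNED),
               State.OWNED, EnumSet.of(State.RELEASED),
               State.RELEASED, EnumSet.of(State.ASSIGNING));

       static final class OwnershipStrategy implements CompactionStrategy<Ownership> {
           @Override
           public boolean isValid(Ownership prev, Ownership cur) {
               if (prev == null) {
                   // Nothing recorded for this key yet: only a new assignment may start the lifecycle.
                   return cur.state() == State.ASSIGNING;
               }
               // A later conflicting message (e.g. a second ASSIGNING) is rejected, so the first valid state wins.
               return TRANSITIONS.get(prev.state()).contains(cur.state());
           }

           @Override
           public boolean isMergeEnabled() {
               return false;
           }

           @Override
           public Ownership merge(Ownership prev, Ownership cur) {
               throw new UnsupportedOperationException("merge is disabled for this strategy");
           }
       }

       public static void main(String[] args) {
           OwnershipStrategy strategy = new OwnershipStrategy();
           Ownership first = new Ownership(State.ASSIGNING, "broker-1");
           Ownership racing = new Ownership(State.ASSIGNING, "broker-2");
           System.out.println(strategy.isValid(null, first));   // true
           System.out.println(strategy.isValid(first, racing)); // false: the first assignment wins
           System.out.println(strategy.isValid(first, new Ownership(State.OWNED, "broker-1"))); // true
       }
   }
   ```

   `ASSIGNING -> ASSIGNING` is not an allowed transition, so the racing update from `broker-2` is rejected and the first assignment is kept. Merge is disabled, so `merge` is never called.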
   
   ```
   public class TableViewConfigurationData implements Serializable, Cloneable {
       ...
       // Fully qualified class name of the TopicCompactionStrategy to apply; null means no strategy.
       private String topicCompactionStrategy;
   }
   ```
   
   
   ```
   public class TableViewImpl<T> implements TableView<T> {
   
       private final TableViewConfigurationData conf;
   
       ...
   
       private TopicCompactionStrategy<T> compactionStrategy;
   
       TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
           ...
           this.compactionStrategy =
                   TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
       }
   }
   ```
   
   
   
   ```
   pulsar-admin topicPolicies set-compaction-strategy options
   pulsar-admin topicPolicies get-compaction-strategy options
   ```
   
   ### Implementation
   
   #### Goal 1
   
   - Create another topic compactor, `StrategicTwoPhaseCompactor`, that can be configured with a compaction strategy, `TopicCompactionStrategy`.
   
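   `StrategicTwoPhaseCompactor` will run in two phases.
   
   First phase: using a `Reader<T>` instead of a `RawReader`, so that the strategy can compare deserialized values, it will iterate over each message and compact messages with the same key according to the `TopicCompactionStrategy`. For each key, a new message replaces the previous one only if `isValid(prev, cur)` returns true. If `isMergeEnabled()` also returns true, the previous and new messages are combined with `merge(prev, cur)` instead.
   
   Second phase: the compacted messages will be written to a ledger.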
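   The first-phase rule can be sketched in plain Java as a fold over the messages in topic order. This is an illustration under stated assumptions, not the proposed implementation. `compact` and the `List` of key/value entries are hypothetical stand-ins for iterating a `Reader<T>`, and the strategy interface is a local mirror of `TopicCompactionStrategy`. The sketch also keeps every retained value in memory, ignores tombstones (null values), and calls `merge` only when a previous value exists.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class StrategicCompactionSketch {

       // Local mirror of the proposed TopicCompactionStrategy (getSchema() omitted).
       interface CompactionStrategy<T> {
           boolean isValid(T prev, T cur);

           boolean isMergeEnabled();

           T merge(T prev, T cur);
       }

       // Phase 1 sketch: fold messages, in topic order, into one retained value per key.
       // `messages` is a hypothetical stand-in for iterating a Reader<T>.
       static <T> Map<String, T> compact(List<Map.Entry<String, T>> messages,
                                         CompactionStrategy<T> strategy) {
           Map<String, T> compacted = new LinkedHashMap<>();
           for (Map.Entry<String, T> msg : messages) {
               T prev = compacted.get(msg.getKey()); // null if the key has not been seen yet
               T cur = msg.getValue();
               if (!strategy.isValid(prev, cur)) {
                   continue; // invalid transition: drop cur and keep prev
               }
               // Merge only when there is a previous value and the strategy enables merge.
               T next = (prev != null && strategy.isMergeEnabled()) ? strategy.merge(prev, cur) : cur;
               compacted.put(msg.getKey(), next);
           }
           return compacted; // phase 2 would write these values to the compacted ledger
       }

       public static void main(String[] args) {
           // Example strategy: reject negative deltas and sum the accepted ones per key.
           CompactionStrategy<Integer> summing = new CompactionStrategy<>() {
               @Override
               public boolean isValid(Integer prev, Integer cur) {
                   return cur >= 0;
               }

               @Override
               public boolean isMergeEnabled() {
                   return true;
               }

               @Override
               public Integer merge(Integer prev, Integer cur) {
                   return prev + cur;
               }
           };
           List<Map.Entry<String, Integer>> messages = List.of(
                   Map.entry("a", 1), Map.entry("b", 5), Map.entry("a", -3), Map.entry("a", 4));
           System.out.println(compact(messages, summing)); // {a=5, b=5}
       }
   }
   ```

   In this run, the `-3` update for key `a` fails `isValid` and is dropped. The later `4` is merged into the earlier `1`, giving `{a=5, b=5}`.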
   
   #### Goal 2
   
   - Update `TableViewConfigurationData` so that `TableView` loads the configured `TopicCompactionStrategy` and applies it when updating its internal key-value map.
   When updating its internal key-value map, `TableView` will follow the same compaction logic defined in the configured `TopicCompactionStrategy`, so that it resolves each key the same way the compactor does.
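   Below is a minimal sketch of how the `TableView` update path could apply the same rule, assuming a `ConcurrentMap` as the internal key-value map. `TableViewUpdateSketch`, `handleMessage`, and `get` are hypothetical names, not the actual members of `TableViewImpl`. The strategy interface is again a local mirror of `TopicCompactionStrategy`, and tombstones are not handled.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   public class TableViewUpdateSketch<T> {

       // Local mirror of the proposed TopicCompactionStrategy (getSchema() omitted).
       interface CompactionStrategy<V> {
           boolean isValid(V prev, V cur);

           boolean isMergeEnabled();

           V merge(V prev, V cur);
       }

       private final ConcurrentMap<String, T> data = new ConcurrentHashMap<>();
       // Null when no strategy is configured, matching TopicCompactionStrategy.load(null).
       private final CompactionStrategy<T> compactionStrategy;

       TableViewUpdateSketch(CompactionStrategy<T> compactionStrategy) {
           this.compactionStrategy = compactionStrategy;
       }

       // Hypothetical per-message update; assumes non-null values (tombstones are not handled).
       void handleMessage(String key, T cur) {
           if (compactionStrategy == null) {
               data.put(key, cur); // no strategy: the latest value wins
               return;
           }
           // compute() makes the read of prev and the write of the result atomic for this key.
           data.compute(key, (k, prev) -> {
               if (!compactionStrategy.isValid(prev, cur)) {
                   return prev; // rejected: keep the current entry (or stay absent if prev is null)
               }
               return (prev != null && compactionStrategy.isMergeEnabled())
                       ? compactionStrategy.merge(prev, cur)
                       : cur;
           });
       }

       T get(String key) {
           return data.get(key);
       }
   }
   ```

   When a transition is rejected, returning `prev` from `compute` leaves the map unchanged. For a key with no entry, returning `null` keeps it absent, so an invalid first value never appears in the view.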
   
   #### Goal 3
   
   - Add `TopicCompactionStrategy` to the topic-level policies so that compaction runs `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor` when a strategy is configured.
   When running compaction, it will look up the `TopicCompactionStrategy` in the topic-level policy and run `StrategicTwoPhaseCompactor` if a strategy is configured. By default (no strategy configured), it should run `TwoPhaseCompactor`.
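   The selection rule can be sketched as follows. `Compactor`, `selectCompactor`, and the `describe()` output are hypothetical placeholders, not existing Pulsar types. The only logic taken from the proposal is the rule itself: a configured strategy selects `StrategicTwoPhaseCompactor`, and the absence of one falls back to `TwoPhaseCompactor`. Treating a blank value as unset is an additional assumption, and `com.example.MyStrategy` is a made-up class name.

   ```java
   public class CompactorSelectionSketch {

       // Hypothetical stand-ins for the two compactors; only the selection logic matters here.
       interface Compactor {
           String describe();
       }

       static final class TwoPhaseCompactor implements Compactor {
           public String describe() {
               return "TwoPhaseCompactor";
           }
       }

       static final class StrategicTwoPhaseCompactor implements Compactor {
           private final String strategyClassName;

           StrategicTwoPhaseCompactor(String strategyClassName) {
               this.strategyClassName = strategyClassName;
           }

           public String describe() {
               return "StrategicTwoPhaseCompactor(" + strategyClassName + ")";
           }
       }

       // `topicCompactionStrategy` stands for the value read from the topic-level policy:
       // the strategy's fully qualified class name, or null when none is configured.
       static Compactor selectCompactor(String topicCompactionStrategy) {
           if (topicCompactionStrategy == null || topicCompactionStrategy.isBlank()) {
               return new TwoPhaseCompactor(); // default behavior
           }
           return new StrategicTwoPhaseCompactor(topicCompactionStrategy);
       }

       public static void main(String[] args) {
           System.out.println(selectCompactor(null).describe()); // TwoPhaseCompactor
           System.out.println(selectCompactor("com.example.MyStrategy").describe());
           // StrategicTwoPhaseCompactor(com.example.MyStrategy)
       }
   }
   ```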
   
   ### Alternatives
   
   N/A
   
   ### Anything else?
   
   _No response_

