BewareMyPower opened a new issue #11962:
URL: https://github.com/apache/pulsar/issues/11962


   ## Motivation
   
   The initial motivation came from Kafka's protocol handler, KoP (https://github.com/streamnative/kop). KoP allows Kafka producers to configure the entry format as `kafka`, which means messages from Kafka producers are written to bookies directly, without any conversion between Kafka's message format and Pulsar's message format. This improves performance significantly. However, it also introduces a limitation: Pulsar consumers cannot consume such a topic because they cannot recognize the entry format.
   
   This proposal introduces a message converter, which is responsible for converting the buffer of an entry to a format that Pulsar consumers can recognize. Once a converter is configured, before dispatching messages to Pulsar consumers, the broker uses it to check whether each buffer needs to be converted and performs the conversion if necessary. Multiple converters can be configured, just as multiple protocol handlers can, since each protocol handler may write entries in its own format.
   
   After this change:
   
   - When other clients write an entry, no conversion is needed.
   - When other clients read an entry, no conversion is needed.
   - When a Pulsar consumer reads an entry, the conversion is performed in the broker.
   
   Before this change, if we want Pulsar consumers to interoperate with other clients:
   
   - When other clients write an entry, we need to convert it to Pulsar format.
   - When other clients read an entry, we need to convert it from Pulsar format to their specific format.
   - When a Pulsar consumer reads an entry, no conversion is needed.
   
   This proposal mainly targets protocol handlers, because they can access `PersistentTopic` and write bytes to bookies directly. In the rare case that users write entries to the topic's ledger directly with a BookKeeper client, the converter can handle that as well.
   
   ## Goal
   
   The goal of this proposal is only to add message converters at the broker level. Once the related broker configs are enabled, the converters are applied to all topics. This adds overhead to topics that are only used by Pulsar clients, because the broker still needs to check whether each buffer needs to be converted (see the `MessageConverter#accept` method in the next section).
   
   In the future, message converters could be configured at the namespace or topic level. We could even configure message converters for the Pulsar client, so that the conversion happens only on the client side and the CPU overhead on the broker is reduced.
   
   ## API changes
   
   First, an interface is added under the package `org.apache.pulsar.common.api.raw`:
   
   ```java
   package org.apache.pulsar.common.api.raw;
   
   import io.netty.buffer.ByteBuf;
   
   public interface MessageConverter {
   
       /**
        * Determine whether the buffer can be converted.
        *
        * @param buffer the buffer that might need to be converted
        * @return whether the buffer can be converted
        */
       boolean accept(ByteBuf buffer);
   
       /**
        * Convert the buffer to the format that Pulsar consumers can recognize.
        *
        * @param originalBuffer the original buffer
        * @return the converted buffer
        */
       ByteBuf convert(ByteBuf originalBuffer);
   }
   ```
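   
   Because `accept` runs on every dispatched entry, a converter should be able to inspect the buffer without consuming it, so that a rejected buffer reaches the consumer unchanged. The following is only a sketch of that idea under stated assumptions: to stay dependency-free it mirrors the interface with `java.nio.ByteBuffer` in place of Netty's `ByteBuf`, and `MagicPrefixConverter` together with its one-byte marker is a hypothetical format, not something this proposal defines.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Dependency-free stand-in for the proposed interface:
   // java.nio.ByteBuffer replaces Netty's ByteBuf here.
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Hypothetical converter: entries written by some other protocol handler are
   // assumed to start with a one-byte marker that this converter recognizes.
   class MagicPrefixConverter implements MessageConverter {
       static final byte MAGIC = (byte) 0x7E; // hypothetical marker value
   
       @Override
       public boolean accept(ByteBuffer buffer) {
           // Absolute get: peeks at the first readable byte without moving the
           // position, so a rejected buffer is left untouched for normal dispatch.
           return buffer.remaining() > 0 && buffer.get(buffer.position()) == MAGIC;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           // Placeholder conversion: strip the marker and return the rest as a
           // view. A real converter would rebuild the Pulsar message format here.
           ByteBuffer view = originalBuffer.duplicate();
           view.position(view.position() + 1);
           return view.slice();
       }
   }
   ```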
   
   Then, a new broker configuration is added:
   
   ```java
       @FieldContext(
               category = CATEGORY_PLUGIN,
               doc = "List of message converters, which are responsible for converting entries before dispatching. If multiple"
                       + " converters accept the same payload, the one that appears first in this list is preferred."
       )
       private List<String> messageConverters;
   ```
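   
   The preference rule in this config's doc string amounts to "the first accepting converter wins". A minimal sketch of that rule follows, again using a dependency-free stand-in interface over `java.nio.ByteBuffer`; `ConverterChain` and `convertIfNeeded` are hypothetical names, not part of the proposal.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   // Dependency-free stand-in for the proposed interface, with java.nio.ByteBuffer
   // in place of Netty's ByteBuf.
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Hypothetical helper that applies converters in their configured order.
   final class ConverterChain {
       private final List<MessageConverter> converters;
   
       ConverterChain(List<MessageConverter> converters) {
           // Defensive copy: the order is fixed at construction time.
           this.converters = new ArrayList<>(converters);
       }
   
       // Returns the output of the first converter that accepts the buffer, or
       // the original buffer when none does (e.g. it is already in Pulsar format).
       ByteBuffer convertIfNeeded(ByteBuffer buffer) {
           for (MessageConverter converter : converters) {
               if (converter.accept(buffer)) {
                   return converter.convert(buffer);
               }
           }
           return buffer;
       }
   }
   ```
   
   For an entry that is already in Pulsar format, every converter's `accept` runs before the buffer passes through unchanged, which is the per-entry overhead mentioned in the Goal section.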
   
   ## Implementation
   
   The implementation is simple. When the broker starts, it loads all classes listed in the `messageConverters` config, each of which must implement the `MessageConverter` interface, and passes the converters to `ServerCnx`. Each time a dispatcher dispatches messages to a consumer, it eventually calls the `ServerCnx#newMessageAndIntercept` method, where the conversion can be performed.
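   
   One way to implement the loading step, assuming each class named in `messageConverters` is on the broker's classpath and has an accessible no-argument constructor, is plain reflection. This is a sketch, not the proposed code: the proposal does not say how the classes are loaded, `ConverterLoader` and `NoOpConverter` are hypothetical names, and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   // Dependency-free stand-in for the proposed interface.
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Hypothetical converter used only to exercise the loader.
   class NoOpConverter implements MessageConverter {
       @Override
       public boolean accept(ByteBuffer buffer) {
           return false;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           return originalBuffer;
       }
   }
   
   // Hypothetical loader: instantiates converters in config order, so the
   // preference order of the messageConverters list is preserved.
   final class ConverterLoader {
       static List<MessageConverter> load(List<String> classNames) {
           List<MessageConverter> converters = new ArrayList<>();
           for (String className : classNames) {
               try {
                   Class<?> clazz = Class.forName(className.trim());
                   if (!MessageConverter.class.isAssignableFrom(clazz)) {
                       throw new IllegalArgumentException(className + " does not implement MessageConverter");
                   }
                   converters.add((MessageConverter) clazz.getDeclaredConstructor().newInstance());
               } catch (ReflectiveOperationException e) {
                   // Covers missing classes, missing constructors and constructor failures.
                   throw new IllegalArgumentException("Cannot load message converter " + className, e);
               }
           }
           return converters;
       }
   }
   ```
   
   Failing at broker startup on a misspelled class name seems preferable to discovering the problem when the first entry is dispatched.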
   
   For unit tests, we can test the following converters:
   
   1. `RejectAllConverter`: `accept` returns false, so no conversion is performed.
   2. `EchoConverter`: `accept` returns true and `convert` simply returns the original buffer.
   3. `BytesConverter`: an example of a real-world converter. The message format has a `MessageMetadata` part with the `entry.format=bytes` property, and the payload part contains only the raw bytes, without `SingleMessageMetadata`. `BytesConverter#convert` converts the raw bytes to the format that Pulsar consumers can recognize.
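   
   The first two converters are simple enough to sketch directly. This version uses a dependency-free stand-in interface over `java.nio.ByteBuffer` instead of Netty's `ByteBuf`; `BytesConverter` is omitted because it depends on Pulsar's `MessageMetadata` and `SingleMessageMetadata` classes. Having `RejectAllConverter#convert` throw is an assumption of this sketch, chosen so that a test fails loudly if the broker ever calls it.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Dependency-free stand-in for the proposed interface.
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Never accepts, so the broker should dispatch the entry exactly as stored.
   class RejectAllConverter implements MessageConverter {
       @Override
       public boolean accept(ByteBuffer buffer) {
           return false;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           // Assumption of this sketch: reaching this method is a bug.
           throw new UnsupportedOperationException("RejectAllConverter never converts");
       }
   }
   
   // Accepts everything but returns the original buffer unchanged, exercising
   // the conversion path without altering the payload.
   class EchoConverter implements MessageConverter {
       @Override
       public boolean accept(ByteBuffer buffer) {
           return true;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           return originalBuffer;
       }
   }
   ```
   
   With either converter configured, a Pulsar consumer should receive exactly the bytes it would receive with no converter at all, which makes them useful for checking that the conversion hook does not disturb normal dispatch.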

