BewareMyPower opened a new issue #12087:
URL: https://github.com/apache/pulsar/issues/12087


   ## Motivation
   
   The initial motivation came from Kafka's protocol handler, i.e. KoP (https://github.com/streamnative/kop). KoP allows the entry format to be configured as `kafka`, which means that messages from a Kafka producer can be written to bookies directly, without any conversion between Kafka's message format and Pulsar's message format. This improves performance significantly. However, it also introduces a limitation: a Pulsar consumer cannot consume such a topic because it cannot recognize the entry format.
   
   The existing `ConsumerInterceptor` does not help in this case because it operates on a `Message<T>` object, which is converted from an entry in valid Pulsar format.
   
   ## Goal
   
   This proposal introduces a payload converter for the Pulsar client, which converts the payload (excluding the metadata parts) on the client side. Once a suitable converter is configured, the Pulsar client can recognize entries of any format.
   
   This proposal is mainly intended for protocol handlers because they can access `PersistentTopic` and write bytes to bookies directly. In the rare case where users write to the topic's ledger directly with a BookKeeper client, the converter can handle that as well.
   
   ## API Changes
   
   First, an interface is added under the package `org.apache.pulsar.common.api.raw`.
   
   ```java
   public interface PayloadConverter {
   
       /**
        * Convert the buffer to a format that a Pulsar consumer can recognize.
        *
        * @param brokerEntryMetadata the {@link BrokerEntryMetadata} object that might contain the buffer's format
        * @param metadata the {@link MessageMetadata} object that might contain the buffer's format
        * @param payload the payload buffer, which doesn't contain the {@link MessageMetadata} part
        * @return the converted payload buffer
        * @implNote A valid implementation must satisfy the following constraints:
        *   1. The refCnt of `payload` must not change after this call.
        *   2. It must return either `payload` itself or a newly allocated buffer whose refCnt is 1.
        *   3. If it returns `payload`, the reader index and writer index must remain unchanged.
        */
       ByteBuf convert(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata metadata, ByteBuf payload);
   }
   ```
   
   The metadata parts, `BrokerEntryMetadata` and `MessageMetadata`, determine the format of the payload. Both are passed to the converter because the current consumer implementation parses them first for subsequent operations; see:
   
   
https://github.com/apache/pulsar/blob/5fd62a9847a4f574708e3002482aaf876de8d540/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1011-L1019
   
   Since the pulsar-client-api module should not take on any new dependency (to avoid a cyclic dependency), this proposal loads `PayloadConverter` at runtime, similar to what the SLF4J API does. Users only need to configure whether the converter is enabled and, optionally, the package to search. See the following methods in `ConsumerBuilder`.
   
   ```java
       /**
        * Enable the payload converter to convert the message payload before parsing it.
        *
        * Default: false
        *
        * If it's enabled, each time a consumer is created, it will try to load classes under a specific package.
        * The first class that implements PayloadConverter will be used as the converter instance's type.
        * If no PayloadConverter is found, the default converter is a trivial converter that returns the
        * payload itself.
        *
        * The converter instance is constructed with no arguments.
        *
        * See the PayloadConverter interface's definition in the org.apache.pulsar.common.api.raw package of
        * the org.apache.pulsar:pulsar-common artifact.
        *
        * @see ConsumerBuilder#payloadConverterPackage(String)
        *
        * @param enablePayloadConverter whether to enable the payload converter
        * @return the consumer builder instance
        */
       ConsumerBuilder<T> enablePayloadConverter(boolean enablePayloadConverter);
   
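       /**
        * Set the package that might contain the PayloadConverter implementation.
        *
        * Default: org.apache.pulsar.client.converter
        *
        * @param packageName the name of the package to search for a PayloadConverter
        * @return the consumer builder instance
        */
       ConsumerBuilder<T> payloadConverterPackage(String packageName);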
   ```
   
   In summary, using a payload converter takes the following steps:
   
   1. Implement `PayloadConverter` under a specific package such as `org.apache.pulsar.client.converter`.
   2. Set `enablePayloadConverter` on the client side, and set `payloadConverterPackage` if the converter's package is not `org.apache.pulsar.client.converter`.
   3. Add the compiled class that implements `PayloadConverter` to the class path.
   
   ## Implementation
   
   Add a `PayloadConverter` field in `ConsumerBase` and initialize it when a 
consumer is created. The initialization process is:
   
   1. Find all classes that implement the `PayloadConverter` interface under the configured package.
   2. If no class is found, use the default converter, which simply returns the payload itself.
   3. Otherwise, choose the first class as the actual converter.
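
The selection steps above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the proposed implementation: `SketchConverter` is a hypothetical, simplified stand-in for `PayloadConverter` (it takes a `byte[]` and omits the metadata arguments), the candidate classes are passed in as a list instead of being discovered by scanning a package, and the names `ConverterLoaderSketch`, `IdentityConverter`, and `ReversingConverter` are invented for this example.

```java
import java.lang.reflect.Modifier;
import java.util.List;

/** Hypothetical, simplified stand-in for the proposed PayloadConverter. */
interface SketchConverter {
    byte[] convert(byte[] payload);
}

final class ConverterLoaderSketch {

    /** Default converter: returns the payload itself. */
    static final class IdentityConverter implements SketchConverter {
        @Override
        public byte[] convert(byte[] payload) {
            return payload;
        }
    }

    /** Example custom converter (assumption): returns a reversed copy. */
    static final class ReversingConverter implements SketchConverter {
        @Override
        public byte[] convert(byte[] payload) {
            byte[] out = new byte[payload.length];
            for (int i = 0; i < payload.length; i++) {
                out[i] = payload[payload.length - 1 - i];
            }
            return out;
        }
    }

    /**
     * Picks the first concrete class that implements SketchConverter and
     * builds it with its no-argument constructor. Falls back to the
     * identity converter when no candidate qualifies.
     */
    static SketchConverter select(List<? extends Class<?>> candidates) {
        for (Class<?> c : candidates) {
            boolean concrete = !c.isInterface() && !Modifier.isAbstract(c.getModifiers());
            if (concrete && SketchConverter.class.isAssignableFrom(c)) {
                try {
                    return (SketchConverter) c.getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Cannot instantiate " + c.getName(), e);
                }
            }
        }
        return new IdentityConverter();
    }
}
```

Because the first match wins, the order in which candidate classes are discovered decides which converter is used when several implementations are on the class path.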
   
   Once the converter is initialized, the consumer applies it after the `BrokerEntryMetadata` and `MessageMetadata` objects have been parsed from the original buffer. The resulting payload buffer is then used for all subsequent processing.
   
   Note that the original buffer is released in `PulsarDecoder#handleRead`. If the `convert` method returns a new buffer, however, the client implementation must release that buffer itself.
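
The release rule can be illustrated with a small self-contained sketch. It deliberately avoids Netty: `ToyBuf` is a toy stand-in for a reference-counted `ByteBuf`, `ToyConverter` is a simplified stand-in for `PayloadConverter` without the metadata arguments, and `ConvertAndReleaseSketch.consume` is a hypothetical helper. None of these names come from Pulsar; the sketch only shows one way the caller could release a newly returned buffer while leaving the original payload to its existing owner.

```java
/** Toy stand-in for a reference-counted buffer (assumption, not Netty's ByteBuf). */
final class ToyBuf {
    private final byte[] data;
    private int refCnt = 1; // a freshly allocated buffer starts with refCnt 1

    ToyBuf(byte[] data) {
        this.data = data;
    }

    byte[] data() {
        return data;
    }

    int refCnt() {
        return refCnt;
    }

    void release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt--;
    }
}

/** Simplified stand-in for the proposed converter; metadata arguments omitted. */
interface ToyConverter {
    ToyBuf convert(ToyBuf payload);
}

final class ConvertAndReleaseSketch {

    /**
     * Converts the payload, "processes" the result (here: returns its length),
     * and releases the converted buffer only if it is a new buffer.
     * The original payload is never released here.
     */
    static int consume(ToyConverter converter, ToyBuf payload) {
        int refCntBefore = payload.refCnt();
        ToyBuf converted = converter.convert(payload);
        try {
            // Constraint 1 from the interface contract: refCnt of payload is unchanged.
            if (payload.refCnt() != refCntBefore) {
                throw new IllegalStateException("converter changed the refCnt of payload");
            }
            return converted.data().length;
        } finally {
            if (converted != payload) {
                converted.release();
            }
        }
    }
}
```

Comparing by identity (`converted != payload`) is what distinguishes the two return options allowed by the interface contract.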
   
   The tests should cover the following cases:
   
   - Consumers work both for a single non-partitioned topic and for multiple topics.
   - The `BrokerEntryMetadata` and `MessageMetadata` passed to the converter are the actual values.
   - The converters don't affect the default behavior if `enablePayloadConverter` is false.
   
   ## Rejected Alternatives
   
   An earlier proposal, since discarded, tried to perform the conversion on the broker side before dispatching entries. However, the dispatcher runs in a single thread when dispatching entries, so the conversion cost might affect all other topics.
   
   Therefore, the conversion should be done on the client side. It may add overhead to the Pulsar consumer, but it doesn't affect other topics.



