BewareMyPower opened a new issue #12087: URL: https://github.com/apache/pulsar/issues/12087
## Motivation

The initial motivation came from Kafka's protocol handler, i.e. KoP (https://github.com/streamnative/kop). KoP allows a Kafka producer to configure the entry format as `kafka`, which means messages from a Kafka producer can be written to bookies directly, without any conversion between Kafka's message format and Pulsar's message format. This improves performance significantly. However, it also introduces the limitation that a Pulsar consumer cannot consume from this topic, because it cannot recognize the entry's format.

The existing `ConsumerInterceptor` does not work for this case because it operates on a `Message<T>` object, which is converted from an entry that already has a valid Pulsar format.

## Goal

This proposal introduces a payload converter for the Pulsar client, which converts the payload (without the metadata parts) at the client side. With it, the Pulsar client can recognize an entry of any format once a suitable converter is configured.

This proposal is mainly for protocol handlers, because they can access `PersistentTopic` and write bytes to bookies directly. In the rare case where users write to the topic's ledger directly with a BookKeeper client, the converter can handle that as well.

## API Changes

First, an interface is added under the package `org.apache.pulsar.common.api.raw`.

```java
public interface PayloadConverter {

    /**
     * Convert the buffer to the format that Pulsar consumer can recognize.
     *
     * @param brokerEntryMetadata the {@link BrokerEntryMetadata} object that might contain the buffer's format
     * @param metadata the {@link MessageMetadata} object that might contain the buffer's format
     * @param payload the payload buffer that doesn't contain the {@link MessageMetadata} part
     * @return the converted payload buffer
     * @implNote There are some constraints for a valid implementation:
     *   1. The refCnt of `payload` must not change after this call.
     *   2. It could either return `payload` itself or a newly allocated buffer whose refCnt is 1.
     *   3. If it returns `payload`, the reader index and writer index must remain unchanged.
     */
    ByteBuf convert(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata metadata, ByteBuf payload);
}
```

The metadata parts, `BrokerEntryMetadata` and `MessageMetadata`, determine the format of the payload. These metadata types are necessary because the current consumer implementation parses them first for subsequent operations, see https://github.com/apache/pulsar/blob/5fd62a9847a4f574708e3002482aaf876de8d540/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1011-L1019

Since the pulsar-client-api module should not bring in any dependency (to avoid a cyclic dependency), this proposal loads `PayloadConverter` at runtime, as the SLF4J API does. We only need to configure whether it is enabled and, optionally, the package. See the following methods in `ConsumerBuilder`.

```java
    /**
     * Enable the payload converter to convert message payload before parsing it.
     *
     * Default: false
     *
     * If it's enabled, each time a consumer is created, it will try to load classes under a specific package. The first
     * class that implements PayloadConverter will be used as the converter instance's type. If no PayloadConverter is
     * found, the default converter is a trivial converter that returns payload itself.
     *
     * The converter instance is constructed with no argument.
     *
     * See PayloadConverter interface's definition from org.apache.pulsar.common.api.raw package of
     * org.apache.pulsar:pulsar-common artifact.
     *
     * @see ConsumerBuilder#payloadConverterPackage(String)
     *
     * @param enablePayloadConverter whether to enable the payload converter
     * @return the consumer builder instance
     */
    ConsumerBuilder<T> enablePayloadConverter(boolean enablePayloadConverter);

    /**
     * Set the package that might contain the PayloadConverter implementation.
     *
     * Default: org.apache.pulsar.client.converter
     *
     * @param packageName the package to search for a PayloadConverter implementation
     * @return the consumer builder instance
     */
    ConsumerBuilder<T> payloadConverterPackage(String packageName);
```

In conclusion, these are the steps to use a payload converter:
1. Implement `PayloadConverter` under a specific package like `org.apache.pulsar.client.converter`.
2. Configure `enablePayloadConverter` at the client side, and `payloadConverterPackage` if the converter's package is not `org.apache.pulsar.client.converter`.
3. Add the compiled class that implements `PayloadConverter` to the class path.

## Implementation

Add a `PayloadConverter` field in `ConsumerBase` and initialize it when a consumer is created. The initialization process is:
1. Find all classes that implement the `PayloadConverter` interface under the configured package.
2. If no classes were found, use the default converter, which simply returns the payload.
3. Otherwise, choose the first class as the actual converter.

Once the converter is initialized, and after the `BrokerEntryMetadata` and `MessageMetadata` objects are parsed from the original buffer, apply the converter to generate a new payload buffer for the subsequent processing. Note that the original buffer is released in `PulsarDecoder#handleRead`. However, if the `convert` method returns a new buffer, the consumer needs to release that buffer itself.

The tests should cover the following cases:
- Consumers for a non-partitioned topic and for multiple topics both work.
- The `BrokerEntryMetadata` and `MessageMetadata` passed to the converter are the actual values.
- The converters don't affect the default behavior if `enablePayloadConverter` is false.

## Rejected Alternatives

An earlier proposal, since discarded, tried to perform the conversion at the broker side before dispatching entries. However, the dispatcher runs in a single thread when dispatching entries, so the conversion cost might affect all other topics. Therefore, the conversion should be done at the client side. It could add overhead for the Pulsar consumer, but it doesn't affect other topics.
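As a rough illustration of the three initialization steps listed under Implementation, the selection logic might look like the sketch below. Everything in it is a stand-in for illustration only: the `Converter` interface works on `byte[]` and a format string instead of `ByteBuf`, `BrokerEntryMetadata` and `MessageMetadata`; the class names are hypothetical; and the `candidates` list takes the place of the package scan, whose mechanism the proposal does not specify.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;

class ConverterSelectionSketch {

    /** Simplified stand-in for the proposed PayloadConverter (assumption: byte[] instead of ByteBuf). */
    interface Converter {
        byte[] convert(String entryFormat, byte[] payload);
    }

    /** Default converter: returns the payload itself. */
    static final class IdentityConverter implements Converter {
        @Override
        public byte[] convert(String entryFormat, byte[] payload) {
            return payload;
        }
    }

    /** Hypothetical converter: rewrites only entries tagged "upper", passes others through. */
    static final class UpperCaseConverter implements Converter {
        @Override
        public byte[] convert(String entryFormat, byte[] payload) {
            if (!"upper".equals(entryFormat)) {
                return payload;
            }
            String text = new String(payload, StandardCharsets.UTF_8);
            return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** A class found in the package that is not a converter; it is skipped. */
    static final class Unrelated {
    }

    /**
     * Picks the first candidate that implements Converter and builds it with its
     * no-argument constructor; falls back to the identity converter when the
     * feature is disabled or no candidate qualifies.
     */
    static Converter select(boolean enabled, List<Class<?>> candidates) {
        if (enabled) {
            for (Class<?> candidate : candidates) {
                if (Converter.class.isAssignableFrom(candidate) && !candidate.isInterface()) {
                    try {
                        return (Converter) candidate.getDeclaredConstructor().newInstance();
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("Cannot instantiate " + candidate.getName(), e);
                    }
                }
            }
        }
        return new IdentityConverter();
    }

    public static void main(String[] args) {
        Converter converter = select(true, List.of(Unrelated.class, UpperCaseConverter.class));
        byte[] out = converter.convert("upper", "abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

Falling back to the identity converter in both the disabled case and the nothing-found case keeps the default consumer behavior unchanged, which is one of the test cases listed above.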
