BewareMyPower opened a new issue #11962: URL: https://github.com/apache/pulsar/issues/11962
## Motivation

The initial motivation comes from the Kafka protocol handler, KoP (https://github.com/streamnative/kop). KoP allows a Kafka producer to set the entry format to `kafka`, which means messages from the Kafka producer are written to bookies directly, without any conversion between Kafka's message format and Pulsar's message format. This improves performance significantly. However, it also introduces the limitation that a Pulsar consumer cannot consume such a topic, because it cannot recognize the entry's format.

This proposal introduces a message converter, which is responsible for converting the buffer of an entry to a format that Pulsar consumers can recognize. Once a converter is configured, before messages are dispatched to Pulsar consumers, the converter checks whether the buffer needs to be converted and performs the conversion if necessary.

Multiple converters can be configured, because multiple protocol handlers can be configured as well, and each protocol handler could write entries in its own format.

The benefit is, after this change:

- When other clients write an entry, no conversion is needed.
- When other clients read an entry, no conversion is needed.
- When a Pulsar consumer reads an entry, the conversion is performed in the broker.

Before this change, if we want Pulsar consumers to interoperate with other clients:

- When other clients write an entry, we need to convert it to the Pulsar format.
- When other clients read an entry, we need to convert it from the Pulsar format to the specific format.
- When a Pulsar consumer reads an entry, no conversion is needed.

This proposal is mainly for protocol handlers, because they can access `PersistentTopic` and write bytes to bookies directly. In the rare case that users write to the topic's ledger directly with a BookKeeper client, the converter can also handle it.

## Goal

This proposal's goal is only to add message converters at the broker level.
Once the related broker configs are enabled, the converters are applied to all topics. This adds overhead to topics that are used only by Pulsar clients, because the broker still needs to check whether each buffer needs to be converted. See the `MessageConverter#accept` method in the next section.

In the future, message converters could be configured at the namespace level or topic level. They could even be configured for the Pulsar client, so that the conversion happens only on the client side and the broker's CPU load is reduced.

## API changes

First, an interface is added under the package `org.apache.pulsar.common.api.raw`:

```java
import io.netty.buffer.ByteBuf;

public interface MessageConverter {

    /**
     * Determine whether the buffer can be converted.
     *
     * @param buffer the buffer that might need to be converted
     * @return whether the buffer can be converted
     */
    boolean accept(ByteBuf buffer);

    /**
     * Convert the buffer to the format that Pulsar consumer can recognize.
     *
     * @param originalBuffer the original buffer
     * @return the converted buffer
     */
    ByteBuf convert(ByteBuf originalBuffer);
}
```

Then a new configuration is added:

```java
@FieldContext(
    category = CATEGORY_PLUGIN,
    doc = "List of message converters, which are responsible for converting entries before dispatching. If multiple"
            + " converters accept the same payload, the one earlier in this list is preferred."
)
private List<String> messageConverters;
```

## Implementation

The implementation is simple. When the broker starts, it loads all classes listed in the `messageConverters` config that implement the `MessageConverter` interface. The converters are then passed to `ServerCnx`. Each time a dispatcher dispatches messages to a consumer, it eventually calls the `ServerCnx#newMessageAndIntercept` method, in which the conversion can be performed.

For unit tests, we can test the following converters:

1. `RejectAllConverter`: `accept` returns false, so no conversion is performed.
2. `EchoConverter`: `accept` returns true and `convert` simply returns the original buffer.
3. `BytesConverter`: an example of a real-world converter. The message format has a `MessageMetadata` part that carries the `entry.format=bytes` property, and the payload part is only the raw bytes, without `SingleMessageMetadata`. `BytesConverter#convert` converts the raw bytes to the format that Pulsar consumers can recognize.
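The selection rule described above (the first converter in the list that accepts the buffer performs the conversion, otherwise the buffer is dispatched unchanged) can be sketched as follows. This is only an illustration under stated assumptions, not the broker implementation: `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` so the example has no dependencies, and `ConverterSketch`, `convertIfNeeded`, `text`, and `MarkerStrippingConverter` (with its made-up marker byte) are hypothetical names that do not appear in the proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class ConverterSketch {

    // Stand-in for the proposed interface; the proposal uses Netty's ByteBuf,
    // java.nio.ByteBuffer is an assumption made to keep the sketch self-contained.
    interface MessageConverter {
        boolean accept(ByteBuffer buffer);
        ByteBuffer convert(ByteBuffer originalBuffer);
    }

    // Never accepts a buffer, so no conversion is performed.
    static class RejectAllConverter implements MessageConverter {
        public boolean accept(ByteBuffer buffer) { return false; }
        public ByteBuffer convert(ByteBuffer originalBuffer) {
            throw new IllegalStateException("convert called although accept returned false");
        }
    }

    // Accepts every buffer and returns it unchanged.
    static class EchoConverter implements MessageConverter {
        public boolean accept(ByteBuffer buffer) { return true; }
        public ByteBuffer convert(ByteBuffer originalBuffer) { return originalBuffer; }
    }

    // Hypothetical converter: accepts buffers that start with a made-up marker
    // byte and returns a view of the bytes that follow the marker.
    static class MarkerStrippingConverter implements MessageConverter {
        static final byte MARKER = 0x7F;
        public boolean accept(ByteBuffer buffer) {
            return buffer.remaining() > 0 && buffer.get(buffer.position()) == MARKER;
        }
        public ByteBuffer convert(ByteBuffer originalBuffer) {
            ByteBuffer view = originalBuffer.duplicate(); // leave the caller's position untouched
            view.position(view.position() + 1);
            return view.slice();
        }
    }

    // Hypothetical helper: the first accepting converter in list order wins;
    // if none accepts, the original buffer is returned as-is.
    static ByteBuffer convertIfNeeded(List<MessageConverter> converters, ByteBuffer buffer) {
        for (MessageConverter converter : converters) {
            if (converter.accept(buffer)) {
                return converter.convert(buffer);
            }
        }
        return buffer;
    }

    // Decodes the readable bytes as UTF-8 without consuming the buffer.
    static String text(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        List<MessageConverter> converters = Arrays.asList(
                new RejectAllConverter(), new MarkerStrippingConverter(), new EchoConverter());
        ByteBuffer marked = ByteBuffer.wrap(new byte[] {0x7F, 'h', 'i'});
        System.out.println(text(convertIfNeeded(converters, marked))); // prints "hi"
    }
}
```

Because the list is scanned in order, placing `EchoConverter` before `MarkerStrippingConverter` would leave the marked buffer unconverted, which mirrors the "earlier one in the list is preferred" rule in the config description.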
