Hi Jun

> JR11. value.serializers and value.deserializers: Should they be of type
> List? Also, where are key.serializers and key.deserializers?

Updated now.

> JR12. Do we still need ComposableSerializer and ComposableDeserializer?

The initial thinking here was that if `value.serializers` or
`value.deserializers` is set, the client would load ComposableSerializer or
ComposableDeserializer automatically and use them. Unfortunately this would
require us to define these serializers.
The other option is to update `Plugin<Serializer<V>> valueSerializerPlugin`
and the KafkaProducer constructor to accept List<Serializer<V>>, and move the
logic of the ComposableSerializer into KafkaProducer::doSend, where we do the
serialization (and the same for KafkaConsumer). This option hides the logic
and reduces the client's exposure to these internals (rough sketch below).
WDYT?
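To make this concrete, here is a rough sketch of the chaining idea (the names
are illustrative only, not the actual producer internals; I've also split the
list into the first V serializer plus byte[] post-processors in the sketch
just to keep the types straight): the first serializer turns V into bytes,
and any later ones, e.g. a large-message serializer, take byte[] and return
byte[].

    import java.util.List;
    import org.apache.kafka.common.serialization.Serializer;

    // Illustrative sketch only: how the producer's send path could apply a
    // chain of serializers instead of a single value serializer.
    public class ChainedValueSerialization<V> {
        private final Serializer<V> valueSerializer;            // V -> byte[]
        private final List<Serializer<byte[]>> postProcessors;  // byte[] -> byte[]

        public ChainedValueSerialization(Serializer<V> valueSerializer,
                                         List<Serializer<byte[]>> postProcessors) {
            this.valueSerializer = valueSerializer;
            this.postProcessors = postProcessors;
        }

        // What doSend would effectively do for the value (same idea for the key).
        public byte[] serialize(String topic, V value) {
            byte[] bytes = valueSerializer.serialize(topic, value);
            for (Serializer<byte[]> s : postProcessors) {
                bytes = s.serialize(topic, bytes);
            }
            return bytes;
        }
    }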
> JR13. large.message.payload.store.class: should it be of type class?

Updated.

> JR14. org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer:
> The name seems redundant since largemessage appears twice.

Updated.

> JR15. PayloadResponse: It still mentions response code. It mentions
> "isRetryable flag", which no longer exists in PayloadStoreException. There
> are typos in "then it will serialiser will".

The KIP is updated now.

> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
> is true, this will likely fail the next deserializer and the application
> won't have the right context of the error. It's probably better to just
> propagate the specific exception and let the caller handle it.

You're right, this would cause issues if there is another deserializer waiting
for the data. I have updated the KIP with this; the sketch below shows the
intended behaviour.
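To illustrate (a sketch only; PayloadNotFoundException and the surrounding
class are made-up names for illustration, not necessarily what the KIP
defines): the deserializer throws a specific exception instead of returning
new byte[0], so the caller keeps the context of the error and can decide what
to do with the missing payload.

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    // Illustrative sketch only: surface "payload reference not found" as a
    // dedicated exception rather than an empty byte array.
    class PayloadNotFoundException extends SerializationException {
        PayloadNotFoundException(String message) {
            super(message);
        }
    }

    class LargeMessageAwareDeserializer implements Deserializer<byte[]> {
        @Override
        public byte[] deserialize(String topic, byte[] data) {
            byte[] payload = fetchFromPayloadStore(data); // hypothetical store lookup
            if (payload == null) {
                throw new PayloadNotFoundException(
                    "Payload referenced by a record in topic " + topic + " was not found");
            }
            return payload;
        }

        // Placeholder for the actual payload-store lookup.
        private byte[] fetchFromPayloadStore(byte[] reference) {
            return null;
        }
    }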
> JR17. LargeMessageSerializer: "Check if the estimated size of the data
> (bytes) after applying provided compression (if there is one)" Compression
> actually happens after serialization and is done on a batch of records.

Yes, the compression itself happens after serialization, but we also have
`estimateSizeInBytesUpperBound`, which, as I understand it, takes the
compression type into account as well.

> JR18. Could you define the type T for LargeMessageSerializer?

Updated the KIP; this T would be byte[].

Thanks
Omnia

> Jun
>
> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
>> Hi Jun, thanks for having the time to review this
>>
>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>> for retention for the objects in the payload store. Once a message with a
>>> reference is deleted, the key of the external object is lost and the object
>>> may never be deleted.
>>
>> The `ttl` in the object store is the responsibility of the owner of this
>> store; it should be configured in a way that is reasonable with the
>> retention config in Kafka.
>> I have updated the KIP with a `Consideration` section.
>>
>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>
>> Updated the KIP now.
>>
>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>> should just use value.serializer, right? Ditto for key.serializers.
>>
>> No, you're right; this was a copy/paste mistake.
>>
>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>> package name.
>>
>> Updated the KIP now.
>>
>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>> implement exponential backoff on retries? If so, it's more consistent if we
>>> can follow the existing naming convention like retry.backoff.max.ms and
>>> retry.backoff.ms.
>>
>> I have removed these to simplify the config more (as Luke suggested
>> initially) and added these to the Consideration section.
>>
>>> JR6. large.message.skip.not.found.error: If the reference can't be found,
>>> what value does the deserializer return? Note that null has a special
>>> meaning for tombstone in compacted topics.
>>
>> The deserialiser will return `new byte[0]`, not null.
>>
>>> JR7. PayloadResponse: Why do we have both responseCode and
>>> PayloadStoreException?
>>
>> We can do without responseCode; the initial thought was to report the
>> response code from the payload store.
>> Updated the KIP.
>>
>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>> metrics in a plugin through the Monitorable interface.
>>
>> Oh nice, I didn't know about this interface before. Updated the KIP with
>> this now.
>>
>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
>>
>> The initial thought here was that the serializer could retry the upload. But
>> I have removed all the retry logic from the serializer and it will be up to
>> the PayloadStore provider to implement this if they need it.
>>
>>> JR10. As Luke mentioned earlier, we could turn PayloadStore into an
>>> interface.
>>
>> It is now updated to an interface.
>>
>> Hope the latest version of the KIP is simpler now.
>>
>> Thanks
>> Omnia
>>
>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>
>>> Thanks for the KIP. A few comments.
>>>
>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>> for retention for the objects in the payload store. Once a message with a
>>> reference is deleted, the key of the external object is lost and the object
>>> may never be deleted.
>>>
>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>
>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>> should just use value.serializer, right? Ditto for key.serializers.
>>>
>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>> package name.
>>>
>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>> implement exponential backoff on retries? If so, it's more consistent if we
>>> can follow the existing naming convention like retry.backoff.max.ms and
>>> retry.backoff.ms.
>>>
>>> JR6. large.message.skip.not.found.error: If the reference can't be found,
>>> what value does the deserializer return? Note that null has a special
>>> meaning for tombstone in compacted topics.
>>>
>>> JR7. PayloadResponse: Why do we have both responseCode and
>>> PayloadStoreException?
>>>
>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>> metrics in a plugin through the Monitorable interface.
>>>
>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
>>>
>>> JR10. As Luke mentioned earlier, we could turn PayloadStore into an
>>> interface.
>>>
>>> Thanks,