Hi Jun
> JR11. value.serializers and value.deserializers: Should they be of type
> List? Also, where are key.serializers and key.deserializers?
> 
Updated now
> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
The initial thinking here was that if `value.serializers` or
`value.deserializers` is set, the client would load ComposableSerializer or
ComposableDeserializer automatically and use them. Unfortunately, this would
require us to define these serializers.

The other option is to update `Plugin<Serializer<V>> valueSerializerPlugin` and
the KafkaProducer constructor to accept List<Serializer<V>>, and move the logic
of ComposableSerializer into KafkaProducer::doSend, which is where we do
serialization (same with KafkaConsumer). This option hides the logic and
reduces the client's exposure to these classes.
WDYT?
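
As a rough sketch of what the second option amounts to (this is only an
illustration, not code from the KIP): the first serializer turns the value into
bytes and each following one transforms those bytes. The `Serializer` interface
below is a local stand-in with the same `serialize(topic, data)` shape as
Kafka's, and `SerializerChain` plus the head/rest split are hypothetical names
and typing choices made just for this example.

```java
import java.util.List;

// Local stand-in mirroring the shape of Kafka's Serializer#serialize(topic, data).
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical helper: the kind of loop that would live inside the client
// instead of in a user-visible ComposableSerializer.
final class SerializerChain {
    private SerializerChain() {}

    static <V> byte[] serialize(String topic,
                                V value,
                                Serializer<V> head,
                                List<Serializer<byte[]>> rest) {
        // The first serializer converts the typed value into bytes.
        byte[] bytes = head.serialize(topic, value);
        // Every later serializer receives the output of the previous one.
        for (Serializer<byte[]> next : rest) {
            bytes = next.serialize(topic, bytes);
        }
        return bytes;
    }
}
```

The consumer side would apply the matching deserializers in the reverse order.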

> 
> JR13. large.message.payload.store.class : should it be of type class?
Updated
> 
> JR14.
> org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer :
> The name seems redundant since largemessage appears twice.
Updated 
> 
> JR15. PayloadResponse: It still mentions response code. It mentions
> "isRetryable flag", which no longer exists in PayloadStoreException. There
> are typos in "then it will serialiser will".
The KIP is updated now 

> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
> is true, this will likely fail the next deserializer and the application
> won't have the right context of the error. It's probably better to just
> propagate the specific exception and let the caller handle it.
You're right, this will cause an issue if there is another deserializer waiting
for the data. I have updated the KIP accordingly.
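
To illustrate the behaviour discussed here, below is a minimal sketch, assuming
the reference stored in the record is a UTF-8 string key and using an in-memory
map as the store. `PayloadNotFoundException` and
`ReferenceResolvingDeserializer` are hypothetical names for this example only;
they are not the classes defined in the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical exception type used only for this illustration.
final class PayloadNotFoundException extends RuntimeException {
    PayloadNotFoundException(String reference) {
        super("No payload found for reference: " + reference);
    }
}

// Hypothetical deserializer step that resolves a reference against a store.
final class ReferenceResolvingDeserializer {
    private final Map<String, byte[]> store;

    ReferenceResolvingDeserializer(Map<String, byte[]> store) {
        this.store = store;
    }

    byte[] deserialize(String topic, byte[] data) {
        if (data == null) {
            // Keep null as-is so tombstones in compacted topics stay tombstones.
            return null;
        }
        String reference = new String(data, StandardCharsets.UTF_8);
        byte[] payload = store.get(reference);
        if (payload == null) {
            // Propagate a specific error instead of returning new byte[0],
            // so the next deserializer never sees a fake empty payload.
            throw new PayloadNotFoundException(reference);
        }
        return payload;
    }
}
```

The caller can then catch the exception and decide whether to skip the record
or fail.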

> 
> JR17. LargeMessageSerializer:  "Check if the estimated size of the data
> (bytes) after applying provided compression (if there is one)"  Compression
> actually happens after serialization and is done on a batch of records.
Yes, the compression itself happens afterwards, but we also have
`estimateSizeInBytesUpperBound`, which, as I understand it, takes the
compression type into account as well.
> 
> 
> JR18. Could you define the type T for LargeMessageSerializer?
Updated the KIP; T would be byte[].

Thanks 
Omnia
> 
> Jun
> 
> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
> 
>> Hi Jun, thanks for taking the time to review this
>> 
>>> JR1. While the KIP is potentially useful, I am wondering who is
>> responsible
>>> for retention for the objects in the payload store. Once a message with a
>>> reference is deleted, the key of the external object is lost and the
>> object
>>> may never be deleted.
>> 
>> The `ttl` in the object store is the responsibility of the owner of this
>> store. It should be configured in a way that is consistent with the
>> retention config in Kafka.
>> I have updated the KIP with a `Consideration` section.
>>> 
>>> JR2. Configs: For all new configs, it would be useful to list their
>> types.
>> Updated the KIP now
>>> 
>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
>> we
>>> should just use value.serializer, right? Ditto for key.serializers.
>> No, you're right, this was a copy/paste mistake.
>> 
>>> 
>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>> package name.
>> Updated the KIP now
>>> 
>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>> implement exponential backoff on retries? If so, it's more consistent if
>>> we can follow the existing naming convention like retry.backoff.max.ms and
>>> retry.backoff.ms.
>> I have removed these to simplify the config more (as Luke suggested
>> initially) and added these to the consideration section.
>> 
>>> 
>>> JR6. large.message.skip.not.found.error : If the reference can't be
>> found,
>>> what value does the deserializer return? Note that null has a special
>>> meaning for tombstone in compacted topics.
>> The deserialiser will return `new byte[0]` not null.
>>> 
>>> JR7. PayloadResponse: Why do we have both responseCode and
>>> PayloadStoreException?
>> We can do without responseCode; the initial thought was to report the
>> response code from the payload store.
>> Updated the KIP.
>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>> metrics in a plugin through the Monitorable interface.
>> Oh nice, I didn’t know about this interface before. Updated the KIP with
>> this now.
>>> 
>>> JR9. Why do we need the protected field
>> PayloadStoreException.isRetryable?
>> The initial thought here was that the serializer could retry the upload.
>> But I have removed all the retry logic from the serializer, and it will be
>> up to the PayloadStore provider to implement this if they need it.
>>> 
>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
>> interface.
>> It is now updated to an interface.
>> 
>> Hope the latest version of the KIP is simpler now
>> 
>> Thanks
>> Omnia
>> 
>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
>>> 
>>> Thanks for the KIP. A few comments.
>>> 
>>> JR1. While the KIP is potentially useful, I am wondering who is
>> responsible
>>> for retention for the objects in the payload store. Once a message with a
>>> reference is deleted, the key of the external object is lost and the
>> object
>>> may never be deleted.
>>> 
>>> JR2. Configs: For all new configs, it would be useful to list their
>> types.
>>> 
>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
>> we
>>> should just use value.serializer, right? Ditto for key.serializers.
>>> 
>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>> package name.
>>> 
>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>> implement exponential backoff on retries? If so, it's more consistent if
>>> we can follow the existing naming convention like retry.backoff.max.ms and
>>> retry.backoff.ms.
>>> 
>>> JR6. large.message.skip.not.found.error : If the reference can't be
>> found,
>>> what value does the deserializer return? Note that null has a special
>>> meaning for tombstone in compacted topics.
>>> 
>>> JR7. PayloadResponse: Why do we have both responseCode and
>>> PayloadStoreException?
>>> 
>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>> metrics in a plugin through the Monitorable interface.
>>> 
>>> JR9. Why do we need the protected field
>> PayloadStoreException.isRetryable?
>>> 
>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
>> interface.
>>> 
>>> Thanks,
