Hi, Omnia,

Thanks for the reply. A few more comments.

JR20. The inconsistency is still there. It's probably simpler to
consolidate on
org.apache.kafka.common.serialization.largemessage.PayloadStore.

JR21. Could you document how the payload id string is encoded? For example,
is it always UTF8?

JR22. "Since compacted topics can retain data indefinitely, users must
choose between setting a business-appropriate TTL or accepting indefinite
storage costs." Setting a TTL is not ideal since it can break the consumer
application. So, we probably don't want to recommend it to the users.
Another option is to run a cleaning job by consuming the topic and if it
observes a null payload, delete the object based on the key in the record.

JR23. We probably don't need to force the PayloadStore implementer to
implement Monitorable. We could document that if it's desirable to monitor
a PayloadStore plugin, one can implement the Monitorable interface.

JR24. In the following, org.apache.kafka.common.serialization.Serializer
should include the largemessage namespace.
producerConfig.put("value.serializers",
"kafka.serializers.KafkaAvroDeserializer,
org.apache.kafka.common.serialization.Serializer");

Jun

On Tue, Aug 12, 2025 at 3:27 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

> Hi Jun,
>
> > JR20. Could you fix the inconsistency?
> > org.apache.kafka.common.serialization.largemessage.PayloadStore vs
> > org.apache.kafka.common.serialization.largemessage.store.PayloadStore
> Updated
> >
> > JR21. "Encapsulate that ID into a simple Kafka event using a structured
> > format.". Could you define the structured format explicitly?
>
> This was from the initial design we had Id format but after discussing
> this with Luke I ended up simplifying this by removing the format and I
> guess I forgot this line. I have updated the kip now.
>
> > JR22. TTL for object stores: Could the new serializer be used on a
> > compacted topic? If so, how should a user configure the TTL since the
> Kafka
> > retention is infinite.
>
> Great question about compacted topics! There are few usage of compacted
> topic to consider:
> 1. Typical Compacted Topic Usage: The compacted topics are commonly used
> as cache/state layers where the latest value for each key represents the
> current state. These use cases typically involve smaller records since they
> need to be loaded efficiently for state reconstruction. Large message
> serialization would be unusual for this pattern.
>
> 2. User-Defined Compacted Topics with Large Payloads: for applications
> that do use compacted topics with large payloads, the PayloadStore
> implementation should handle this by:
>         a. Consistent ID Generation: Use deterministic IDs based on the
> Kafka key (rather than random UUIDs) so that when a payload id is updated,
> it overwrites the same payload store object instead of creating new ones.
>
>         b. TTL Strategy: Since compacted topics can retain data
> indefinitely, users have two options:
>                 - Set a business-appropriate TTL (basically "we know our
> cache data becomes stale after 30 days”)
>                 - Configure no TTL and accept indefinite storage costs as
> a trade-off for the architectural benefits
>
> 3. Compact + Delete Policy: Topics with `cleanup.policy=compact,delete`
> will eventually remove old data, so standard TTL approaches work normally.
> The key insight is that TTL configuration depends on the business
> requirements and usage patterns rather than just the Kafka retention
> policy. The PayloadStore implementation should provide flexibility for
> users to make this trade-off consciously.
>
> I have update the consideration section with this.
>
> Thanks
> Omnia
>
> > On 11 Aug 2025, at 23:32, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Omnia,
> >
> > Thanks for the reply. A few more comments.
> >
> > JR20. Could you fix the inconsistency?
> > org.apache.kafka.common.serialization.largemessage.PayloadStore vs
> > org.apache.kafka.common.serialization.largemessage.store.PayloadStore
> >
> > JR21. "Encapsulate that ID into a simple Kafka event using a structured
> > format.". Could you define the structured format explicitly?
> >
> > JR22. TTL for object stores: Could the new serializer be used on a
> > compacted topic? If so, how should a user configure the TTL since the
> Kafka
> > retention is infinite.
> >
> > Jun
> >
> > On Mon, Aug 11, 2025 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com
> <mailto:o.g.h.ibra...@gmail.com>>
> > wrote:
> >
> >> Hi Jun
> >>> JR11. Do you think that we need to add key.serializers and
> >>> key.deserializers or do you think covering large messages in value is
> >>> enough?
> >> The large message issue usually is a value issue, I never saw a key that
> >> is bigger than broker message size. If I would add key.serializers and
> >> key.deserializers it would be for consistency and maybe there are
> use-cases
> >> where developers want to apply multiple serializations in order on the
> key
> >> as well outside the context of the large message support.
> >> I updated the KIP to add these two configs now.
> >>
> >>> JR12. We can load ComposableSerializer automatically if
> value.serializers
> >>> or value.deserializers are specified. But, it seems that we could keep
> >>> ComposableSerializer as an internal implementation. For example,
> >>> ProducerInterceptors is automatically loaded when multiple interceptors
> >> are
> >>> specified and is an internal class.
> >> I updated the kip to highlight that the changes is updating
> ProducerConfig
> >> and ConsumerConfig to have new config for serializers/deserializers and
> not
> >> the actual class this is implementation details and not public
> interfaces.
> >>
> >>> JR17. We could estimate the size after compression, but the estimator
> is
> >>> not 100% accurate. It seems that it's simpler to just use the original
> >>> message size.
> >> We can keep it as original message size, I was thinking if it is good
> >> enough for max.request.size it might be good enough for this. I updated
> the
> >> KIP anyway to simplify it and keep it to the original size.
> >>
> >> Hope the final version addressed all feedbacks and we can resume with
> the
> >> voting
> >>
> >> Thanks
> >> Omnia
> >>
> >>> On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>
> >>> Hi, Omnia,
> >>>
> >>> Thanks for the reply.
> >>>
> >>> JR11. Do you think that we need to add key.serializers and
> >>> key.deserializers or do you think covering large messages in value is
> >>> enough?
> >>>
> >>> JR12. We can load ComposableSerializer automatically if
> value.serializers
> >>> or value.deserializers are specified. But, it seems that we could keep
> >>> ComposableSerializer as an internal implementation. For example,
> >>> ProducerInterceptors is automatically loaded when multiple interceptors
> >> are
> >>> specified and is an internal class.
> >>>
> >>> JR17. We could estimate the size after compression, but the estimator
> is
> >>> not 100% accurate. It seems that it's simpler to just use the original
> >>> message size.
> >>>
> >>> Jun
> >>>
> >>> On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com
> >> <mailto:o.g.h.ibra...@gmail.com>>
> >>> wrote:
> >>>
> >>>>
> >>>> Hi Jun
> >>>>> JR11. value.serializers and value.deserializers: Should they be of
> type
> >>>>> List? Also, where are key.serializers and key.deserializers?
> >>>>>
> >>>> Updated now
> >>>>> JR12. Do we still need ComposableSerializer and
> ComposableDeserializer?
> >>>> The initial thinking here was if `value.serializers or
> >> value.deserializers
> >>>> ` exists the client will load  ComposableSerializer or
> >>>> ComposableDeserializer  automatically and use them. Unfortunately this
> >>>> would need us to define these serializers.
> >>>>
> >>>> The other option is to update `Plugin<Serializer<V>>
> >>>> valueSerializerPlugin` and KafkaProducer constructor to accept
> >>>> List<Serializer<V>> and move the logic of the ComposableSerializer
> into
> >>>> KafkaProducer::doSend which when we do serialization (same with
> >>>> KafkaConsumer). This option hide the logic and reduce exposure for
> >> client
> >>>> to these.
> >>>> WDYT?
> >>>>
> >>>>>
> >>>>> JR13. large.message.payload.store.class : should it be of type class?
> >>>> Updated
> >>>>>
> >>>>> JR14.
> >>>>>
> >>>>
> >>
> org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer :
> >>>>> The name seems redundant since largemessage appears twice.
> >>>> Updated
> >>>>>
> >>>>> JR15. PayloadResponse: It still mentions response code. It mentions
> >>>>> "isRetryable flag", which no longer exists in PayloadStoreException.
> >>>> There
> >>>>> are typos in "then it will serialiser will”.
> >>>> The KIP is updated now
> >>>>
> >>>>> JR16. Regarding returning new byte[0] if
> >>>> large.message.skip.not.found.error
> >>>>> is true, this will likely fail the next deserializer and the
> >> application
> >>>>> won't have the right context of the error. It's probably better to
> just
> >>>>> propagate the specific exception and let the caller handle it.
> >>>> You right this will cause issue if there is another deserializer
> waiting
> >>>> for the data. I have updated the KIP with this.
> >>>>
> >>>>>
> >>>>> JR17. LargeMessageSerializer:  "Check if the estimated size of the
> data
> >>>>> (bytes) after applying provided compression (if there is one)"
> >>>> Compression
> >>>>> actually happens after serialization and is done on a batch of
> records.
> >>>> Yes the compression itself happened after but we also have
> >>>> `estimateSizeInBytesUpperBound` which my understanding is this take
> the
> >>>> compression type into the account as well
> >>>>>
> >>>>>
> >>>>> JR18. Could you define the type T for LargeMessageSerializer?
> >>>> Update the KIP this T would be byte[]
> >>>>
> >>>> Thanks
> >>>> Omnia
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com
> >>>> <mailto:o.g.h.ibra...@gmail.com>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Jun, thanks for having the time to review this
> >>>>>>
> >>>>>>> JR1. While the KIP is potentially useful, I am wondering who is
> >>>>>> responsible
> >>>>>>> for retention for the objects in the payload store. Once a message
> >>>> with a
> >>>>>>> reference is deleted, the key of the external object is lost and
> the
> >>>>>> object
> >>>>>>> may never be deleted.
> >>>>>>
> >>>>>> The `ttl` in the object store is the responsibility of the owner of
> >> this
> >>>>>> store it should be configured in away that is reasonable with the
> >>>> retention
> >>>>>> config in Kafka.
> >>>>>> I have updated the KIP with `Consideration` section.
> >>>>>>>
> >>>>>>> JR2. Configs: For all new configs, it would be useful to list their
> >>>>>> types.
> >>>>>> Updated the KIP now
> >>>>>>>
> >>>>>>> JR3. value.serializers: Why is this required? If a user doesn't set
> >> it,
> >>>>>> we
> >>>>>>> should just use value.serializer, right? Ditto for key.serializers.
> >>>>>> No you right this was copy/past mistake
> >>>>>>
> >>>>>>>
> >>>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>>>>>> PayloadStore and PayloadResponse, it would be useful to include the
> >>>> full
> >>>>>>> package name.
> >>>>>> Updated the KIP now
> >>>>>>>
> >>>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the
> intention
> >>>> to
> >>>>>>> implement exponential backoff on retries? If so, it's more
> consistent
> >>>> if
> >>>>>> we
> >>>>>>> can follow the existing naming convention like
> retry.backoff.max.ms
> >> <
> >>>>>> http://retry.backoff.max.ms/> and
> >>>>>>> retry.backoff.ms <http://retry.backoff.ms/> <
> >> http://retry.backoff.ms/> <http://retry.backoff.ms/
> >>>>> .
> >>>>>> I have removed these to simplify the config more (as Luke suggested
> >>>>>> initially) and added these to the consideration section.
> >>>>>>
> >>>>>>>
> >>>>>>> JR6. large.message.skip.not.found.error : If the reference can't be
> >>>>>> found,
> >>>>>>> what value does the deserializer return? Note that null has a
> special
> >>>>>>> meaning for tombstone in compacted topics.
> >>>>>> The deserialiser will return `new byte[0]` not null.
> >>>>>>>
> >>>>>>> JR7. PayloadResponse: Why do we have both responseCode and
> >>>>>>> PayloadStoreException?
> >>>>>> We can do without responseCode, the initial though was to report
> >>>> response
> >>>>>> code form payload store.
> >>>>>> Update the KIP.
> >>>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could
> monitor
> >>>> the
> >>>>>>> metrics in a plugin through the Monitorable interface.
> >>>>>> Oh nice, I didn’t know about this interface before. Updated the KIP
> >> with
> >>>>>> this now.
> >>>>>>>
> >>>>>>> JR9. Why do we need the protected field
> >>>>>> PayloadStoreException.isRetryable?
> >>>>>> Initial thought here was the serializer can retry the upload. But I
> >> have
> >>>>>> removed all the retry logic from serializer and it will be up to the
> >>>>>> PayloadStore provider to implement this if they need it.
> >>>>>>>
> >>>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
> >>>>>> interface.
> >>>>>> It is updated now to interface.
> >>>>>>
> >>>>>> Hope the last version of the KIP is more simpler now
> >>>>>>
> >>>>>> Thanks
> >>>>>> Omnia
> >>>>>>
> >>>>>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID
> <mailto:j...@confluent.io.INVALID> <mailto:
> >> j...@confluent.io.INVALID <mailto:j...@confluent.io.INVALID>> <mailto:
> >>>> j...@confluent.io.INVALID <mailto:j...@confluent.io.INVALID> <mailto:
> j...@confluent.io.INVALID>>> wrote:
> >>>>>>>
> >>>>>>> Thanks for the KIP. A few comments.
> >>>>>>>
> >>>>>>> JR1. While the KIP is potentially useful, I am wondering who is
> >>>>>> responsible
> >>>>>>> for retention for the objects in the payload store. Once a message
> >>>> with a
> >>>>>>> reference is deleted, the key of the external object is lost and
> the
> >>>>>> object
> >>>>>>> may never be deleted.
> >>>>>>>
> >>>>>>> JR2. Configs: For all new configs, it would be useful to list their
> >>>>>> types.
> >>>>>>>
> >>>>>>> JR3. value.serializers: Why is this required? If a user doesn't set
> >> it,
> >>>>>> we
> >>>>>>> should just use value.serializer, right? Ditto for key.serializers.
> >>>>>>>
> >>>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>>>>>> PayloadStore and PayloadResponse, it would be useful to include the
> >>>> full
> >>>>>>> package name.
> >>>>>>>
> >>>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the
> intention
> >>>> to
> >>>>>>> implement exponential backoff on retries? If so, it's more
> consistent
> >>>> if
> >>>>>> we
> >>>>>>> can follow the existing naming convention like
> retry.backoff.max.ms <http://retry.backoff.max.ms/>
> >> <http://retry.backoff.max.ms/> <
> >>>> http://retry.backoff.max.ms/> <
> >>>>>> http://retry.backoff.max.ms/> and
> >>>>>>> retry.backoff.ms <http://retry.backoff.ms/> <
> http://retry.backoff.ms/> <
> >> http://retry.backoff.ms/> <http://retry.backoff.ms/
> >>>>> .
> >>>>>>>
> >>>>>>> JR6. large.message.skip.not.found.error : If the reference can't be
> >>>>>> found,
> >>>>>>> what value does the deserializer return? Note that null has a
> special
> >>>>>>> meaning for tombstone in compacted topics.
> >>>>>>>
> >>>>>>> JR7. PayloadResponse: Why do we have both responseCode and
> >>>>>>> PayloadStoreException?
> >>>>>>>
> >>>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could
> monitor
> >>>> the
> >>>>>>> metrics in a plugin through the Monitorable interface.
> >>>>>>>
> >>>>>>> JR9. Why do we need the protected field
> >>>>>> PayloadStoreException.isRetryable?
> >>>>>>>
> >>>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
> >>>>>> interface.
> >>>>>>>
> >>>>>>> Thanks,
>
>

Reply via email to