Hi Luke, sorry for late response. > IMO, the retry should be the logic inside the > "large.message.payload.store.class" > implementation. If we really want it, I think we need to make it clear in > which circumstance we will retry. For example, if it's an unknown exception > thrown from S3 API, what will we do to it? There was one class definition missing which is PayloadStoreException that is used in PayloadResponse. The PayloadStoreException has flag if it is true then we retry. And then serialised retry on any exception with this flag. We can make publish and download methods throw PayloadStoreException directly to simplify things WDYT?
> Moving it into the store class makes it much clearer. And it's good to have > a default implementation. > But to me, it's still an implementation-level detail that we don't need to > expose to users to implement it. > Could I know more about when the Id generator will be invoked? > My thought is : > Users can implement a `publish` method to publish like this: > public PayloadResponse publish(String topic, byte[] data) { > String id = genId(); > // put the id and data to the remote storage > s3Client.put(id, data, ...); > } > > So, with the id method in the interface, who will invoke it? Suppose it > will be the serializer/deserializer, but no one passes the generated id to > the publish method, how do we use it? It will be invoked by the payload class when publish the payload. So it is an implementation details of this class yes. > 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL > object? Originally because I needed something bit of wrapper for re-try and skip logic. But as I mentioned in first point we can make publish and download methods return the ref or payload otherwise they throw PayloadStoreException directly to simplify things WDYT? Other option have publish return only ref as string and download would be the one that could throw PayloadNotFoundException. > 8. Could we change the abstract class to interface? We can once we agree on what config we will drop and move to the details of payload provided by the users. regards Omnia > On 19 May 2025, at 09:54, Luke Chen <show...@gmail.com> wrote: > > Hi Omnia, > > Thanks for the explanation and update. > It's better now. > > Questions: >> 2. It's not clear how the "retry count" comes into play in the KIP. It >> needs more explanation. > My initial thinking is the retry configs are a must for all blob stores, so > we can provide them, and validate them for free for all blob stores so not > every implementation will go through verifying them. > > IMO, the retry should be the logic inside the > "large.message.payload.store.class" > implementation. If we really want it, I think we need to make it clear in > which circumstance we will retry. For example, if it's an unknown exception > thrown from S3 API, what will we do to it? > >> 6. About the BlobIdGenerator, why do we need it, could you explain more? >> Again, I thought we only need to replace value to a path, and add the >> "large-message" header, so that when consumer reads this message, it'll >> read the path from value and get the original data via BlobStore. Why do > we >> need this ID generator? I think users should do the object naming when >> putting the object by themselves, not via another interface. WDYT? > In some cases generating the ID might need some smart work for example to > avoid s3 throttling the recommended way on their doc it to create sub paths > under the original bucket, to decide this we might hash the data to find a > suitable sub-path. > Here is an example of how I would generate an path for s3 file > ``` > public String id(byte[] data) { > String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % > distributionFactor // distributionFactor is a config for the Id generator > and it represent the max number of sub-folders under the bucket > return subFolder + “/“ + UUID.randomUUID().toString() > } > ``` > Hope this example clarify a bit. However I do agree here it might not need > a class. I have move it to be part of the store class. > > Moving it into the store class makes it much clearer. And it's good to have > a default implementation. > But to me, it's still an implementation-level detail that we don't need to > expose to users to implement it. > Could I know more about when the Id generator will be invoked? > My thought is : > Users can implement a `publish` method to publish like this: > public PayloadResponse publish(String topic, byte[] data) { > String id = genId(); > // put the id and data to the remote storage > s3Client.put(id, data, ...); > } > > So, with the id method in the interface, who will invoke it? Suppose it > will be the serializer/deserializer, but no one passes the generated id to > the publish method, how do we use it? > > 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL > object? > > 8. Could we change the abstract class to interface? > > Thanks. > Luke > > > > > On Wed, Apr 30, 2025 at 9:21 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com > <mailto:o.g.h.ibra...@gmail.com>> > wrote: > >> Hi Luke, >> >>>> 3. What does "LargeMessageFormatter" do in the process? >>>> I thought all we want to do is to replace the "large value data" into a >>>> path, and consumers will read the path via blob store class. >>>> All these should be done in serializer/deserializer, so why do we need >> the >>>> formatter? >>> >>> I wanted to bit of more info than just the path to download, for example >> I want to add stuff like the class path for the original blob store for >> example if consumer is setup with the unmatched blob store to the one used >> during publishing. >>> I have updated the KIP to simplify this by having this always as a >> simple json of path and publisher class which can be represented as >> PayloadReferenceValue. WDYT? >> >> I thought of another case where having the freedom to form the reference >> might be nice feature, which is DR. Let’s imagine this case where someone >> publish large messages to S3 and reference to Kafka topic then they want to >> have DR. This can be achievable if they have mirrored Kafka topic which >> contains the references but if S3 is unreachable form the DR backup >> location then the reference they have is bit useless. However if the >> message formatter is customisable then dev can implement a complicated >> store that publish to two store locations and the publish both references >> to Kafka as one message and the consumer store can download from either >> buckets that are available. I think keeping the door open to such use-case >> might be good feature but also having such use case might be questionable a >> bit with the latency it will add as we will be publishing to N number of >> stores. >> >> Regards >> Omnia >> >>> On 24 Apr 2025, at 17:40, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote: >>> >>> Hi Luke, thanks for having the time to look into the KIP >>>> 2. It's not clear how the "retry count" comes into play in the KIP. It >>>> needs more explanation. >>> My initial thinking is the retry configs are a must for all blob stores, >> so we can provide them, and validate them for free for all blob stores so >> not every implementation will go through verifying them. >>> >>>> 3. What does "LargeMessageFormatter" do in the process? >>>> I thought all we want to do is to replace the "large value data" into a >>>> path, and consumers will read the path via blob store class. >>>> All these should be done in serializer/deserializer, so why do we need >> the >>>> formatter? >>> >>> I wanted to bit of more info than just the path to download, for example >> I want to add stuff like the class path for the original blob store for >> example if consumer is setup with the unmatched blob store to the one used >> during publishing. >>> I have updated the KIP to simplify this by having this always as a >> simple json of path and publisher class which can be represented as >> PayloadReferenceValue. WDYT? >>> >>>> 4. In the BlobStore, it looks like we presume users will use object >> stores, >>>> which is not good. >>>> Could we make it more generic? Javadoc, method names, … >>> This is a good point, I have updated the method names and Javadoc. I >> also thinking of renaming the class name to PayloadStoreinstead of >> BlobStore as blob store still tide to object store as well. To set some >> context here, I am proposing this after working with some community form >> Apache Cassandra who are working on Cassandra CEP < >> https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-44%3A+Kafka+integration+for+Cassandra+CDC+using+Sidecar#CEP44:KafkaintegrationforCassandraCDCusingSidecar-LargeBlob>-44 >> to handle large CDC and the initial thinking was let’s publish any large >> cdc to an object store instead of Kafka this why the naming was suggesting >> “blob store” only. >>> >>>> 5. It would be good to have some explanation for the purpose of each new >>>> interface/class, and clear javadoc for each method. >>> >>> Updated the KIP with javadocs >>> >>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? >>>> Again, I thought we only need to replace value to a path, and add the >>>> "large-message" header, so that when consumer reads this message, it'll >>>> read the path from value and get the original data via BlobStore. Why >> do we >>>> need this ID generator? I think users should do the object naming when >>>> putting the object by themselves, not via another interface. WDYT? >>> In some cases generating the ID might need some smart work for example >> to avoid s3 throttling the recommended way on their doc it to create sub >> paths under the original bucket, to decide this we might hash the data to >> find a suitable sub-path. >>> Here is an example of how I would generate an path for s3 file >>> ``` >>> public String id(byte[] data) { >>> String subFolder = topic + "-" + >> Utils.toPositive(Utils.murmur2(data)) % distributionFactor // >> distributionFactor is a config for the Id generator and it represent the >> max number of sub-folders under the bucket >>> return subFolder + “/“ + UUID.randomUUID().toString() >>> } >>> ``` >>> Hope this example clarify a bit. However I do agree here it might not >> need a class. I have move it to be part of the store class. >>> >>> Please let me know WDYT of the final shape of the KIP now >>> >>> Thanks >>> Omnia >>> >>>> On 24 Apr 2025, at 13:31, Luke Chen <show...@gmail.com> wrote: >>>> >>>> Hi Omnia, >>>> >>>> Thanks for proposing this feature that many users expected. >>>> >>>> Some comments: >>>> 1. It's quite interesting to see the idea of chained >>>> serializer/deserializer used here. I like it. >>>> >>>> 2. It's not clear how the "retry count" comes into play in the KIP. It >>>> needs more explanation. >>>> >>>> 3. What does "LargeMessageFormatter" do in the process? >>>> I thought all we want to do is to replace the "large value data" into a >>>> path, and consumers will read the path via blob store class. >>>> All these should be done in serializer/deserializer, so why do we need >> the >>>> formatter? >>>> >>>> 4. In the BlobStore, it looks like we presume users will use object >> stores, >>>> which is not good. >>>> Could we make it more generic? Javadoc, method names, ... >>>> >>>> 5. It would be good to have some explanation for the purpose of each new >>>> interface/class, and clear javadoc for each method. >>>> >>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? >>>> Again, I thought we only need to replace value to a path, and add the >>>> "large-message" header, so that when consumer reads this message, it'll >>>> read the path from value and get the original data via BlobStore. Why >> do we >>>> need this ID generator? I think users should do the object naming when >>>> putting the object by themselves, not via another interface. WDYT? >>>> >>>> Thanks. >>>> Luke >>>> >>>> >>>> >>>> >>>> >>>> On Thu, Apr 10, 2025 at 9:31 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>> wrote: >>>> >>>>> Hi there I would like to start discussions on >>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1159%3A+Large+message+reference+based+Serializer >>>>> >>>>> Thanks >>>>> Omnia