Stan,

You are always on top of all new Kafka developments and again thanks for your 
time reading the KIP.

For your concern on version/message compatibility issue between client and 
broker (kind of similar to inter-broker-protocol version issue), we can 
introduce another field in FetchRequest: SupportedStorageFormatVersions, the 
consumer will fill in this field with the server side storage format versions 
it understands (we can mark the current storage format as version 1), the 
server can fallback to the old behavior if it thinks the client version is too 
low.  And we mark this field as list type to contain a list of versions the 
client supports, so in the case when the client is new while the server is old 
(forward-compatibility case), the server can still respond with the new 
behavior if server’s old version is still in the list of versions client 
understands.

For the consumer side implementation, we are writing that KIP as we speak.  The 
implementation of Java consumer side will follow the design of KIP-392 
(preferredReadReplica) and KIP-405 (RemoteStorageManager plugin).  
    * In KIP-392, when the consumer receives FetchResponse which only contains 
preferredReadReplica info, the consumer will save the preferredReadReplica info 
on SubscriptionState object and generate another fetch request to the new 
preferredReadReplica node.  We will do the same for KIP-1248, save the remote 
log segment location in SubscriptionState object.  
    * We will also create a new class RemoteStorageNetworkClient which talks to 
remote tiered storage (parallel to the current NetworkClient who talks to Kafka 
broker).  The RemoteStorageNetworkClient will takes in a FetchRequest and 
output a FetchResponse back to the consumer thread.  
    * A new interface RemoteStorageReader will be added to consumer side 
(similar to RemoteStorageManager on server side) which has one method:  
InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, 
int startPosition, int endPosition)
    * S3/GCS/Azure specific implementation of this interface can be plugged in 
(much like the plugin implementation of RemoteStorageManger on server side)
    * RemoteStorageNetworkClient will invoke  
RemoteStorageReader.fetchLogSegment to download the data segment file content 
from remote tiered storage and translate the file content in server batch 
format to MemoryRecords to FetchResponse object (reuse some of the code 
currently in RemoteLogManager.read() from server side);

S1: plugin mechanism for S3/GCS/Azure implementation, mentioned above
S2: performance on consumer side, usually multi-part download will improve the 
download performance, but this is a question for S3 plugin implementation;  The 
new fetchLogSegment method allows the consumer to specify a fetch range;
S3: Fallback mechanism, both the server or the client can fallback to the 
current behavior if the version mismatch or access to remote tiered storage has 
issues or on some other error conditions.

Henry Haiying Cai

On 2025/12/02 15:01:40 Stanislav Kozlovski wrote:
> Hey Henry, thanks for the KIP! I'm excited to see this proposal as I've heard 
> it be discussed privately before too.
> 
> Can we have some wording that talks about the trade-offs of coupling clients 
> to the underlying storage format? Today, the underlying segment format is 
> decoupled from the clients, since the broker handles conversion of log 
> messages to what the protocol expects. I'm sure certain proprietary Kafka 
> implementations use different formats for their underlying storage - it's an 
> interesting question how they would handle this (to be explicit, I'm not 
> proposing we should cater our design to those systems though, simply calling 
> it out as a potential contention point). 
> 
> Things I'm thinking about:
> - Would this be a optional feature?
> - How would forward-compatibility look like?
> 
> e.g if we ever want to switch the underlying storage format? To bullet-proof 
> ourselves, do we want to introduce some version matching which could then 
> help us understand non-compatibility and throw errors? (e.g we change storage 
> format in 6.x, and a 4.x client tries to read from a 6.x 
> broker/storage-foramt)
> 
> Can we also have some wording on how this feature would look like on the 
> consumer-side? The proposal right now suggests we handle this in a follow-up 
> KIP, which makes sense for the details - but what about a high-level overview 
> and motivation?
> 
> 1. We would likely need a similar plugin system for Consumers like brokers 
> have for KIP-405. Getting that interface right would be important. Ensuring 
> the plugin configured on the consumer matches the plugin configured on the 
> broker would be useful from a UX point of view too.
> 
> 2. From a cost and performance perspective, how do we envision this being 
> used/configured on the consumer side?
> 
> A single segment could be GBs of size. It's unlikely a consumer would want to 
> download the whole thing at once.
> 
> For tiered backends that are S3-compatible cloud object storage systems, we 
> could likely use byte-range GETs, thus avoiding reading too much data that'll 
> get discarded. Are there concerns with other systems? A few words on this 
> topic would help imo.
> 
> 3. Should we have fall-backs to the current behavior?
> 
> Best,
> Stan
> 
> On 2025/12/02 11:04:13 Kamal Chandraprakash wrote:
> > Hi Haiying,
> > 
> > Thanks for the KIP!
> > 
> > 1. Do you plan to add support for transactional consumers? Currently, the
> > consumer doesn't return the aborted transaction records to the handler.
> > 2. To access the remote storage directly, the client might need additional
> > certificates / keys. How do you plan to expose those configs on the client?
> > 3. Will it support the Queues for Kafka feature KIP-932
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka>?
> > And so on.
> > 
> > --
> > Kamal
> > 
> > On Tue, Dec 2, 2025 at 10:29 AM Haiying Cai via dev <[email protected]>
> > wrote:
> > 
> > > For some reason, the KIP link was truncated in the original email.  Here
> > > is the link again:
> > >
> > > KIP:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1248%3A+Allow+consumer+to+fetch+from+remote+tiered+storage
> > >
> > > Henry Haiying Cai
> > >
> > > On 2025/12/02 04:34:39 Henry Haiying Cai via dev wrote:
> > > >
> > > > Hi all,
> > > >
> > > >
> > > >
> > > >
> > > > I would like to start discussion on KIP-1248: Allow consumer to fetch
> > > from remote tiered storage
> > > >
> > > >
> > > >
> > > > KIP link: KIP-1248: Allow consumer to fetch from remote tiered storage -
> > > Apache Kafka - Apache Software Foundation
> > > >
> > > > |
> > > > |
> > > > |  |
> > > > KIP-1248: Allow consumer to fetch from remote tiered storage - Apache
> > > Ka...
> > > >
> > > >
> > > >  |
> > > >
> > > >  |
> > > >
> > > >  |
> > > >
> > > >
> > > >
> > > >
> > > > The KIP proposes to allow consumer clients to fetch from remote tiered
> > > storage directly to avoid hitting broker's network capacity and cache
> > > performance.  This is very useful to serve large backfill requests from a
> > > new or fallen-off consumer.
> > > >
> > > >
> > > >
> > > >
> > > > Any feedback is appreciated.
> > > >
> > > >
> > > >
> > > >
> > > > Best regards,
> > > >
> > > >
> > > >
> > > > Henry
> > 
> 

Reply via email to