Thanks for the KIP!

A few questions:

# SK1. Metadata Partitions

> The default number of partitions will be TBD.

I guess we would definitely want to start with **a number greater than one**, 
as there's no downside to it (compared to the alternative).

Throwing a number out there, if we have 3 coordinator partitions by default and 
they're all co-located on one broker, that's effectively the same as having one 
partition on one broker, right? The only (good) difference is that we're given 
the ability to distribute the load across 3 nodes later on.

> Moving of user partition from Diskless coordinator to another is possible, 
> but remains out of scope of this KIP.

Which KIP will this be in scope for? I would think some rudimentary way to 
achieve this should be shipped as part of the first release, if we are to 
support multi metadata partitions.

The concern is if we don't have the ability to spread out batch commit 
hot-spots that form due to bad initial user_partition->metadata_partition 
placement, the cluster would be in a tough spot.

This becomes a greater concern when we keep in mind that:

> Adding partitions to __diskless_metadata will be possible to increase the 
> total capacity of the cluster, but only newly created user partitions will be 
> able to take advantage of the new metadata partitions / Diskless coordinators.

In other words, if we get the default partitioning wrong (or if a user gets it 
wrong by overriding the default config), there are few tools to fix the 
problem the moment it's felt.
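
To make the concern concrete, here is a minimal sketch in Python. It assumes 
(my assumption, the KIP doesn't spell out the placement algorithm) that a user 
partition is pinned to a metadata partition once, at creation time. The 
`place` function and the hash-based choice are hypothetical.

```python
import hashlib

def place(topic_partition: str, num_metadata_partitions: int) -> int:
    """Hypothetical placement: stable hash of the user partition name."""
    digest = hashlib.sha256(topic_partition.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_metadata_partitions

# Placement is decided once, at creation time, and then remembered.
assignment = {tp: place(tp, 3) for tp in ("orders-0", "orders-1", "clicks-0")}

# Later the operator grows __diskless_metadata from 3 to 6 partitions.
# Existing user partitions keep their old placement; only new ones see 6.
assignment["payments-0"] = place("payments-0", 6)

existing = {tp: mp for tp, mp in assignment.items() if tp != "payments-0"}
```

Everything in `existing` still lives on metadata partitions 0-2, so a hot-spot 
among those three is not relieved by the three new partitions.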


# SK2. API Exceptions

It would be nice to describe the failure cases for some of the APIs. One 
obvious one I can think of is trying to commit a user-partition batch to a 
coordinator which doesn't own that user-partition. It's mentioned in one 
paragraph that NOT_COORDINATOR would be returned. It'd be nice to define it 
more formally alongside the API requests, as is customary with KIPs.
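
As an illustration of the kind of definition I mean, here is a rough sketch of 
the ownership check. The function name, the per-partition error granularity and 
the data shapes are my assumptions, not something the KIP states; only the 
NOT_COORDINATOR error itself comes from the KIP text.

```python
from enum import Enum

class Error(Enum):
    NONE = "NONE"
    NOT_COORDINATOR = "NOT_COORDINATOR"

def validate_commit(owned_partitions, requested_partitions):
    """Hypothetical check: one error code per user partition in the request."""
    return {
        tp: Error.NONE if tp in owned_partitions else Error.NOT_COORDINATOR
        for tp in requested_partitions
    }

# This coordinator owns orders-0 only; the request also touches clicks-0.
result = validate_commit({"orders-0"}, ["orders-0", "clicks-0"])
```

Whether the error is per partition (as sketched) or fails the whole request is 
exactly the sort of detail the KIP should pin down.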

# SK3. SQLite

AFAICT, we would create a SQLite table to materialize the "Metadata records" 
described. Have we modelled the table and the type of queries we envision 
running against it? I think it would clear up the KIP very nicely if we could 
add some first-draft of this table and its usage.
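
To show the level of detail I have in mind, here is a purely illustrative 
first draft using Python's built-in `sqlite3`. The table name, columns and 
queries are all my guesses, loosely based on the "batch coordinates (object 
key, offset within the object)" mentioned earlier in this thread. None of it 
is taken from the KIP.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE batches (
        topic_id     TEXT    NOT NULL,
        partition_id INTEGER NOT NULL,
        base_offset  INTEGER NOT NULL,
        last_offset  INTEGER NOT NULL,
        object_key   TEXT    NOT NULL,  -- WAL file in object storage
        byte_offset  INTEGER NOT NULL,  -- position of the batch in the file
        byte_size    INTEGER NOT NULL,
        PRIMARY KEY (topic_id, partition_id, base_offset)
    )
""")
conn.executemany(
    "INSERT INTO batches VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        ("t1", 0, 0, 9, "wal/obj-a", 0, 1024),
        ("t1", 0, 10, 19, "wal/obj-b", 512, 2048),
        ("t1", 1, 0, 4, "wal/obj-a", 1024, 300),
    ],
)

def find_batches(topic_id, partition_id, fetch_offset, max_batches=10):
    """Fetch path: batches that contain or follow fetch_offset, in order."""
    return conn.execute(
        """
        SELECT base_offset, last_offset, object_key, byte_offset, byte_size
        FROM batches
        WHERE topic_id = ? AND partition_id = ? AND last_offset >= ?
        ORDER BY base_offset
        LIMIT ?
        """,
        (topic_id, partition_id, fetch_offset, max_batches),
    ).fetchall()

def file_has_batches(object_key):
    """File management path: does any live batch still reference the file?"""
    row = conn.execute(
        "SELECT EXISTS (SELECT 1 FROM batches WHERE object_key = ?)",
        (object_key,),
    ).fetchone()
    return bool(row[0])
```

Even a draft like this would let us reason about which indexes are needed 
(e.g. one on `object_key` for the second query) and how large the state gets.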

# SK4. Request Serialization

Regarding the speculative application of metadata to the local state as an 
optimization:

> We want to perform operations in the pipelined manner to reduce the operation 
> latency. That is, to be able to start working on the following operation 
> while the current one is still waiting to be replicated.

Which operations in particular is it thought that this will help with?

For example, I imagine two consecutive DisklessCommitFile requests (A, B) 
cannot be optimized as B needs to wait for A to be fully replicated before 
having its own records written to disk (and then wait to get replicated). The 
optimization of speculative application here would be to have B do steps 1-3 in 
parallel while waiting on A's replication (step 5). I figure those steps aren't 
heavyweight enough to warrant optimization via speculative/parallel 
application. If anything, steps 4,5,6 are the heavier ones -- and they're 
blocked on 5) finishing from the previous request. Am I missing something?

I guess, in other words, I'm asking: who will read this speculative in-memory 
(or in tx) state, and how?

# SK5. Snapshots

> The primary mechanism to keep the metadata log size contained will be 
> snapshotting and pruning of the log. Periodically, the leader will take 
> snapshots of the local state asynchronously and the followers will be able to 
> fetch these snapshots. Once a metadata log offset is in a snapshot, it could 
> be pruned. This mechanism is identical to the one in KRaft (see KIP-630).

Does this mean we will reuse the existing FetchSnapshot RPC and create new 
.checkpoint (or equivalent) files to manage diskless snapshots (i.e., we won't 
store them in the diskless metadata log)? It would be good to explicitly point 
out in this KIP which parts are identical at a high level.

It would be nice to define the snapshot format so that we can reason about how 
often we need to snapshot and how large the files could be.

# SK6. WAL File Management - Owner IDs

> { "name": "OwnerIds", "type": "[]int32", "versions": "0+", "entityType": 
> "brokerId", "about": "The list of Diskless Coordinator IDs that share 
> ownership of this file." },
> The list of owners is included as the field in DisklessCommitFile request. 
> Now each DC knows what other DCs claim the file.

AFAICT, ownership is derived through 
user_partition->diskless_metadata_partition->DC, where the DC is the leader of 
that diskless_metadata_partition.

What happens if the user_partition is reassigned to another DC (as will 
eventually be supported)? Does ownership remain, or change with the new leader?

Are there downsides to storing and mapping it via DisklessMetadataPartitionIds 
here?

# SK7. WAL File Management

> The first DC in the owner list becomes the owner. Once the last batch in this 
> DC for the file is deleted, the DC hands over the file to the next owner in 
> the list.

> The status of the file is changed in the local state (but not in the metadata 
> log, because this information is already implicitly present there and the 
> followers will know it) and a background worker is started for the 
> corresponding operation (handover or deletion). If the target broker for 
> handover or the remote storage is not available, the background worker will 
> retry the operation indefinitely. If the DC leadership changes, the 
> background workers on the current broker will be stopped and started again on 
> the new leader (as the new leader has been reading the same metadata log and 
> knows the status of files).

If we have DCs [A,B,C] and A finishes deleting all batches, how is the 
hand-over done from A to B?

If it's only kept in the local state, what happens during crashes (e.g., if A and 
B crash at once)? How does B recover and know it's its turn to own the file? I 
guess A deterministically starts the background worker and is the owner, 
figures out its batches are gone and then "hands over" to B. There could be 
edge cases in the "hands over" part, hence asking for a definition.
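
For reference, this is how I currently read the rule, as a small Python 
sketch. The function and its return values are hypothetical. It only encodes 
"the first DC in the list owns the file, then hands over to the next one", 
evaluated from a single DC's point of view.

```python
def next_step(owner_ids, me, my_live_batches):
    """Decide what DC `me` does with a file it currently owns.

    owner_ids: ordered owner list from the DisklessCommitFile request.
    my_live_batches: number of batches in the file still live on this DC.
    """
    if my_live_batches > 0:
        return ("keep", None)
    idx = owner_ids.index(me)
    if idx + 1 < len(owner_ids):
        return ("handover", owner_ids[idx + 1])
    return ("delete", None)

# DCs [A, B, C] are represented by IDs 1, 2, 3.
step_a = next_step([1, 2, 3], me=1, my_live_batches=0)
step_c = next_step([1, 2, 3], me=3, my_live_batches=0)
```

The decision itself can be recomputed after a crash, since it depends only on 
the owner list and local batch state. What the sketch cannot answer is how B 
durably learns that A's handover happened, which is the part I'd like defined.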

# SK8. Preferred Produce Brokers

> The value of this field will be dynamically calculated on the broker side

Which KIP will focus on this field (algorithm, etc.)?

It's a very critical config w.r.t. cost management (S3 request counts).

In particular I think it would be important to have this persisted somewhere, 
defined and modifiable in some way, so that tools like Cruise Control could be 
extended to have control over it, as well as visibility/monitoring.

Best,
Stan

On 2026/03/02 13:43:25 Ivan Yurchenko wrote:
> Hi all,
> 
> The parent KIP-1150 was voted for and accepted. Let's now focus on the 
> technical details presented in this KIP-1164 and also in KIP-1163: Diskless 
> Core [1].
> 
> Best,
> Ivan
> 
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1163%3A+Diskless+Core
> 
> On Tue, Apr 29, 2025, at 07:54, yuxia wrote:
> > Thanks Giuseppe for the explanation! It makes sense to me.
> > 
> > Best regards,
> > Yuxia
> > 
> > ----- Original Message -----
> > From: "Giuseppe Lillo" <[email protected]>
> > To: "dev" <[email protected]>
> > Sent: Tuesday, April 29, 2025, 12:14:14 AM
> > Subject: Re: [SPAM][DISCUSS] KIP-1164: Topic Based Batch Coordinator
> > 
> > Hello Yuxia, thanks for your question and interest!
> > 
> > When producing, the broker will call the relevant Batch Coordinator with a
> > CommitBatches request.
> > The Batch Coordinator will then write the metadata about these batches into
> > the __diskless-metadata topic and update its internal state persisted on
> > SQLite. It will then reply with the assigned offsets.
> > Read-only Batch Coordinators will also replicate that metadata into their
> > own internal state.
> > 
> > When consuming, the broker will call the relevant Batch Coordinator with a
> > FindBatches request.
> > The Batch Coordinator will search the requested offsets within its internal
> > state and reply with the batch coordinates (object key, offset within the
> > object).
> > 
> > In your example, I suppose that A, B and C are all messages written to the
> > same topic-partition.
> > The problem you described is solved by the idempotent producer. In order to
> > support idempotent producer in Diskless topics, information about producer
> > ID and sequence numbers must be communicated to the Batch Coordinator when
> > committing a new batch. We included information about the producer
> > (producer id and producer epoch) and the sequence numbers (base sequence,
> > last sequence) both in the commitFile public interface and in the
> > CommitBatches API. When serving a CommitBatches request that includes
> > idempotent producer information, the Batch Coordinator will also perform
> > some checks to understand if the produce request is a duplicate or if it
> > contains out-of-order messages by checking with the internal state.
> > 
> > Best regards,
> > Giuseppe
> > 
> > On Thu, Apr 24, 2025 at 4:24 AM yuxia <[email protected]> wrote:
> > 
> > > Hi!
> > >
> > > Thanks for the great work, I'm excited to see it happen. This KIP
> > > looks good to me.
> > > The Batch Coordinator seems very important in the diskless implementation;
> > > could you explain the implementation in more detail? I think it would be
> > > much better to show what the Batch Coordinator will do when a write, read,
> > > or other request comes in.
> > >
> > > I'm also wondering how it "chooses the total ordering for writes" and
> > > what the "information necessary to support idempotent producers" is.
> > > I'm thinking about the following case:
> > > 1: the client is going to send messages A, B, C to Kafka
> > > 2: the client sends A, B to broker1, and broker1 receives A, B
> > > 3: broker1 goes down, and the client sends C to broker2
> > > 4: since broker1 is down, the client sees A, B fail and retries sending
> > > A, B to broker2
> > > Then, how can the Batch Coordinator choose the total order to be A, B, C?
> > >
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Ivan Yurchenko" <[email protected]>
> > > To: "dev" <[email protected]>
> > > Sent: Wednesday, April 23, 2025, 5:46:46 PM
> > > Subject: [SPAM][DISCUSS] KIP-1164: Topic Based Batch Coordinator
> > >
> > > Hi all!
> > >
> > > We want to start the discussion thread for KIP-1164: Topic Based Batch
> > > Coordinator [1], which is a sub-KIP for KIP-1150 [2].
> > >
> > > Let's use the main KIP-1150 discuss thread [3] for high-level questions,
> > > motivation, and general direction of the feature and this thread for
> > > discussing the batch coordinator interface and the proposed topic-based
> > > implementation.
> > >
> > > Best,
> > > Ivan
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1164%3A+Topic+Based+Batch+Coordinator
> > > [2]
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1150%3A+Diskless+Topics
> > > [3] https://lists.apache.org/thread/ljxc495nf39myp28pmf77sm2xydwjm6d
> > >
> > 
> 
