Hi Federico and team,

Thank you for the latest updates and the revised command workflows. It is
great to see the CRC recalculation and chained mirroring issues (which I
initially flagged in VK9 and Rajini subsequently noted in RS12) being
actively addressed, and I appreciate the new approach proposed by Michael
Maison for the chained mirroring topology.

However, it appears that my second technical review email (shared above),
which detailed several critical, code-level blockers, was missed in the
latest round of responses. Because these items directly impact data
integrity and cluster stability, they must be resolved on the record and
within the KIP text before we can confidently proceed to a vote.

Could the authors please provide technical clarifications on the following
outstanding items:


*1. The Negative PID Bug & LSO Calculation (VK4 / VK8 - Critical Blocker)*
As I noted previously, generating negative Producer IDs via
-(sourceProducerId + 2) fundamentally breaks Kafka's internal state
management. In AbstractRecordBatch.java, hasProducerId() strictly checks
that the ID is greater than -1. If we use negative PIDs, the broker will
skip ProducerStateManager.update(), the ongoingTxns map will remain empty,
and the Last Stable Offset (LSO) will default to the High Watermark. This
will cause read_committed consumers to immediately read uncommitted data. We
must shift this formula into the positive space (e.g., Long.MAX_VALUE -
sourceProducerId - 1) to pass these internal checks.
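
To make the failure mode concrete, here is a minimal sketch of the check as I
described it above. It is not Kafka source code: the class and method names
(PidMappingSketch, negativeMapping, positiveMapping) are hypothetical, and
the only thing carried over from Kafka is the "greater than -1" comparison.
It also does not address collisions with locally allocated PIDs.

```java
// Illustrative sketch only; names are hypothetical, not the KIP's or Kafka's API.
final class PidMappingSketch {
    // Sentinel meaning "no producer id" (a batch that is neither idempotent nor transactional).
    static final long NO_PRODUCER_ID = -1L;

    // Same comparison as described for hasProducerId(): the ID must be greater than -1.
    static boolean hasProducerId(long producerId) {
        return NO_PRODUCER_ID < producerId;
    }

    // Formula currently in the KIP: maps every non-negative source PID below -1.
    static long negativeMapping(long sourceProducerId) {
        return -(sourceProducerId + 2);
    }

    // Alternative suggested in this email: stays non-negative for
    // source PIDs in the range [0, Long.MAX_VALUE - 1].
    static long positiveMapping(long sourceProducerId) {
        return Long.MAX_VALUE - sourceProducerId - 1;
    }

    public static void main(String[] args) {
        long sourcePid = 5L;
        System.out.println(negativeMapping(sourcePid));                // -7
        System.out.println(hasProducerId(negativeMapping(sourcePid))); // false
        System.out.println(hasProducerId(positiveMapping(sourcePid))); // true
    }
}
```

With the negative mapping the batch looks as if it had no producer at all,
which is exactly the condition under which producer state tracking would be
skipped.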


*2. TransactionIndex Rebuild Ambiguity (VK11 - Critical Blocker)*
Does the destination broker rebuild the TransactionIndex locally, or is it
copied byte-for-byte from the source? If it is copied, it will retain the
original source PIDs. Since the destination log contains our rewritten
PIDs, they will not match, causing consumers to fail the abort check and
read aborted messages. The KIP must explicitly mandate a local rebuild.

*3. Thundering Herd Heap Allocation (VK1 Follow-up)*
While Omnia addressed bandwidth throttling, the immediate RAM demand on the
JVM remains a severe risk. If 50,000 partitions wake up, fetcher threads
eagerly claiming the default 1MB buffer will create an instant 50GB heap
demand, risking a broker-wide OOM. How does the design cap the number of
concurrent active fetch buffers to protect the heap?
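
To illustrate the kind of bound I am asking about, here is a minimal sketch
that caps the number of live fetch buffers with a semaphore. This is purely
an assumption on my side and not something the KIP describes: the class
BoundedFetchBuffers and its methods are hypothetical, and a real design
might use a shared memory pool instead.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

// Illustrative sketch only; not part of the KIP or of Kafka.
final class BoundedFetchBuffers {
    private final Semaphore permits;
    private final int maxConcurrentBuffers;
    private final int bufferBytes;

    BoundedFetchBuffers(int maxConcurrentBuffers, int bufferBytes) {
        this.permits = new Semaphore(maxConcurrentBuffers);
        this.maxConcurrentBuffers = maxConcurrentBuffers;
        this.bufferBytes = bufferBytes;
    }

    // Returns a buffer if the cap has not been reached, otherwise null so the
    // caller can back off and retry instead of allocating eagerly.
    ByteBuffer tryAcquire() {
        return permits.tryAcquire() ? ByteBuffer.allocate(bufferBytes) : null;
    }

    // Must be called exactly once for every buffer returned by tryAcquire().
    void release() {
        permits.release();
    }

    // Upper bound on heap held by fetch buffers under this cap.
    long worstCaseHeapBytes() {
        return (long) maxConcurrentBuffers * bufferBytes;
    }

    public static void main(String[] args) {
        // Uncapped case from the paragraph above: 50,000 partitions x 1 MiB each.
        System.out.println(50_000L * 1_048_576L);            // 52428800000 bytes, roughly 50 GB
        // Capped case: at most 256 buffers of 1 MiB are live at any time.
        BoundedFetchBuffers pool = new BoundedFetchBuffers(256, 1_048_576);
        System.out.println(pool.worstCaseHeapBytes());       // 268435456 bytes (256 MiB)
    }
}
```

The point is only that the worst-case heap demand becomes a function of a
configured cap rather than of the number of partitions that wake up.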

*4. Control Plane Hotspots (VK3 Follow-up)*
We established that the global Controller is safe, but shifting the load to
the __mirror_state topic creates a new hotspot. If a link flaps and 50,000
partitions transition states simultaneously, the broker hosting the leader
for that coordinator topic will be overwhelmed. What is the partition
assignment strategy or sizing guideline to mitigate this?

*5. Offset Sync Race Condition and Data Loss (VK12)*
The KIP's suggested mitigation for synced offsets exceeding the destination
log end offset is to instruct operators to set auto.offset.reset=latest.
This will cause actual data loss, as the consumer will skip from the log
end offset to the newly committed offset. To prevent data loss, shouldn't
we dynamically cap synced offsets to min(sourceCommitted, destinationLEO)?
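
As a minimal sketch of the capping rule I am proposing (the class and method
names are hypothetical, and where this would live in the offset sync path is
left open):

```java
// Illustrative sketch only; names are hypothetical, not the KIP's API.
final class OffsetSyncSketch {
    // Never commit an offset on the destination that is beyond what the
    // destination log actually contains.
    static long capSyncedOffset(long sourceCommitted, long destinationLogEndOffset) {
        return Math.min(sourceCommitted, destinationLogEndOffset);
    }

    public static void main(String[] args) {
        // Source group committed 120, but only offsets below 100 are mirrored so far.
        System.out.println(capSyncedOffset(120L, 100L)); // 100
        // Mirroring is ahead of the group: the committed offset is used unchanged.
        System.out.println(capSyncedOffset(80L, 100L));  // 80
    }
}
```

With this rule a consumer that fails over resumes at an offset that exists
on the destination, so auto.offset.reset is never consulted for this case.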

I want to ensure this new architecture is as robust as possible. I look
forward to your thoughts on resolving these data safety blockers.

Regards,
Viquar Khan

On Tue, 7 Apr 2026 at 10:46, Federico Valeri <[email protected]> wrote:

> Hi Andrew,
>
> AS6, AS7: Thanks! I have to admit that I didn't consider operations
> which act on two kinds of resources, but it makes total sense as we
> want to prevent privilege escalation. For example, a user may have
> ALTER on CLUSTER_MIRROR but no topic access, so shouldn't be able to
> add a topic to mirror. I guess pause/resume mirror topics should
> follow the same pattern.
>
> AS9: Yes, CLUSTER_MIRROR is a distinct resource type and should have
> its own error code, like the others. Fixed.
>
> Let us think more about the rest.
>
> On Fri, Apr 3, 2026 at 4:59 PM Andrew Schofield <[email protected]>
> wrote:
> >
> > Hi Fede and friends,
> > Thanks for the responses to my excessive comments. I like the direction
> it's heading in.
> >
> > AS6, AS7: Thanks for adding the tables of permissions. Making
> ClusterMirror an ACL resource seems like a good improvement to me. A few
> detailed clarifications:
> >
> > DescribeConfigs RPC: DESCRIBE_CONFIGS on TOPIC, not DESCRIBE.
> > CreatePartitions RPC: ALTER on TOPIC.
> > IncrementalAlterConfigs RPC: ALTER_CONFIGS on CLUSTER_MIRROR on mirrors,
> and ALTER_CONFIGS on TOPIC for topics.
> > OffsetCommit RPC: READ on GROUP and READ on TOPIC.
> > CreateAcls RPC: ALTER on CLUSTER.
> > DeleteAcls RPC: ALTER on CLUSTER.
> >
> > For operations which act on two kinds of resources, such as doing topic
> things to groups, we generally need permission on both resources. I suggest:
> >
> > AddTopicsToMirror RPC: Maybe add READ on TOPIC.
> > RemoveTopicsFromMirror RPC: Maybe add READ on TOPIC.
> >
> > AS8: I think that including AuthorizedOperations in the
> DescribeMirrorsResponse will work nicely now.
> >
> > AS9: Thanks for the table of error codes. Looks comprehensive, but
> you'll need MIRROR_AUTHORIZATION_FAILED too I think.
> >
> > AS10-15: Thanks. Looks good.
> >
> > AS16: Thanks. But I now have AS24 below. Sorry.
> >
> > AS17: Thanks. Looks good.
> >
> > AS18: The epoch information has been reworked in the latest version.
> Looks good to me.
> >
> > AS19-21: Thanks. Looks good.
> >
> > AS22-23: Also, see AS24 below.
> >
> > And here are a few new comments.
> >
> > AS24: It seems to me that the list of topics being mirrored is really a
> property of the mirror resource. Having `mirror.name` as a topic config,
> and then overloading it with various state suffixes seems a bit inelegant.
> >
> > I suggest:
> > * The mirror name follows the same rules as topic name (which it cannot
> quite do as the KIP is written because of .deleted and so on).
> > * The list of topics are a property of the mirror. Adding and removing
> topics mutates the mirror resource.
> > * The mirror name is no longer a topic config. Then, you do not need
> special handling to hide it if the user is not authorized to describe the
> cluster mirror, and you don't need to fiddle with the names as the topic's
> mirroring state changes. Since the state changes are mediated by the
> cluster mirroring components, keeping control of the state in the mirror
> resource seems workable.
> > * The mirror state for the topics in the mirror is also handled as
> properties or metadata of the mirror resource.
> > * You probably would need DESCRIBE on TOPIC to see that a topic was
> being mirrored, as well as DESCRIBE on MIRROR. This matches resources such
> as consumer groups where you can only see the committed offsets for the
> topics you can describe.
> >
> > I know this is largely a matter of opinion, so feel free to reject my
> suggestion.
> >
> > AS25: When cluster mirroring creates a topic on the destination, I
> wonder why it does not inherit the replication factor of the source topic
> by default. I can understand why you might want source and mirror topics to
> have different replication factors, but I think the default is currently
> the default replication factor for the destination cluster, as opposed to
> the replication factor of the source topic.
> >
> > AS26: The source topic and mirror topic use the same topic ID. I like
> the simplicity of this, but there's a theoretical implication which I
> thought I would raise. Although Apache Kafka itself does not support
> multi-tenancy yet, people have built multi-tenancy on top using proxies and
> techniques such as adding and removing topic name prefixes transparently to
> the clients. It seems to me that multi-tenancy in Apache Kafka is a gap
> waiting to be filled and before long a suitable KIP will be brought
> forward. With such techniques, if someone tried to use cluster mirroring
> where the source and destination "clusters" were actually virtual clusters
> on the same Kafka cluster, the attempt to create the mirror topic with the
> same topic ID would fail. I'm sure this is just a theoretical concern for
> AK because the project itself doesn't have multi-tenancy, yet, but I
> wondered what the authors think about this.
> >
> > AS27: This is definitely a matter of personal taste and it probably
> indicates that my mental model of the KIP differs from the authors'. It
> seems to me that `kafka-mirrors.sh --remove` should remove a topic from a
> mirror because mirroring it is no longer required. However, this is
> actually the command for initiating failover. Should there be a separate
> `kafka-mirrors.sh --failover`?
> >
> > Thanks,
> > Andrew
> >
> > On 2026/04/03 09:35:47 Federico Valeri wrote:
> > > Hello everyone,
> > >
> > > Thank you all for the thoughtful questions and suggestions, and thanks
> > > to Michael Maison for proposing a better approach to the chained
> > > mirroring problem. Some of these required careful consideration and
> > > led us to refine the design.
> > >
> > > For reviewers, I recommend focusing on the paragraphs with significant
> changes:
> > >
> > > 1. MirrorFetcherThread (updated)
> > > 2. Security Control (updated)
> > > 3. Idempotent Producer (updated)
> > > 4. Command Workflows (new)
> > > 5. Error Names (new)
> > >
> > > Below are answers to the outstanding questions.
> > >
> > > AS6, AS7: I added a table to "Security Control" illustrating
> > > authorization requirements for each component. Let me know if it
> > > covers what you had in mind.
> > >
> > > AS8: I removed that field for now. It is on my backlog and I will post
> > > an update soon.
> > >
> > > AS9, AS17: There is a new "Errors" paragraph with a table listing new
> > > and reused error codes, along with the RPCs that use each one.
> > >
> > > AS18: Could you clarify which LeaderEpoch checks you are referring to?
> > > Here is our understanding:
> > >
> > > 1. WriteMirrorStatesRequest: The coordinator validates that the
> > > current leader epoch matches the request leader epoch, rejecting
> > > outdated requests.
> > > 2. ReadMirrorStatesResponse: The coordinator includes the current
> > > leader epoch in the response, and the leader node validates it against
> > > its own current leader epoch.
> > > 3. LastMirroredOffsetsResponse: We replaced this with
> > > LastMirroredEpochsResponse, so we believe this no longer applies.
> > >
> > > AS22: Description updated to: "This property is filtered out from
> > > DescribeConfigs responses to avoid exposing internal state to users."
> > >
> > > ---
> > >
> > > RS1: I added the DeleteMirror RPC and updated all relevant sections.
> > >
> > > RS5: As you suggested, I introduced a new ResourceType called
> > > ClusterMirror. See "Security Control."
> > >
> > > RS6: The refactoring suggestions make sense, but we need more time to
> > > think them through.
> > >
> > > RS9: Until we support tiered storage, we will mirror all records
> > > including data in remote storage into the destination cluster.
> > >
> > > RS10: There is a future improvement briefly described in "Bandwidth
> > > Control." Let me know if you need more details.
> > >
> > > ---
> > >
> > > JR3: I created a new "Command Workflows" section that covers the
> > > workflow for each mirror operation, including failover and failback.
> > > Let me know if you spot any issues.
> > >
> > > I hope I have addressed everything, but please let me know if I missed
> > > any question.
> > >
> > > Cheers
> > > Fede
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Mar 20, 2026 at 5:12 AM Luke Chen <[email protected]> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the review comments.
> > > > I've answered them below.
> > > >
> > > > > JR1. "Epoch rewriting: When records are appended to the
> destination log,
> > > > the batch epochs are rewritten to match the destination cluster's
> leader
> > > > epochs, maintaining consistency within the destination cluster."
> This has a
> > > > couple of impacts.
> > > >
> > > > We have an updated design to support unclean leader election in this
> doc,
> > > > where we will NOT rewrite the leader epoch anymore.
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring
> > > >
> > > >
> > > > > JR2. "Tiered Storage is not initially supported": Ideally, we
> should
> > > > support tiered storage. Same as RS9, the destination cluster issues
> > > > consumer requests, which support tiering.
> > > > The OffsetMovedToTieredStorageException is used by replication
> > > > inside AbstractFetcherThread. This suggests that it's probably not a
> good
> > > > fit for cluster mirroring to use AbstractFetcherThread.
> > > >
> > > > Yes, thanks for pointing this out. I was wrong about this. Before we
> > > > support tiered storage, the mirroring will mirror all records
> including
> > > > data in the remote storage into the destination cluster.
> > > >
> > > > > JR3. For each new cluster mirroring command, it would be useful to
> > > > document
> > > > the underlying workflow (e.g, which RPC requests are issued, to
> which node;
> > > > what records are written to metadata topic, or internal topic, which
> > > > actions are triggered on the broker, etc).
> > > >
> > > > Will do.
> > > >
> > > > > JR4. Truncating a log to LMO. Currently, there is no internal API
> that
> > > > truncates a partition from the end. Could you describe how this will
> be
> > > > implemented to ensure all replicas are consistent after the
> truncation?
> > > >
> > > > The truncation flow is like this:
> > > > 1. When the MirrorMetadataManager in the node gets notified about the
> > > > partition leader assignment when onMetadataUpdate (via TopicsDelta),
> it'll
> > > > query the mirror coordinator about mirror partition state.
> > > > 2. When it's the PREPARING state, the MirrorMetadataManager in the
> leader
> > > > node will get the last mirrored offset (or epoch) from the source
> cluster
> > > > (new API) and then do the log truncate.
> > > > 3. In (2), we'll also register a callback in Partition instance, and
> wait
> > > > until all ISRs complete the truncation by checking the follower
> replica's
> > > > LEO.
> > > > 4. In (3), this check will be invoked every time the leader node updates
> update
> > > > follower fetch state, like how we check if high watermark should be
> > > > incremented.
> > > > 5. After all ISRs complete the truncation, we'll invoke the callback
> and
> > > > move the mirror partition state to MIRRORING, and then start
> fetching data
> > > > from the source cluster.
> > > >
> > > > Note:
> > > > (1) In PREPARING state, the partition is READ-ONLY, so no data will be
> > > > written on the leader node.
> > > > (2) During step (1) ~ (4), if any leadership change happens, the new
> leader
> > > > will start from step (1) to complete the log truncation process.
> > > > (3) If unclean leader election is supported
> > > > (i.e. mirror.support.unclean.leader.election=true), then we'll wait
> until
> > > > ALL registered replicas complete the truncation before moving on to
> > > > MIRRORING state.
> > > >
> > > > We'll update the KIP in the following days to address the community
> > > > feedback. Some questions need more thinking.
> > > > Please give us some time. :)
> > > >
> > > > Thank you,
> > > > Luke
> > > >
> > > >
> > > > On Fri, Mar 20, 2026 at 9:10 AM Jun Rao via dev <
> [email protected]>
> > > > wrote:
> > > >
> > > > > Hi, Federico,
> > > > >
> > > > > Thanks for the KIP. A few comments.
> > > > >
> > > > > JR1. "Epoch rewriting: When records are appended to the
> destination log,
> > > > > the batch epochs are rewritten to match the destination cluster's
> leader
> > > > > epochs, maintaining consistency within the destination cluster."
> This has a
> > > > > couple of impacts.
> > > > > JR1.1 How do we ensure that the leader epoch in committed offsets
> is
> > > > > consistent with the leader epoch in the batch? This consistency is
> > > > > important when the consumer fails over to a different cluster. It
> seems the
> > > > > KIP doesn't translate the leader epoch when mirroring the committed
> offsets.
> > > > > JR1.2 Typically, leader epochs increase monotonically in the log.
> Do we
> > > > > ensure this remains the case after failover and failback?
> > > > >
> > > > > JR2. "Tiered Storage is not initially supported": Ideally, we
> should
> > > > > support tiered storage. Same as RS9, the destination cluster issues
> > > > > consumer requests, which support tiering.
> > > > > The OffsetMovedToTieredStorageException is used by replication
> > > > > inside AbstractFetcherThread. This suggests that it's probably not
> a good
> > > > > fit for cluster mirroring to use AbstractFetcherThread.
> > > > >
> > > > > JR3. For each new cluster mirroring command, it would be useful to
> document
> > > > > the underlying workflow (e.g, which RPC requests are issued, to
> which node;
> > > > > what records are written to metadata topic, or internal topic,
> which
> > > > > actions are triggered on the broker, etc).
> > > > >
> > > > > JR4. Truncating a log to LMO. Currently, there is no internal API
> that
> > > > > truncates a partition from the end. Could you describe how this
> will be
> > > > > implemented to ensure all replicas are consistent after the
> truncation?
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Mar 16, 2026 at 2:44 AM Federico Valeri <
> [email protected]>
> > > > > wrote:
> > > > >
> > > > > > I mentioned a corner case in the chained mirroring use case. Let
> me
> > > > > > clarify what I mean with a simple example:
> > > > > >
> > > > > > 1. B is fetching from A, and C is fetching from B (A --> B --> C)
> > > > > > 2. A producer with PID 5 sends records to A
> > > > > > 3. Failover happens and B becomes writable (A -x-> B --> C)
> > > > > > 4. A different producer with PID 5 sends records to B
> > > > > > 5. Collision on cluster C (two different producers mapped to PID
> -7 in C)
> > > > > >
> > > > > > (arrows represent data flowing, not fetch direction)
> > > > > >
> > > > > >
> > > > > > On Sun, Mar 15, 2026 at 7:14 PM Federico Valeri <
> [email protected]>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Rajini, thanks for your thoughtful review and for catching
> a few
> > > > > > > bugs. I'll skip some questions that we will address later.
> > > > > > >
> > > > > > > RS2: The metadata records are described in "Mirror Metadata
> Records"
> > > > > > > paragraph. Currently there are only two records:
> "LastMirroredOffsets"
> > > > > > > record tracks the latest successfully mirrored offset for each
> > > > > > > partition, while "MirrorPartitionState" record represents the
> > > > > > > lifecycle states of a mirrored partition.
> > > > > > >
> > > > > > > RS3: That's a good point that was also raised by Andrew. It
> was an
> > > > > > > easy solution that we used for our prototype, but we need to
> think
> > > > > > > about a better solution.
> > > > > > >
> > > > > > > RS4: Current design is that mirror fetcher threads behave
> like a
> > > > > > > read_committed consumer fetching up to "source LSO". On
> failover we
> > > > > > > truncate destination log to "local LSO".
> > > > > > >
> > > > > > > The approach of fetching up to HW that you propose is still
> safe as we
> > > > > > > keep truncating to local LSO on failover, but it trades lower
> > > > > > > steady-state lag (especially when long-running transactions
> exist on
> > > > > > > the source) for more data loss on failover (the net data loss
> relative
> > > > > > > to the source is the same in both approaches). In other words,
> with
> > > > > > > your approach we fetch more data that we may then need to
> truncate.
> > > > > > > Also, read_uncommitted consumers on the destination cluster
> would be
> > > > > > > able to read records that may be truncated on failover. These
> are just
> > > > > > > my consideration, but we are open to discussion on which is
> the best
> > > > > > > approach here.
> > > > > > >
> > > > > > > When a failover is triggered (RemoveTopicsFromMirror), the
> sequence is:
> > > > > > >
> > > > > > > 1. Partitions transition to STOPPING state
> > > > > > > 2. Fetchers are removed
> > > > > > > 3. For each partition, truncate to local LSO is called
> > > > > > >   3.1. Reads LSO from each partition's local log
> > > > > > >   3.2. Calls log.truncateTo(offset) on the UnifiedLog
> > > > > > >   3.3. Ensures ISR members complete truncation before the
> partition
> > > > > > > becomes writable
> > > > > > > 4. For each partition, the LSO is recorded as the last
> mirrored offset
> > > > > > > (LMO) in __mirror_state
> > > > > > > 5. Partitions transition to STOPPED and become writable
> > > > > > >
> > > > > > > When a failback is triggered (AddTopicsToMirror), the sequence
> is:
> > > > > > >
> > > > > > > 1. Partitions transition to PREPARING state
> > > > > > > 2. For each partition, truncation to LMO is called
> > > > > > >   2.1. This sends a LastMirroredOffsetsRequest to the source
> cluster
> > > > > > > to fetch the offsets that were recorded during the previous
> failover
> > > > > > >   2.2.a. The response offsets are used to truncate local logs
> > > > > > >   2.2.b. If the source cluster doesn't support the
> LastMirroredOffsets
> > > > > > > API or first-time mirror, it truncates to offset 0
> > > > > > > 3. Partitions transition to MIRRORING
> > > > > > >
> > > > > > > RS7: Can you point me to the section that says configs are
> stored in
> > > > > > > __mirror_state? Mirror connection configs (bootstrap servers,
> > > > > > > credentials, etc.) are stored in KRaft metadata via
> > > > > > > ConfigResource.Type.MIRROR, not in __mirror_state. The
> internal topic
> > > > > > > only stores partition states and last mirrored offsets.
> Sensitive
> > > > > > > credentials follow the existing KRaft config handling, which is
> > > > > > > already protected by controller/broker access controls and
> sensitive
> > > > > > > config redaction in DescribeConfigs responses.
> > > > > > >
> > > > > > > RS8: Not sure what's the recommended approach here. Adding a
> new error
> > > > > > > code does not change the response schema and older clients
> that don't
> > > > > > > recognize the new error code will surface it as an
> > > > > > > UnknownServerException (non-retriable).
> > > > > > >
> > > > > > > RS11: Good catch. This is a prototype simplification that we
> need to
> > > > > > > address. To properly sync consumer group offsets, the
> implementation
> > > > > > > would need to send ListGroups to all source brokers (or use the
> > > > > > > AdminClient which does this internally), send FindCoordinator
> to
> > > > > > > discover the group coordinator for each group, send
> OffsetFetch to the
> > > > > > > correct group coordinator.
> > > > > > >
> > > > > > > RS12: You are absolutely right, the transformation is not
> idempotent,
> > > > > > > so it is not safe for chained mirroring (A -> B -> C). Instead,
> > > > > > > round-trip mirroring (A -> B, then B -> A) works because, when
> doing a
> > > > > > > failback, the log is truncated before mirroring resumes, so
> previously
> > > > > > > mirrored records with negative pids are removed and the
> transformation
> > > > > > > is only applied to new records produced natively on that
> cluster
> > > > > > > (double-transformation never occurs). Non-transactional
> batches stay
> > > > > > > at -1 [ -(-1 + 2) = -(1) = -1], which is correct.
> > > > > > >
> > > > > > > The chained mirroring would work if we skip the transformation
> when
> > > > > > > pid is negative, but there is still an edge case: A -> B -> C
> with
> > > > > > > local B producer. If cluster A has local pid 5 and cluster B
> also has
> > > > > > > local pid 5, both end up as -7 on cluster C. Collision: two
> different
> > > > > > > producers with the same pid on the destination. No pid-only
> > > > > > > transformation can solve that. We would need to incorporate
> cluster
> > > > > > > identity.
> > > > > > >
> > > > > > > Possible solution that would handle any topology: The producer
> IDs are
> > > > > > > 64-bit signed longs used to identify a producer. The clusterId
> (UUID)
> > > > > > > is a globally unique 128-bit identifier for each source
> cluster. We
> > > > > > > could use the clusterId hash to partition the entire negative
> PID
> > > > > > > space into regions, one per source cluster. Basically we
> divide the 64
> > > > > > > bits into three fields: bit 63 (sign bit), bits 62-31 (region
> > > > > > > selector), bits 30-0 (producer identity). Once a non-negative
> PID is
> > > > > > > mapped to a region, it passes through unchanged no matter how
> many
> > > > > > > hops follow (i.e. we apply the transformation only for PIDs >=
> 0).
> > > > > > >
> > > > > > > Example with two clusters:
> > > > > > >
> > > > > > > - Bit 63: This is the sign bit that makes the value negative
> and
> > > > > > > distinguishes mirrored pids from local ones (which are
> non-negative).
> > > > > > > - Bits 62-31 cluster A: clusterId = "abc-123", clusterHash = 42
> > > > > > > - Bits 62-31 cluster B: clusterId = "xyz-789", clusterHash = 99
> > > > > > > - Bits 30-0: Local producer ID 5 that is the same on both
> clusters.
> > > > > > >
> > > > > > >   A's pid 5 -->
> > > > > > >
> 1|00000000000000000000000000101010|0000000000000000000000000000101
> > > > > > >   B's pid 5 -->
> > > > > > >
> 1|00000000000000000000000001100011|0000000000000000000000000000101
> > > > > > >
> > > > > > > On Wed, Mar 11, 2026 at 1:23 PM Rajini Sivaram <
> > > > > [email protected]>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > A few more questions about the KIP for clarification:
> > > > > > > >
> > > > > > > > RS8: The KIP says produce requests to mirror topics will
> throw
> > > > > > > > ReadOnlyTopicException. For Produce Requests returning a new
> error to
> > > > > > > > clients, don’t we need to bump Produce request version?
> > > > > > > >
> > > > > > > > RS9: The KIP says we use OffsetMovedToTieredStorageException
> to
> > > > > prevent
> > > > > > > > mirroring of data in tiered storage. But doesn’t the mirror
> client
> > > > > look
> > > > > > > > like a regular consumer to the source cluster and return
> records
> > > > > > fetched
> > > > > > > > from tiered storage?
> > > > > > > >
> > > > > > > > RS10: Client-id based quotas for the source cluster look
> hard to
> > > > > manage
> > > > > > > > since there is no hierarchy or grouping possible. Seems
> better to
> > > > > rely
> > > > > > on
> > > > > > > > secure user-principal based quotas on the source-side.
> > > > > > > >
> > > > > > > > RS11: The KIP says `The manager maintains a connection pool
> with one
> > > > > > > > blocking sender per source cluster`. If this is the
> connection used
> > > > > for
> > > > > > > > periodic sync of offsets, topic configs etc. the coordinator
> is
> > > > > likely
> > > > > > to
> > > > > > > > need connections to all source brokers (i.e. all group
> coordinators).
> > > > > > > >
> > > > > > > > RS12: The KIP proposes to transform producer ids for mirror
> records
> > > > > to
> > > > > > > > avoid conflicts. This comes at a cost because CRC checksum
> needs to
> > > > > be
> > > > > > > > recomputed. To justify this cost, we need to ensure that this
> > > > > > > > transformation works in all cases. What happens if you are
> mirroring
> > > > > a
> > > > > > > > mirror topic? Is that a supported scenario? Or mirroring back
> > > > > mirrored
> > > > > > data
> > > > > > > > during failback because the source was truncated?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Rajini
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Mar 10, 2026 at 8:19 PM Rajini Sivaram <
> > > > > > [email protected]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi team,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP! I have a few questions, mostly
> clarification at
> > > > > > this
> > > > > > > > > point.
> > > > > > > > >
> > > > > > > > > RS1: There is a `CreateMirror` request but no corresponding
> > > > > > `DeleteMirror`
> > > > > > > > > request. Is that intentional?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS2: It will be good to define the format of data going
> into the
> > > > > > internal
> > > > > > > > > mirror state topic. There is an example under
> kafka-dump-logs,
> > > > > which
> > > > > > > > > shows partition-level state in the payload and the mirror
> name as
> > > > > > key. I
> > > > > > > > > guess that is not what we expect it to be. Do we delete
> this
> > > > > > information
> > > > > > > > > when a topic is deleted or a mirror is deleted?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS3: KIP currently says mirror name cannot end with
> .removed. I
> > > > > > guess it
> > > > > > > > > cannot also end with .paused.  Have we considered storing
> state and
> > > > > > > > > mirror name separately, but updated together for a topic?
> Since new
> > > > > > > > > states may be added in future, name restrictions may
> become hard to
> > > > > > > > > implement.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS4: The KIP says *“mirroring must fetch only up to the
> LSO to
> > > > > > maintain
> > > > > > > > > transactional consistency”* and it also says *“During the
> mirror
> > > > > > stopping
> > > > > > > > > transition, the MirrorCoordinator performs a log truncation
> > > > > > operation that
> > > > > > > > > resets each mirror partition to its LSO.”*  I guess the
> plan is to
> > > > > > fetch
> > > > > > > > > up to high watermark and truncate to locally computed LSO
> on
> > > > > > failover?
> > > > > > > > > Details of the sequence here will be useful. How does
> > > > > > MirrorCoordinator
> > > > > > > > > perform truncation?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS5: The KIP says “*On the destination cluster,
> mirror-related
> > > > > > operations
> > > > > > > > > (creating mirrors, adding/removing topics from mirrors,
> managing
> > > > > > mirror
> > > > > > > > > configurations) require the CLUSTER_ACTION permission on
> the
> > > > > cluster
> > > > > > > > > resource.*” The  `Cluster:ClusterAction` ACL is currently
> used for
> > > > > > broker
> > > > > > > > > service account, e.g. local replication is authorized
> using this.
> > > > > It
> > > > > > seems
> > > > > > > > > odd to grant this permission to users managing a resource
> on the
> > > > > > cluster.
> > > > > > > > > Have we considered adding a new resource type
> `ClusterMirror` and
> > > > > > define
> > > > > > > > > ACLs like `ClusterMirror:Create`, `ClusterMirror:Alter`
> and `
> > > > > > > > > ClusterMirror:AlterConfigs`?
> > > > > > > > >
> > > > > > > > > RS6: The KIP talks about three entities: Cluster Mirror,
> Mirror
> > > > > > Topic and Mirror
> > > > > > > > > Partition, with Cluster Mirroring as the feature name.
> Since we
> > > > > > already
> > > > > > > > > have MirrorMaker that also refers to mirrors, it will be
> nice if we
> > > > > > can
> > > > > > > > > refer to the entities using their full name in the CLI and
> public
> > > > > > APIs.
> > > > > > > > > That will enable us to add more mirror topic and mirror
> partition
> > > > > > APIs in
> > > > > > > > > the future if needed. For example:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >    - `kafka-cluster-mirrors.sh` to manage cluster mirrors
> > > > > > > > >    -  createClusterMirrors(), listClusterMirrors(),
> > > > > > > > >    describeClusterMirrors() etc on the Admin API and Kafka
> > > > > Protocol.
> > > > > > > > >    -  KIP proposes pauseMirrorTopics(),
> resumeMirrorTopics() which
> > > > > > are
> > > > > > > > >    good.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS7: The KIP proposes to store mirror configs in the
> internal
> > > > > mirror
> > > > > > state
> > > > > > > > > topic. This includes sensitive credentials of another
> cluster. Have
> > > > > > we
> > > > > > > > > considered other options? Can a user with read access read
> the data
> > > > > > from
> > > > > > > > > the state topic using a consumer?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Rajini
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sun, Mar 8, 2026 at 8:58 PM Andrew Schofield <
> > > > > > [email protected]>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi Fede and friends,
> > > > > > > > >> I've re-read in detail and have quite a lot of comments,
> mostly
> > > > > > minor
> > > > > > > > >> clarifications, but as it approaches a vote, it's good to
> get the
> > > > > > details
> > > > > > > > >> nailed down.
> > > > > > > > >>
> > > > > > > > >> AS6: Could we have a diagram which shows which RPCs are
> served by
> > > > > > which
> > > > > > > > >> components? This will help illustrate the authorisation
> > > > > > requirements for
> > > > > > > > >> the various components, which is an aspect of the KIP
> that I don't
> > > > > > think is
> > > > > > > > >> completely specified yet.
> > > > > > > > >>
> > > > > > > > >> AS7: Please could you include a table of the operations
> and
> > > > > > resources
> > > > > > > > >> which will be checked for authorisation of each of the
> RPCs
> > > > > > introduced.
> > > > > > > > >> Also, please could you document the permissions which the
> > > > > > destination
> > > > > > > > >> cluster will require to mirror data and ACLs (for
> example, I think
> > > > > > it will
> > > > > > > > >> need ALTER on the CLUSTER resource to manipulate ACLs)?
> It's going
> > > > > > to need
> > > > > > > > >> Metadata, DescribeConfigs, DescribeAcls, ListGroups,
> OffsetFetch,
> > > > > > > > >> LastMirrorOffset and Fetch RPCs I think, possibly others
> too. The
> > > > > > user is
> > > > > > > > >> probably going to want to give as little permission as
> possible to
> > > > > > the
> > > > > > > > >> destination cluster to get its job done.
> > > > > > > > >>
> > > > > > > > >> AS8: You include AuthorizedOperations in
> DescribeMirrorsResponse,
> > > > > > but I
> > > > > > > > > >> don't know what the operations are. I think this implies
> MIRROR is
> > > > > a
> > > > > > new
> > > > > > > > >> resource type in the Kafka security model and
> DescribeMirrors can
> > > > > > be used
> > > > > > > > >> to enquire the authorised operations for the client
> making the
> > > > > > Admin API
> > > > > > > > >> request.
> > > > > > > > >>
> > > > > > > > >> AS9: I think you're going to need some new error codes in
> the
> > > > > Kafka
> > > > > > > > > >> protocol, at least:
> > > > > > > > >>
> > > > > > > > >> * INVALID_MIRROR_NAME or similar if the mirror name
> doesn't meet
> > > > > the
> > > > > > > > >> rules for a topic name
> > > > > > > > >> * UNKNOWN_MIRROR if the mirror doesn't exist
> > > > > > > > >>
> > > > > > > > >> And probably some more for logical inconsistencies such
> as this
> > > > > > topic
> > > > > > > > >> isn't in that mirror, that topic is already in another
> mirror, and
> > > > > > so on.
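
Inline note on AS9: a minimal sketch of what the INVALID_MIRROR_NAME check could look like, assuming mirror names follow the existing topic-name rules (at most 249 characters, only ASCII letters, digits, '.', '_' and '-', and not "." or ".."). The class and method names are hypothetical and are not taken from the KIP.

```java
import java.util.regex.Pattern;

// Hypothetical helper: illustrates applying topic-name rules to a mirror name.
final class MirrorNameSketch {
    // Same limits Kafka applies to topic names.
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    /** Returns true if the name would pass the topic-name rules. */
    static boolean isValidMirrorName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        if (name.length() > MAX_LENGTH) {
            return false;
        }
        return LEGAL_CHARS.matcher(name).matches();
    }
}
```

Under these rules alone, a name such as "my.nasty.mirror.removed" (see AS23) would be accepted, so a reserved-suffix check would have to be added separately if the KIP decides to forbid it.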
> > > > > > > > >>
> > > > > > > > >> AS10: Could you add the usage information for
> kafka-mirrors.sh
> > > > > (the
> > > > > > > > >> intended output from kafka-mirrors.sh --help) so all of
> the
> > > > > options
> > > > > > are
> > > > > > > > >> documented together? For example, I see that
> --replication-factor
> > > > > is
> > > > > > > > >> included in one of the examples, which seems a bit
> surprising and
> > > > > > I'm not
> > > > > > > > >> sure whether it's a mistake or a feature. I can probably
> use
> > > > > > --describe
> > > > > > > > >> with a specific --mirror but it's not specified.
> > > > > > > > >>
> > > > > > > > >> AS11: I would expect the signature for
> Admin.addTopicsToMirror to
> > > > > be
> > > > > > > > >> Admin.addTopicsToMirror(String mirrorName, Set<String>
> topics,
> > > > > > > > >> AddTopicsToMirrorOptions options) because it's for adding
> topics
> > > > > to
> > > > > > a
> > > > > > > > >> mirror, as the counterpart to
> Admin.removeTopicsFromMirror(String
> > > > > > > > >> mirrorName, Set<String> topics,
> RemoveTopicsFromMirrorOptions
> > > > > > options).
> > > > > > > > >>
> > > > > > > > >> AS12: I don't think ignorable RPC fields in version 0
> RPCs make
> > > > > > sense
> > > > > > > > >> because they're not trying to be compatible with a
> previous
> > > > > version.
> > > > > > > > >>
> > > > > > > > >> AS13: I would have expected AddTopicsToMirrorRequest to
> have
> > > > > mirror
> > > > > > name
> > > > > > > > >> above the list of topics because the same mirror name
> applies to
> > > > > > all of the
> > > > > > > > >> topics being added. As specified, you repeat the mirror
> name for
> > > > > > all of the
> > > > > > > > >> topics.
> > > > > > > > >>
> > > > > > > > >> AS14: I suggest adding ErrorMessage to the responses in
> all cases
> > > > > > to make
> > > > > > > > >> it easier to give more descriptive exception messages
> than just
> > > > > the
> > > > > > default
> > > > > > > > >> for the error codes.
> > > > > > > > >>
> > > > > > > > >> AS15: I may have the wrong end of the stick here, but I
> expected
> > > > > > > > >> RemoveTopicsFromMirrorRequest to remove the topics from a
> specific
> > > > > > named
> > > > > > > > >> mirror as implied by the example of the kafka-mirrors.sh
> command.
> > > > > > In fact,
> > > > > > > > >> I was expecting the mirror to contain the topics in the
> admin RPC
> > > > > > requests
> > > > > > > > >> and responses, and that's only true for about half of
> them.
> > > > > > > > >>
> > > > > > > > >> AS16: Can I change the mirror.name config using
> > > > > > IncrementalAlterConfigs?
> > > > > > > > >> If I attempt it, what's the error?
> > > > > > > > >>
> > > > > > > > >> AS17: If I attempt mirror RPCs when the mirror is in the
> wrong
> > > > > > state, the
> > > > > > > > >> error is specified as INVALID_REQUEST. That's usually
> kept for
> > > > > > badly formed
> > > > > > > > >> requests, as opposed to logically invalid ones. Maybe
> > > > > > MIRROR_NOT_STOPPED or
> > > > > > > > >> MIRRORING_ACTIVE or similar would be more expressive.
> > > > > > > > >>
> > > > > > > > >> AS18: Should the LastMirroredOffsetsResponse,
> > > > > > ReadMirrorStatesResponse
> > > > > > > > >> and WriteMirrorStatesRequest include LeaderEpoch? I
> suspect so.
> > > > > > > > >>
> > > > > > > > >> AS19: In DescribeMirrorsResponse, I suspect you will want
> "null"
> > > > > > values
> > > > > > > > >> for some fields which don't have values during
> initialisation and
> > > > > > so on,
> > > > > > > > >> such as lag.
> > > > > > > > >>
> > > > > > > > >> AS20: Do you need to add new versions of the
> DescribeConfigs and
> > > > > > > > >> IncrementalAlterConfigs RPCs to support mirror resources?
> > > > > > > > >>
> > > > > > > > >> AS21: The topic configuration
> > > > > mirror.replication.throttled.replicas
> > > > > > is
> > > > > > > > >> described as a list, but the default is MAX_LONG.
> > > > > > > > >>
> > > > > > > > >> AS22: By including mirror.name as a topic config, a
> client which
> > > > > > has
> > > > > > > > >> permission to describe configs for the topic is able to
> discover
> > > > > > the name
> > > > > > > > >> of the mirror, whether they are permitted to list the
> mirrors or
> > > > > > describe
> > > > > > > > >> that particular mirror. Generally, the Kafka
> authorisation model
> > > > > > does not
> > > > > > > > >> permit this kind of unauthorised information disclosure.
> For
> > > > > > example, when
> > > > > > > > >> a client describes the committed offsets for a consumer
> group, the
> > > > > > list of
> > > > > > > > >> topics returned is filtered to only those topics which
> the client
> > > > > is
> > > > > > > > > >> permitted to describe, even though that may result in an
> > > > > > incomplete set of
> > > > > > > > >> topic partitions being returned. Is there an alternative
> way in
> > > > > > which this
> > > > > > > > >> information could be stored so Kafka only reveals mirror
> > > > > > information to
> > > > > > > > >> principals authorised to see it?
> > > > > > > > >>
> > > > > > > > >> AS23: I observe that there are situations in which a
> `.removed`
> > > > > > suffix is
> > > > > > > > >> added to the mirror name. Is it permitted for the user to
> define a
> > > > > > mirror
> > > > > > > > >> called "my.nasty.mirror.removed" and does it break
> anything?
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> Andrew
> > > > > > > > >>
> > > > > > > > >> On 2026/03/06 13:41:52 Paolo Patierno wrote:
> > > > > > > > >> > Hi Fede,
> > > > > > > > >> > something more ...
> > > > > > > > >> >
> > > > > > > > >> > Is there any migration path for users who want to
> migrate from
> > > > > > using
> > > > > > > > >> Mirror
> > > > > > > > >> > Maker 2 to the cluster mirroring?
> > > > > > > > >> > I mean, something like a tool useful to create a
> corresponding
> > > > > > cluster
> > > > > > > > >> > mirroring configuration starting from a MM2 one.
> Nothing that
> > > > > > runs the
> > > > > > > > >> > migration automatically but something that can be
> provided to
> > > > > the
> > > > > > users
> > > > > > > > >> as
> > > > > > > > >> > output to be validated and put in place by them.
> > > > > > > > >> >
> > > > > > > > >> > The Admin Client is missing methods to pause and stop
> mirroring
> > > > > > (but we
> > > > > > > > >> > have corresponding protocol messages). Is it on
> purpose? Any
> > > > > > specific
> > > > > > > > >> > reasons? They would be important from an automatic
> operator
> > > > > > perspective
> > > > > > > > >> use
> > > > > > > > >> > case.
> > > > > > > > >> > Also a method to provide the LastMirroredOffset from
> the source
> > > > > > cluster
> > > > > > > > >> > could be useful for progress and tracking purposes.
> > > > > > > > >> > Finally, what about a method to get the mirror states?
> I don't
> > > > > > think the
> > > > > > > > >> > describe method provides such information.
> > > > > > > > >> > In general, I think that the Admin Client section needs
> to cover
> > > > > > in more
> > > > > > > > >> > details the new classes definition like
> CreateMirrorOptions,
> > > > > > > > >> > CreateMirrorResult, ... and so on for all the defined
> new
> > > > > methods.
> > > > > > > > >> >
> > > > > > > > >> > > AddTopicsToMirrorResult addTopicsToMirror(Map<String,
> String>
> > > > > > > > >> > topicToMirrorName, AddTopicsToMirrorOptions options);
> > > > > > > > >> >
> > > > > > > > >> > Isn't it missing the mirrorName (as you have in the
> > > > > > > > >> removeTopicsFromMirror
> > > > > > > > >> > counterpart)?
> > > > > > > > >> > What's the topicToMirrorName parameter if it's defined
> as a Map?
> > > > > > The
> > > > > > > > >> method
> > > > > > > > >> > is also plural using "topics" so comparing to the
> > > > > > removeTopicsFromMirror
> > > > > > > > >> > method, I would assume the parameter really is
> Set<String>
> > > > > topics?
> > > > > > > > >> > Comparing to the corresponding protocol message
> > > > > > > > >> AddTopicsToMirrorRequest, I
> > > > > > > > >> > see a list of topics but each of them has id, name and
> > > > > > corresponding
> > > > > > > > >> > mirror. So it's unclear how the addTopicsToMirror is
> defined.
> > > > > > > > >> >
> > > > > > > > >> > > RemoveTopicsFromMirrorResult
> removeTopicsFromMirror(String
> > > > > > mirrorName,
> > > > > > > > >> > Set<String> topics, RemoveTopicsFromMirrorOptions
> options);
> > > > > > > > >> >
> > > > > > > > >> > This method gets a mirrorName but if I look at the
> corresponding
> > > > > > > > >> protocol
> > > > > > > > >> > message RemoveTopicsFromMirrorRequest, it says "Allows
> users to
> > > > > > detach
> > > > > > > > >> > topics from their associated mirror" so the mirror is
> actually
> > > > > not
> > > > > > > > >> provided
> > > > > > > > >> > and it's exactly what I see in the JSON definition
> (only topics
> > > > > > list
> > > > > > > > >> with
> > > > > > > > >> > id and name).
> > > > > > > > >> >
> > > > > > > > >> > Finally, regarding the protocol change:
> > > > > > > > >> >
> > > > > > > > >> > * ListMirrorsResponse I would add the clusterId in the
> JSON
> > > > > > definition
> > > > > > > > >> > (it's related to my comments in the previous email when
> using
> > > > > the
> > > > > > tool).
> > > > > > > > >> > * WriteMirrorStatesRequest has the following in the
> JSON which
> > > > > > should
> > > > > > > > >> not
> > > > > > > > >> > be part of it "{ "name": "RemovedTopics", "type":
> "[]string",
> > > > > > > > >> "versions":
> > > > > > > > >> > "0+", "about": "The topic names to be removed." }"
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Paolo.
> > > > > > > > >> >
> > > > > > > > >> > On Fri, 6 Mar 2026 at 13:08, Paolo Patierno <
> > > > > > [email protected]>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hi Fede,
> > > > > > > > > >> > > thank you for the proposal. I had a first pass with the
> following
> > > > > > > > >> thoughts and
> > > > > > > > >> > > questions.
> > > > > > > > >> > >
> > > > > > > > >> > > > When the unclean.leader.election.enable is set to
> true, the
> > > > > > broker
> > > > > > > > >> will
> > > > > > > > >> > > log a warning at every configuration synchronization
> period.
> > > > > > > > >> > > Be more explicit about what the warning says.
> > > > > > > > >> > >
> > > > > > > > >> > > > This topic ID is not used by other topics in the
> current
> > > > > > cluster
> > > > > > > > >> > > In such a case, which should be very unlikely, what's
> going to
> > > > > > happen?
> > > > > > > > >> > > Isn't it possible to mirror the topic?
> > > > > > > > >> > >
> > > > > > > > >> > > > To enable it, all cluster nodes (controllers and
> brokers)
> > > > > must
> > > > > > > > >> > > explicitly enable unstable API versions and unstable
> feature
> > > > > > versions
> > > > > > > > >> in
> > > > > > > > >> > > all configuration files. After starting the cluster
> with a
> > > > > > minimum
> > > > > > > > >> metadata
> > > > > > > > >> > > version, operators can dynamically enable the mirror
> version
> > > > > > feature
> > > > > > > > >> to
> > > > > > > > >> > > activate Cluster Mirroring.
> > > > > > > > >> > > AFAIU there is going to be a dedicated feature flag
> for it,
> > > > > > right? If
> > > > > > > > >> yes
> > > > > > > > >> > > can we state it clearly also specifying the exact
> name (i.e.
> > > > > > > > >> mirror.version
> > > > > > > > >> > > or something similar)?
> > > > > > > > >> > >
> > > > > > > > >> > > When running the kafka-mirrors.sh tool to list the
> mirrors,
> > > > > > other than
> > > > > > > > >> > > showing the SOURCE-BOOTSTRAP, it could be useful to
> have also
> > > > > > the
> > > > > > > > >> clusterId
> > > > > > > > >> > > which, as a unique identifier, could be helpful in
> automated
> > > > > > systems
> > > > > > > > >> using
> > > > > > > > >> > > the cluster mirroring. Of course, it would be
> important to
> > > > > have
> > > > > > in the
> > > > > > > > >> > > ListMirrorsResponse as well as an additional field.
> > > > > > > > >> > >
> > > > > > > > >> > > What happens in case of Kafka downgrade from a version
> > > > > > supporting
> > > > > > > > > >> > > mirroring to an older one not supporting it?
> > > > > > > > >> > > The mirror won't be running but the topic
> configuration will
> > > > > > still
> > > > > > > > >> have
> > > > > > > > >> > > config parameters like mirror.name and so on, right?
> Are they
> > > > > > just
> > > > > > > > >> > > ignored by the older Kafka version and the cluster
> will work
> > > > > > without
> > > > > > > > >> issues?
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks,
> > > > > > > > >> > > Paolo
> > > > > > > > >> > >
> > > > > > > > >> > > On Fri, 6 Mar 2026 at 10:43, Luke Chen <
> [email protected]>
> > > > > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > >> Hi Andrew and all,
> > > > > > > > >> > >>
> > > > > > > > >> > >> About AS5, yes, I've created a sub-document
> > > > > > > > >> > >> <
> > > > > > > > >> > >>
> > > > > > > > >>
> > > > > >
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring
> > > > > > > > >> > >> >to
> > > > > > > > >> > >> explain the algorithm to support unclean leader
> election in
> > > > > > cluster
> > > > > > > > >> > >> mirroring.
> > > > > > > > >> > >> Thanks for your comments, I'm inspired by that! :)
> > > > > > > > >> > >>
> > > > > > > > >> > >> About your idea, to store the owner of the leader
> epoch when
> > > > > > > > >> leadership
> > > > > > > > >> > >> change, I think it might not be needed because the
> most
> > > > > > important
> > > > > > > > >> thing
> > > > > > > > >> > >> should be this:
> > > > > > > > >> > >> > you might find that both ends have declared a
> local epoch
> > > > > N,
> > > > > > but
> > > > > > > > >> someone
> > > > > > > > >> > >> has to win.
> > > > > > > > >> > >>
> > > > > > > > >> > >> That is, as long as we have a way to declare who is
> the owner
> > > > > > of
> > > > > > > > >> leader
> > > > > > > > >> > >> epoch N, then the 2 clusters can sync up
> successfully.
> > > > > > > > > >> > >> And that's why I proposed the "last mirrored
> leader epoch"
> > > > > > > > >> semantic in
> > > > > > > > >> > >> the sub-proposal
> > > > > > > > >> > >> <
> > > > > > > > >> > >>
> > > > > > > > >>
> > > > > >
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring
> > > > > > > > >> > >> >,
> > > > > > > > >> > >> which is a solution to draw a line between these 2
> clusters
> > > > > to
> > > > > > > > >> declare
> > > > > > > > >> > >> records beyond the "last mirrored leader epoch" N,
> it belongs
> > > > > > to
> > > > > > > > >> who. I
> > > > > > > > >> > >> think this should work well, as long as all replicas
> in the
> > > > > > cluster
> > > > > > > > >> can
> > > > > > > > >> > >> truncate the log correctly.
> > > > > > > > >> > >>
> > > > > > > > >> > >> What do you think?
> > > > > > > > >> > >>
> > > > > > > > >> > >> Any feedback is appreciated.
> > > > > > > > >> > >>
> > > > > > > > >> > >> Thank you,
> > > > > > > > >> > >> Luke
> > > > > > > > >> > >>
> > > > > > > > >> > >> On Fri, Mar 6, 2026 at 6:02 PM Andrew Schofield <
> > > > > > > > >> [email protected]>
> > > > > > > > >> > >> wrote:
> > > > > > > > >> > >>
> > > > > > > > >> > >> > Hi Fede,
> > > > > > > > >> > >> > Thanks for your response.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS1: Thanks for the clarification.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS2: I expect you'll include a version bump of
> > > > > > > > >> AlterShareGroupOffsets in
> > > > > > > > >> > >> > this KIP, but that's a small matter compared with
> the rest
> > > > > > of the
> > > > > > > > >> > >> protocol
> > > > > > > > >> > >> > changes.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS3: OK.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS4: Thanks for the details. My only comment is
> that it
> > > > > > might be a
> > > > > > > > >> bit
> > > > > > > > >> > >> > laborious when you want to failover all topics. I
> suggest
> > > > > > adding
> > > > > > > > >> > >> > `--all-topics` so you could do:
> > > > > > > > >> > >> >
> > > > > > > > > >> > >> > $ bin/kafka-mirrors.sh --bootstrap-server :9094
> --remove
> > > > > > > > >> --all-topics
> > > > > > > > >> > >> > --mirror my-mirror
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS5: Thanks for the response. I understand there
> are good
> > > > > > reasons
> > > > > > > > >> for
> > > > > > > > >> > >> the
> > > > > > > > >> > >> > way epochs are handled in the KIP. I see that
> there is a
> > > > > > > > >> sub-document
> > > > > > > > >> > >> for
> > > > > > > > >> > >> > the KIP about unclean leader election. I'll spend
> some time
> > > > > > > > >> reviewing
> > > > > > > > >> > >> that.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > Thanks,
> > > > > > > > >> > >> > Andrew
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > On 2026/02/18 13:27:07 Federico Valeri wrote:
> > > > > > > > >> > >> > > Hi Andrew, thanks for the review.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Let me try to answer your questions and then
> other
> > > > > authors
> > > > > > can
> > > > > > > > >> join
> > > > > > > > >> > >> > > the discussion.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS1
> > > > > > > > >> > >> > > ------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Destination topics are created with the same
> topic IDs
> > > > > > using the
> > > > > > > > >> > >> > > extended CreateTopics API. Then, data is
> replicated
> > > > > > starting from
> > > > > > > > >> > >> > > offset 0 with byte-for-byte batch copying, so
> destination
> > > > > > offsets
> > > > > > > > >> > >> > > always match source offsets. When failing over,
> we record
> > > > > > the
> > > > > > > > >> last
> > > > > > > > >> > >> > > mirrored offset (LMO) in the destination
> cluster. When
> > > > > > failing
> > > > > > > > >> back,
> > > > > > > > > >> > >> > > the LMO is used to truncate and then start
> mirroring
> > > > > the
> > > > > > > > >> delta,
> > > > > > > > >> > >> > > otherwise we start mirroring from scratch by
> truncating
> > > > > to
> > > > > > zero.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Retention: If the mirror leader attempts to
> fetch an
> > > > > > offset that
> > > > > > > > >> is
> > > > > > > > >> > >> > > below the current log start offset of the source
> leader
> > > > > > (e.g.
> > > > > > > > >> fetching
> > > > > > > > >> > >> > > offset 50 when log start offset is 100), the
> source
> > > > > broker
> > > > > > > > >> returns an
> > > > > > > > >> > >> > > OffsetOutOfRangeException that the mirror leader
> handles
> > > > > by
> > > > > > > > >> truncating
> > > > > > > > >> > >> > > to the source's current log start offset and
> resuming
> > > > > > fetching
> > > > > > > > >> from
> > > > > > > > >> > >> > > that point. Compaction: The mirror leader
> replicates
> > > > > these
> > > > > > > > >> compacted
> > > > > > > > >> > >> > > log segments exactly as they exist in the source
> cluster,
> > > > > > > > >> maintaining
> > > > > > > > >> > >> > > the same offset assignments and gaps.
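
Inline note on the retention case: a minimal sketch of the recovery rule described above, assuming the mirror leader simply moves its fetch position up to the source's log start offset when the requested offset has already been removed. The class and method names are hypothetical and do not come from the KIP or the Kafka codebase.

```java
// Hypothetical helper: models only the offset decision, not the fetch protocol.
final class MirrorFetchSketch {
    /**
     * Returns the offset the mirror leader should fetch from next.
     * If the requested offset is below the source log start offset, the
     * source would answer with an out-of-range error, so the mirror
     * truncates and resumes from the source log start offset instead.
     */
    static long nextFetchOffset(long requestedOffset, long sourceLogStartOffset) {
        if (requestedOffset < sourceLogStartOffset) {
            return sourceLogStartOffset;
        }
        return requestedOffset;
    }
}
```

With the numbers from the example, nextFetchOffset(50, 100) yields 100, so the destination log starts at the same offset as the source and offsets stay aligned.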
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Do you have any specific corner case in mind?
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS2
> > > > > > > > >> > >> > > ------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Agreed. The current
> AlterShareGroupOffsetsRequest (v0)
> > > > > only
> > > > > > > > >> includes
> > > > > > > > >> > >> > > PartitionIndex and StartOffset with no epoch
> field. When
> > > > > > > > >> mirroring
> > > > > > > > >> > >> > > share group offsets across clusters, the epoch
> is needed
> > > > > to
> > > > > > > > >> ensure the
> > > > > > > > >> > >> > > offset alteration targets the correct leader
> generation.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS3
> > > > > > > > >> > >> > > ------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Right, the enum is now fixed. Yes, we will parse
> from the
> > > > > > right
> > > > > > > > >> and
> > > > > > > > >> > >> > > apply the same naming rules used for topic name
> ;)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS4
> > > > > > > > >> > >> > > -------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Agreed. I'll try to improve those paragraphs
> because they
> > > > > > are
> > > > > > > > >> crucial
> > > > > > > > >> > >> > > from an operational point of view.
> > > > > > > > >> > >> > >
> > > > > > > > > >> > >> > > Let me briefly explain how it is supposed to
> work:
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > 9091 (source) -----> 9094 (destination)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > The single operation that allows an operator to
> switch
> > > > > all
> > > > > > > > >> topics at
> > > > > > > > >> > >> > > once in case of disaster is the following:
> > > > > > > > >> > >> > >
> > > > > > > > > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9094
> --remove
> > > > > > --topic .*
> > > > > > > > >> > >> > > --mirror my-mirror
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > 9091 (source) --x--> 9094 (destination)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > After that, all mirror topics become detached
> from the
> > > > > > source
> > > > > > > > >> cluster
> > > > > > > > > >> > >> > > and start accepting writes (the two clusters are
> allowed
> > > > > to
> > > > > > > > >> diverge).
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > When the source cluster is back, the operator can
> > > > > failback
> > > > > > by
> > > > > > > > >> creating
> > > > > > > > >> > >> > > a mirror with the same name on the source
> cluster (new
> > > > > > > > >> destination):
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > echo "bootstrap.servers=localhost:9094" >
> > > > > > > > >> /tmp/my-mirror.properties
> > > > > > > > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091
> --create
> > > > > > --mirror
> > > > > > > > >> > >> > > my-mirror --mirror-config
> /tmp/my-mirror.properties
> > > > > > > > > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091
> --add
> > > > > > --topic .*
> > > > > > > > >> > >> > > --mirror my-mirror
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > 9091 (destination) <----- 9094 (source)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS5
> > > > > > > > >> > >> > > -------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > This is the core of our design and we reached
> that
> > > > > > empirically by
> > > > > > > > >> > >> > > trying out different options. We didn't want to
> change
> > > > > > local
> > > > > > > > >> > >> > > replication, and this is something you need to
> do when
> > > > > > > > >> preserving the
> > > > > > > > >> > >> > > source leader epoch. The current design is
> simple and
> > > > > > keeps the
> > > > > > > > >> epoch
> > > > > > > > >> > >> > > domains entirely separate. Destination cluster
> is in
> > > > > > charge of
> > > > > > > > >> the
> > > > > > > > >> > >> > > leader epoch for its own log. The source epoch
> is only
> > > > > used
> > > > > > > > >> during the
> > > > > > > > >> > >> > > fetch protocol to validate responses and detect
> > > > > divergence.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > The polarity idea of tracking whether an epoch
> bump
> > > > > > originated
> > > > > > > > >> from
> > > > > > > > >> > >> > > replication vs. local leadership change is
> interesting,
> > > > > > but adds
> > > > > > > > >> > >> > > significant complexity and coupling between
> source and
> > > > > > > > >> destination
> > > > > > > > >> > >> > > epochs. Could you clarify what specific scenario
> polarity
> > > > > > > > >> tracking
> > > > > > > > >> > >> > > would address that the current separation
> doesn't handle?
> > > > > > One
> > > > > > > > >> case we
> > > > > > > > >> > >> > > don't support is unclean leader election
> reconciliation
> > > > > > across
> > > > > > > > >> > >> > > clusters, is that the gap you're aiming at?
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > I tried to rewrite the unclean leader election
> paragraph
> > > > > > in the
> > > > > > > > >> > >> > > rejected alternatives to be easier to digest.
> Let me know
> > > > > > if it
> > > > > > > > >> works.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > On Tue, Feb 17, 2026 at 2:57 PM Andrew Schofield
> > > > > > > > >> > >> > > <[email protected]> wrote:
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > Hi Fede and friends,
> > > > > > > > >> > >> > > > Thanks for the KIP.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > It’s a comprehensive design, easy to read and
> has
> > > > > clearly
> > > > > > > > >> taken a
> > > > > > > > >> > >> lot
> > > > > > > > >> > >> > of work.
> > > > > > > > >> > >> > > > The principle of integrating mirroring into
> the brokers
> > > > > > makes
> > > > > > > > >> total
> > > > > > > > >> > >> > sense to me.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > The main comment I have is that mirroring like
> this
> > > > > > cannot
> > > > > > > > >> handle
> > > > > > > > >> > >> > situations
> > > > > > > > >> > >> > > > in which multiple topic-partitions are
> logically
> > > > > > related, such
> > > > > > > > >> as
> > > > > > > > >> > >> > transactions,
> > > > > > > > >> > >> > > > with total fidelity. Each topic-partition is
> being
> > > > > > replicated
> > > > > > > > >> as a
> > > > > > > > >> > >> > separate entity.
> > > > > > > > >> > >> > > > The KIP calls this out and describes the
> behaviour
> > > > > > thoroughly.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > A few initial comments.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS1) Is it true that offsets are always
> preserved by
> > > > > > this KIP?
> > > > > > > > >> I
> > > > > > > > >> > >> > *think* so but
> > > > > > > > >> > >> > > > not totally sure that it’s true in all cases.
> It would
> > > > > > > > >> certainly be
> > > > > > > > >> > >> > nice.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS2) I think you need to add epoch information
> to
> > > > > > > > >> > >> > AlterShareGroupOffsetsRequest.
> > > > > > > > >> > >> > > > It really should already be there in
> hindsight, but I
> > > > > > think
> > > > > > > > >> this KIP
> > > > > > > > >> > >> > requires it.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS3) The CoordinatorType enum for MIRROR will
> need to
> > > > > be
> > > > > > 3
> > > > > > > > >> because 2
> > > > > > > > >> > >> > is SHARE.
> > > > > > > > >> > >> > > > I’m sure you’ll parse the keys from the right
> ;)
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS4) The procedure for achieving a failover
> could be
> > > > > > clearer.
> > > > > > > > >> Let’s
> > > > > > > > >> > >> > say that I am
> > > > > > > > >> > >> > > > using cluster mirroring to achieve DR
> replication. My
> > > > > > source
> > > > > > > > >> cluster
> > > > > > > > >> > >> > is utterly lost
> > > > > > > > >> > >> > > > due to a disaster. What’s the single operation
> that I
> > > > > > perform
> > > > > > > > >> to
> > > > > > > > >> > >> > switch all of the
> > > > > > > > >> > >> > > > topics mirrored from the lost source cluster
> to become
> > > > > > the
> > > > > > > > >> active
> > > > > > > > >> > >> > topics?
> > > > > > > > >> > >> > > > Similarly for failback.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS5) The only piece that I’m really unsure of
> is the
> > > > > > epoch
> > > > > > > > >> > >> management.
> > > > > > > > >> > >> > I would
> > > > > > > > >> > >> > > > have thought that the cluster which currently
> has the
> > > > > > writable
> > > > > > > > >> > >> > topic-partition
> > > > > > > > >> > >> > > > would be in charge of the leader epoch and it
> would not
> > > > > > be
> > > > > > > > >> > >> necessary to
> > > > > > > > >> > >> > > > perform all of the gymnastics described in the
> section
> > > > > > on epoch
> > > > > > > > >> > >> > rewriting.
> > > > > > > > >> > >> > > > I have read the Rejected Alternatives section
> too, but
> > > > > I
> > > > > > don’t
> > > > > > > > >> fully
> > > > > > > > >> > >> > grasp
> > > > > > > > >> > >> > > > why it was necessary to reject it.
> > > > > > > > >> > >> > > >
> > > > > > > > > >> > >> > > > I wonder if we could store the “polarity” of an epoch, essentially
> > > > > > > > > >> > >> > > > whether the epoch bump was observed by replication from a source
> > > > > > > > > >> > >> > > > cluster, or whether it was bumped by a local leadership change when
> > > > > > > > > >> > >> > > > the topic is locally writable. When a topic-partition switches from
> > > > > > > > > >> > >> > > > read-only to writable, we should definitely bump the epoch, and we
> > > > > > > > > >> > >> > > > could record the fact that it was a local epoch. When connectivity
> > > > > > > > > >> > >> > > > is re-established, you might find that both ends have declared a
> > > > > > > > > >> > >> > > > local epoch N, but someone has to win.
> > > > > > > > >> > >> > > > Thanks,
> > > > > > > > >> > >> > > > Andrew
> > > > > > > > >> > >> > > >
> > > > > > > > > >> > >> > > > > On 14 Feb 2026, at 07:17, Federico Valeri <[email protected]> wrote:
> > > > > > > > >> > >> > > > >
> > > > > > > > > >> > >> > > > > Hi, we would like to start a discussion thread about KIP-1279:
> > > > > > > > > >> > >> > > > > Cluster Mirroring.
> > > > > > > > >> > >> > > > >
> > > > > > > > > >> > >> > > > > Cluster Mirroring is a new Kafka feature that enables native,
> > > > > > > > > >> > >> > > > > broker-level topic replication across clusters. Unlike MirrorMaker 2
> > > > > > > > > >> > >> > > > > (which runs as an external Connect-based tool), Cluster Mirroring is
> > > > > > > > > >> > >> > > > > built into the broker itself, allowing tighter integration with the
> > > > > > > > > >> > >> > > > > controller, coordinator, and partition lifecycle.
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >>
> > > > > >
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1279%3A+Cluster+Mirroring
> > > > > > > > >> > >> > > > >
> > > > > > > > > >> > >> > > > > There are a few missing bits, but most of the design is there, so we
> > > > > > > > > >> > >> > > > > think it is the right time to involve the community and get
> > > > > > > > > >> > >> > > > > feedback. Please help validating our approach.
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > > Thanks
> > > > > > > > >> > >> > > > > Fede
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > --
> > > > > > > > >> > > Paolo Patierno
> > > > > > > > >> > >
> > > > > > > > > >> > > *Senior Principal Software Engineer @ IBM*
> > > > > > > > > >> > > *CNCF Ambassador*
> > > > > > > > >> > >
> > > > > > > > >> > > Twitter : @ppatierno <http://twitter.com/ppatierno>
> > > > > > > > > >> > > Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
> > > > > > > > >> > > GitHub : ppatierno <https://github.com/ppatierno>
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > --
> > > > > > > > >> > Paolo Patierno
> > > > > > > > >> >
> > > > > > > > >> > *Senior Principal Software Engineer @ IBM**CNCF
> Ambassador*
> > > > > > > > >> >
> > > > > > > > >> > Twitter : @ppatierno <http://twitter.com/ppatierno>
> > > > > > > > >> > Linkedin : paolopatierno <
> > > > > http://it.linkedin.com/in/paolopatierno
> > > > > > >
> > > > > > > > >> > GitHub : ppatierno <https://github.com/ppatierno>
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
>
