I mentioned a corner case in the chained mirroring use case. Let me
clarify what I mean with a simple example:

1. B is fetching from A, and C is fetching from B (A --> B --> C)
2. A producer with PID 5 sends records to A
3. Failover happens and B becomes writable (A -x-> B --> C)
4. A different producer with PID 5 sends records to B
5. Collision on cluster C (two different producers mapped to PID -7 in C)

(arrows represent data flowing, not fetch direction)
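
To make step 5 concrete, here is a minimal Java sketch of the collision. It assumes the -(pid + 2) mapping quoted under RS12 below, together with the "skip the transformation when pid is negative" variant discussed there. The class and method names are made up for illustration and are not part of the KIP.

```java
public class ChainedPidCollision {
    // Assumed mapping: local (non-negative) PIDs become -(pid + 2);
    // already-mirrored (negative) PIDs pass through unchanged.
    static long mirrorPid(long pid) {
        return pid < 0 ? pid : -(pid + 2);
    }

    public static void main(String[] args) {
        long producerOnA = 5L; // step 2: producer with PID 5 writes to A
        long producerOnB = 5L; // step 4: a different producer with PID 5 writes to B

        long seenOnC1 = mirrorPid(mirrorPid(producerOnA)); // A --> B --> C
        long seenOnC2 = mirrorPid(producerOnB);            // B --> C

        System.out.println("A's producer on C: " + seenOnC1);
        System.out.println("B's producer on C: " + seenOnC2);
        System.out.println("collision: " + (seenOnC1 == seenOnC2));
    }
}
```

Both producers end up with the same PID on C, which is why a PID-only mapping is not enough for this topology.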


On Sun, Mar 15, 2026 at 7:14 PM Federico Valeri <[email protected]> wrote:
>
> Hi Rajini, thanks for your thoughtful review and for catching a few
> bugs. I'll skip some questions that we will address later.
>
> RS2: The metadata records are described in the "Mirror Metadata Records"
> paragraph. Currently there are only two records: the "LastMirroredOffsets"
> record tracks the latest successfully mirrored offset for each
> partition, while the "MirrorPartitionState" record represents the
> lifecycle states of a mirrored partition.
>
> RS3: That's a good point that was also raised by Andrew. It was an
> easy solution that we used for our prototype, but we need to think
> about a better solution.
>
> RS4: The current design is that mirror fetcher threads behave like a
> read_committed consumer fetching up to "source LSO". On failover we
> truncate destination log to "local LSO".
>
> The approach of fetching up to HW that you propose is still safe as we
> keep truncating to local LSO on failover, but it trades lower
> steady-state lag (especially when long-running transactions exist on
> the source) for more data loss on failover (the net data loss relative
> to the source is the same in both approaches). In other words, with
> your approach we fetch more data that we may then need to truncate.
> Also, read_uncommitted consumers on the destination cluster would be
> able to read records that may be truncated on failover. These are just
> my considerations, but we are open to discussion on which is the best
> approach here.
>
> When a failover is triggered (RemoveTopicsFromMirror), the sequence is:
>
> 1. Partitions transition to STOPPING state
> 2. Fetchers are removed
> 3. For each partition, truncate to local LSO is called
>   3.1. Reads LSO from each partition's local log
>   3.2. Calls log.truncateTo(offset) on the UnifiedLog
>   3.3. Ensures ISR members complete truncation before the partition
> becomes writable
> 4. For each partition, the LSO is recorded as the last mirrored offset
> (LMO) in __mirror_state
> 5. Partitions transition to STOPPED and become writable
>
> When a failback is triggered (AddTopicsToMirror), the sequence is:
>
> 1. Partitions transition to PREPARING state
> 2. For each partition, truncation to LMO is called
>   2.1. This sends a LastMirroredOffsetsRequest to the source cluster
> to fetch the offsets that were recorded during the previous failover
>   2.2.a. The response offsets are used to truncate local logs
>   2.2.b. If the source cluster doesn't support the LastMirroredOffsets
> API or this is a first-time mirror, it truncates to offset 0
> 3. Partitions transition to MIRRORING
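
The two sequences above can be read as a small state machine plus one decision about the failback truncation point. The Java sketch below is only an illustration under assumptions: it covers just the four states named in this mail, it assumes failover starts from MIRRORING, and the class, enum and method names are hypothetical rather than taken from the KIP.

```java
import java.util.OptionalLong;

public class MirrorStateSketch {
    // Only the states named in the failover/failback sequences.
    enum State { PREPARING, MIRRORING, STOPPING, STOPPED }

    // Allowed transitions:
    // failback: STOPPED -> PREPARING -> MIRRORING
    // failover: MIRRORING -> STOPPING -> STOPPED (the starting state is an assumption)
    static boolean canTransition(State from, State to) {
        switch (from) {
            case PREPARING: return to == State.MIRRORING;
            case MIRRORING: return to == State.STOPPING;
            case STOPPING:  return to == State.STOPPED;
            case STOPPED:   return to == State.PREPARING;
            default:        return false;
        }
    }

    // Step 2.2 of failback: truncate to the LMO returned by the source,
    // or to offset 0 when no LMO is available (API unsupported or first-time mirror).
    static long failbackTruncationOffset(OptionalLong lastMirroredOffset) {
        return lastMirroredOffset.orElse(0L);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.STOPPING, State.STOPPED));     // allowed
        System.out.println(canTransition(State.STOPPED, State.MIRRORING));    // must go through PREPARING
        System.out.println(failbackTruncationOffset(OptionalLong.of(1200L)));
        System.out.println(failbackTruncationOffset(OptionalLong.empty()));
    }
}
```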
>
> RS7: Can you point me to the section that says configs are stored in
> __mirror_state? Mirror connection configs (bootstrap servers,
> credentials, etc.) are stored in KRaft metadata via
> ConfigResource.Type.MIRROR, not in __mirror_state. The internal topic
> only stores partition states and last mirrored offsets. Sensitive
> credentials follow the existing KRaft config handling, which is
> already protected by controller/broker access controls and sensitive
> config redaction in DescribeConfigs responses.
>
> RS8: Not sure what's the recommended approach here. Adding a new error
> code does not change the response schema and older clients that don't
> recognize the new error code will surface it as an
> UnknownServerException (non-retriable).
>
> RS11: Good catch. This is a prototype simplification that we need to
> address. To properly sync consumer group offsets, the implementation
> would need to send ListGroups to all source brokers (or use the
> AdminClient which does this internally), send FindCoordinator to
> discover the group coordinator for each group, and then send OffsetFetch
> to the correct group coordinator.
>
> RS12: You are absolutely right, the transformation is not idempotent,
> so it is not safe for chained mirroring (A -> B -> C). In contrast,
> round-trip mirroring (A -> B, then B -> A) works because, when doing a
> failback, the log is truncated before mirroring resumes, so previously
> mirrored records with negative pids are removed and the transformation
> is only applied to new records produced natively on that cluster
> (double-transformation never occurs). Non-transactional batches stay
> at -1 [ -(-1 + 2) = -(1) = -1], which is correct.
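
A small Java sketch of why the current mapping is not idempotent, assuming the transformation is exactly -(pid + 2) as in the bracketed arithmetic above. The class and method names are illustrative only.

```java
public class PidTransformSketch {
    // Assumed current transformation, applied unconditionally on every hop.
    static long transform(long pid) {
        return -(pid + 2);
    }

    public static void main(String[] args) {
        long once = transform(5L);    // first hop: 5 becomes -7
        long twice = transform(once); // second hop: -(-7 + 2) = 5, back in the local PID space
        long nonTxn = transform(-1L); // non-transactional marker stays at -1

        System.out.println("one hop:  " + once);
        System.out.println("two hops: " + twice);
        System.out.println("-1 stays: " + nonTxn);
    }
}
```

Applying the mapping twice returns the original value, so on a second hop a mirrored PID would land back among the non-negative local PIDs of the last cluster.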
>
> The chained mirroring would work if we skip the transformation when
> pid is negative, but there is still an edge case: A -> B -> C with
> local B producer. If cluster A has local pid 5 and cluster B also has
> local pid 5, both end up as -7 on cluster C. Collision: two different
> producers with the same pid on the destination. No pid-only
> transformation can solve that. We would need to incorporate cluster
> identity.
>
> Possible solution that would handle any topology: The producer IDs are
> 64-bit signed longs used to identify a producer. The clusterId (UUID)
> is a globally unique 128-bit identifier for each source cluster. We
> could use the clusterId hash to partition the entire negative PID
> space into regions, one per source cluster. Basically we divide the 64
> bits into three fields: bit 63 (sign bit), bits 62-31 (region
> selector), bits 30-0 (producer identity). Once a non-negative PID is
> mapped to a region, it passes through unchanged no matter how many
> hops follow (i.e. we apply the transformation only for PIDs >= 0).
>
> Example with two clusters:
>
> - Bit 63: This is the sign bit that makes the value negative and
> distinguishes mirrored pids from local ones (which are non-negative).
> - Bits 62-31 cluster A: clusterId = "abc-123", clusterHash = 42
> - Bits 62-31 cluster B: clusterId = "xyz-789", clusterHash = 99
> - Bits 30-0: Local producer ID 5 that is the same on both clusters.
>
>   A's pid 5 -->
> 1|00000000000000000000000000101010|0000000000000000000000000000101
>   B's pid 5 -->
> 1|00000000000000000000000001100011|0000000000000000000000000000101
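
The bit layout above could be sketched in Java as follows. This is an illustration under assumptions, not the proposed implementation: the 32-bit cluster hash is passed in as a parameter (how it is derived from the clusterId is left open), PIDs that do not fit in 31 bits are rejected because the thread does not say how they would be handled, and all names are hypothetical.

```java
public class RegionPidSketch {
    static final int LOCAL_BITS = 31;                      // bits 30-0: producer identity
    static final long LOCAL_MASK = (1L << LOCAL_BITS) - 1;

    // Maps a local PID into the negative region selected by clusterHash.
    // Negative (already mirrored) PIDs pass through unchanged on every hop.
    static long toMirrorPid(long pid, int clusterHash) {
        if (pid < 0) {
            return pid;
        }
        if (pid > LOCAL_MASK) {
            // Assumption: larger PIDs are simply rejected in this sketch.
            throw new IllegalArgumentException("pid does not fit in 31 bits: " + pid);
        }
        long region = (clusterHash & 0xFFFFFFFFL) << LOCAL_BITS; // bits 62-31: region selector
        return Long.MIN_VALUE | region | pid;                    // bit 63: sign bit
    }

    // Renders sign|region|identity; only valid for negative values,
    // where Long.toBinaryString returns all 64 bits.
    static String layout(long mirrorPid) {
        String bits = Long.toBinaryString(mirrorPid);
        return bits.substring(0, 1) + "|" + bits.substring(1, 33) + "|" + bits.substring(33);
    }

    public static void main(String[] args) {
        long a = toMirrorPid(5L, 42); // cluster A, clusterHash = 42
        long b = toMirrorPid(5L, 99); // cluster B, clusterHash = 99
        System.out.println(layout(a));
        System.out.println(layout(b));
        System.out.println("distinct on C: " + (a != b));
        System.out.println("stable across hops: " + (toMirrorPid(a, 99) == a));
    }
}
```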
>
> On Wed, Mar 11, 2026 at 1:23 PM Rajini Sivaram <[email protected]> 
> wrote:
> >
> > A few more questions about the KIP for clarification:
> >
> > RS8: The KIP says produce requests to mirror topics will throw
> > ReadOnlyTopicException. For Produce Requests returning a new error to
> > clients, don’t we need to bump Produce request version?
> >
> > RS9: The KIP says we use OffsetMovedToTieredStorageException to prevent
> > mirroring of data in tiered storage. But doesn’t the mirror client look
> > like a regular consumer to the source cluster and return records fetched
> > from tiered storage?
> >
> > RS10: Client-id based quotas for the source cluster look hard to manage
> > since there is no hierarchy or grouping possible. Seems better to rely on
> > secure user-principal based quotas on the source-side.
> >
> > RS11: The KIP says `The manager maintains a connection pool with one
> > blocking sender per source cluster`. If this is the connection used for
> > periodic sync of offsets, topic configs etc. the coordinator is likely to
> > need connections to all source brokers (i.e. all group coordinators).
> >
> > RS12: The KIP proposes to transform producer ids for mirror records to
> > avoid conflicts. This comes at a cost because CRC checksum needs to be
> > recomputed. To justify this cost, we need to ensure that this
> > transformation works in all cases. What happens if you are mirroring a
> > mirror topic? Is that a supported scenario? Or mirroring back mirrored data
> > during failback because the source was truncated?
> >
> >
> >
> > Thanks,
> >
> > Rajini
> >
> >
> > On Tue, Mar 10, 2026 at 8:19 PM Rajini Sivaram <[email protected]>
> > wrote:
> >
> > > Hi team,
> > >
> > > Thanks for the KIP! I have a few questions, mostly clarification at this
> > > point.
> > >
> > > RS1: There is a `CreateMirror` request but no corresponding `DeleteMirror`
> > > request. Is that intentional?
> > >
> > >
> > >
> > > RS2: It will be good to define the format of data going into the internal
> > > mirror state topic. There is an example under kafka-dump-logs, which
> > > shows partition-level state in the payload and the mirror name as key. I
> > > guess that is not what we expect it to be. Do we delete this information
> > > when a topic is deleted or a mirror is deleted?
> > >
> > >
> > >
> > > RS3: KIP currently says mirror name cannot end with .removed. I guess it
> > > cannot also end with .paused.  Have we considered storing state and
> > > mirror name separately, but updated together for a topic? Since new
> > > states may be added in future, name restrictions may become hard to
> > > implement.
> > >
> > >
> > >
> > > RS4: The KIP says *“mirroring must fetch only up to the LSO to maintain
> > > transactional consistency”* and it also says *“During the mirror stopping
> > > transition, the MirrorCoordinator performs a log truncation operation that
> > > resets each mirror partition to its LSO.”*  I guess the plan is to fetch
> > > up to high watermark and truncate to locally computed LSO on failover?
> > > Details of the sequence here will be useful. How does MirrorCoordinator
> > > perform truncation?
> > >
> > >
> > >
> > > RS5: The KIP says “*On the destination cluster, mirror-related operations
> > > (creating mirrors, adding/removing topics from mirrors, managing mirror
> > > configurations) require the CLUSTER_ACTION permission on the cluster
> > > resource.*” The  `Cluster:ClusterAction` ACL is currently used for broker
> > > service account, e.g. local replication is authorized using this. It seems
> > > odd to grant this permission to users managing a resource on the cluster.
> > > Have we considered adding a new resource type `ClusterMirror` and define
> > > ACLs like `ClusterMirror:Create`, `ClusterMirror:Alter` and `
> > > ClusterMirror:AlterConfigs`?
> > >
> > > RS6: The KIP talks about three entities: Cluster Mirror, Mirror Topic and
> > > Mirror Partition, with Cluster Mirroring as the feature name. Since we already
> > > have MirrorMaker that also refers to mirrors, it will be nice if we can
> > > refer to the entities using their full name in the CLI and public APIs.
> > > That will enable us to add more mirror topic and mirror partition APIs in
> > > the future if needed. For example:
> > >
> > >
> > >    - `kafka-cluster-mirrors.sh` to manage cluster mirrors
> > >    -  createClusterMirrors(), listClusterMirrors(),
> > >    describeClusterMirrors() etc on the Admin API and Kafka Protocol.
> > >    -  KIP proposes pauseMirrorTopics(), resumeMirrorTopics() which are
> > >    good.
> > >
> > >
> > >
> > > RS7: The KIP proposes to store mirror configs in the internal mirror state
> > > topic. This includes sensitive credentials of another cluster. Have we
> > > considered other options? Can a user with read access read the data from
> > > the state topic using a consumer?
> > >
> > >
> > >
> > > Thanks,
> > >
> > >
> > > Rajini
> > >
> > >
> > >
> > > On Sun, Mar 8, 2026 at 8:58 PM Andrew Schofield <[email protected]>
> > > wrote:
> > >
> > >> Hi Fede and friends,
> > >> I've re-read in detail and have quite a lot of comments, mostly minor
> > >> clarifications, but as it approaches a vote, it's good to get the details
> > >> nailed down.
> > >>
> > >> AS6: Could we have a diagram which shows which RPCs are served by which
> > >> components? This will help illustrate the authorisation requirements for
> > >> the various components, which is an aspect of the KIP that I don't think is
> > >> completely specified yet.
> > >>
> > >> AS7: Please could you include a table of the operations and resources
> > >> which will be checked for authorisation of each of the RPCs introduced.
> > >> Also, please could you document the permissions which the destination
> > >> cluster will require to mirror data and ACLs (for example, I think it will
> > >> need ALTER on the CLUSTER resource to manipulate ACLs)? It's going to need
> > >> Metadata, DescribeConfigs, DescribeAcls, ListGroups, OffsetFetch,
> > >> LastMirroredOffsets and Fetch RPCs I think, possibly others too. The user is
> > >> probably going to want to give as little permission as possible to the
> > >> destination cluster to get its job done.
> > >>
> > >> AS8: You include AuthorizedOperations in DescribeMirrorsResponse, but I
> > >> don't know what the operations are. I think this implies MIRROR is a new
> > >> resource type in the Kafka security model and DescribeMirrors can be used
> > >> to enquire the authorised operations for the client making the Admin API
> > >> request.
> > >>
> > >> AS9: I think you're going to need some new error codes in the Kafka
> > >> protocol, at least:
> > >>
> > >> * INVALID_MIRROR_NAME or similar if the mirror name doesn't meet the
> > >> rules for a topic name
> > >> * UNKNOWN_MIRROR if the mirror doesn't exist
> > >>
> > >> And probably some more for logical inconsistencies such as this topic
> > >> isn't in that mirror, that topic is already in another mirror, and so on.
> > >>
> > >> AS10: Could you add the usage information for kafka-mirrors.sh (the
> > >> intended output from kafka-mirrors.sh --help) so all of the options are
> > >> documented together? For example, I see that --replication-factor is
> > >> included in one of the examples, which seems a bit surprising and I'm not
> > >> sure whether it's a mistake or a feature. I can probably use --describe
> > >> with a specific --mirror but it's not specified.
> > >>
> > >> AS11: I would expect the signature for Admin.addTopicsToMirror to be
> > >> Admin.addTopicsToMirror(String mirrorName, Set<String> topics,
> > >> AddTopicsToMirrorOptions options) because it's for adding topics to a
> > >> mirror, as the counterpart to Admin.removeTopicsFromMirror(String
> > >> mirrorName, Set<String> topics, RemoveTopicsFromMirrorOptions options).
> > >>
> > >> AS12: I don't think ignorable RPC fields in version 0 RPCs make sense
> > >> because they're not trying to be compatible with a previous version.
> > >>
> > >> AS13: I would have expected AddTopicsToMirrorRequest to have mirror name
> > >> above the list of topics because the same mirror name applies to all of the
> > >> topics being added. As specified, you repeat the mirror name for all of the
> > >> topics.
> > >>
> > >> AS14: I suggest adding ErrorMessage to the responses in all cases to make
> > >> it easier to give more descriptive exception messages than just the default
> > >> for the error codes.
> > >>
> > >> AS15: I may have the wrong end of the stick here, but I expected
> > >> RemoveTopicsFromMirrorRequest to remove the topics from a specific named
> > >> mirror as implied by the example of the kafka-mirrors.sh command. In fact,
> > >> I was expecting the mirror to contain the topics in the admin RPC requests
> > >> and responses, and that's only true for about half of them.
> > >>
> > >> AS16: Can I change the mirror.name config using IncrementalAlterConfigs?
> > >> If I attempt it, what's the error?
> > >>
> > >> AS17: If I attempt mirror RPCs when the mirror is in the wrong state, the
> > >> error is specified as INVALID_REQUEST. That's usually kept for badly formed
> > >> requests, as opposed to logically invalid ones. Maybe MIRROR_NOT_STOPPED or
> > >> MIRRORING_ACTIVE or similar would be more expressive.
> > >>
> > >> AS18: Should the LastMirroredOffsetsResponse, ReadMirrorStatesResponse
> > >> and WriteMirrorStatesRequest include LeaderEpoch? I suspect so.
> > >>
> > >> AS19: In DescribeMirrorsResponse, I suspect you will want "null" values
> > >> for some fields which don't have values during initialisation and so on,
> > >> such as lag.
> > >>
> > >> AS20: Do you need to add new versions of the DescribeConfigs and
> > >> IncrementalAlterConfigs RPCs to support mirror resources?
> > >>
> > >> AS21: The topic configuration mirror.replication.throttled.replicas is
> > >> described as a list, but the default is MAX_LONG.
> > >>
> > >> AS22: By including mirror.name as a topic config, a client which has
> > >> permission to describe configs for the topic is able to discover the name
> > >> of the mirror, whether they are permitted to list the mirrors or describe
> > >> that particular mirror. Generally, the Kafka authorisation model does not
> > >> permit this kind of unauthorised information disclosure. For example, when
> > >> a client describes the committed offsets for a consumer group, the list of
> > >> topics returned is filtered to only those topics which the client is
> > >> permitted to describe, even though that may result in an incomplete set of
> > >> topic partitions being returned. Is there an alternative way in which this
> > >> information could be stored so Kafka only reveals mirror information to
> > >> principals authorised to see it?
> > >>
> > >> AS23: I observe that there are situations in which a `.removed` suffix is
> > >> added to the mirror name. Is it permitted for the user to define a mirror
> > >> called "my.nasty.mirror.removed" and does it break anything?
> > >>
> > >>
> > >> Thanks,
> > >> Andrew
> > >>
> > >> On 2026/03/06 13:41:52 Paolo Patierno wrote:
> > >> > Hi Fede,
> > >> > something more ...
> > >> >
> > >> > Is there any migration path for users who want to migrate from using Mirror
> > >> > Maker 2 to the cluster mirroring?
> > >> > I mean, something like a tool useful to create a corresponding cluster
> > >> > mirroring configuration starting from a MM2 one. Nothing that runs the
> > >> > migration automatically but something that can be provided to the users as
> > >> > output to be validated and put in place by them.
> > >> >
> > >> > The Admin Client is missing methods to pause and stop mirroring (but we
> > >> > have corresponding protocol messages). Is it on purpose? Any specific
> > >> > reasons? They would be important from an automatic operator perspective
> > >> use
> > >> > case.
> > >> > Also a method to provide the LastMirroredOffset from the source cluster
> > >> > could be useful for progress and tracking purposes.
> > >> > Finally, what about a method to get the mirror states? I don't think 
> > >> > the
> > >> > describe method provides such information.
> > >> > In general, I think that the Admin Client section needs to cover in 
> > >> > more
> > >> > details the new classes definition like CreateMirrorOptions,
> > >> > CreateMirrorResult, ... and so on for all the defined new methods.
> > >> >
> > >> > > AddTopicsToMirrorResult addTopicsToMirror(Map<String, String>
> > >> > topicToMirrorName, AddTopicsToMirrorOptions options);
> > >> >
> > >> > Isn't it missing the mirrorName (as you have in the
> > >> removeTopicsFromMirror
> > >> > counterpart)?
> > >> > What's the topicToMirrorName parameter if it's defined as a Map? The
> > >> method
> > >> > is also plural using "topics" so comparing to the 
> > >> > removeTopicsFromMirror
> > >> > method, I would assume the parameter really is Set<String> topics?
> > >> > Comparing to the corresponding protocol message
> > >> AddTopicsToMirrorRequest, I
> > >> > see a list of topics but each of them has id, name and corresponding
> > >> > mirror. So it's unclear how the addTopicsToMirror is defined.
> > >> >
> > >> > > RemoveTopicsFromMirrorResult removeTopicsFromMirror(String 
> > >> > > mirrorName,
> > >> > Set<String> topics, RemoveTopicsFromMirrorOptions options);
> > >> >
> > >> > This method gets a mirrorName but if I look at the corresponding
> > >> protocol
> > >> > message RemoveTopicsFromMirrorRequest, it says "Allows users to detach
> > >> > topics from their associated mirror" so the mirror is actually not
> > >> provided
> > >> > and it's exactly what I see in the JSON definition (only topics list
> > >> with
> > >> > id and name).
> > >> >
> > >> > Finally, regarding the protocol change:
> > >> >
> > >> > * ListMirrorsResponse I would add the clusterId in the JSON definition
> > >> > (it's related to my comments in the previous email when using the 
> > >> > tool).
> > >> > * WriteMirrorStatesRequest has the following in the JSON which should
> > >> not
> > >> > be part of it "{ "name": "RemovedTopics", "type": "[]string",
> > >> "versions":
> > >> > "0+", "about": "The topic names to be removed." }"
> > >> >
> > >> > Thanks,
> > >> > Paolo.
> > >> >
> > >> > On Fri, 6 Mar 2026 at 13:08, Paolo Patierno <[email protected]>
> > >> > wrote:
> > >> >
> > >> > > Hi Fede,
> > >> > > thank you for the proposal. I had a first pass with the following
> > >> > > thoughts and questions.
> > >> > >
> > >> > > > When the unclean.leader.election.enable is set to true, the broker
> > >> will
> > >> > > log a warning at every configuration synchronization period.
> > >> > > Be more explicit about what the warning says.
> > >> > >
> > >> > > > This topic ID is not used by other topics in the current cluster
> > >> > > In such a case, which should be very unlikely, what's going to 
> > >> > > happen?
> > >> > > Isn't it possible to mirror the topic?
> > >> > >
> > >> > > > To enable it, all cluster nodes (controllers and brokers) must
> > >> > > explicitly enable unstable API versions and unstable feature versions
> > >> in
> > >> > > all configuration files. After starting the cluster with a minimum
> > >> metadata
> > >> > > version, operators can dynamically enable the mirror version feature
> > >> to
> > >> > > activate Cluster Mirroring.
> > >> > > AFAIU there is going to be a dedicated feature flag for it, right? If
> > >> yes
> > >> > > can we state it clearly also specifying the exact name (i.e.
> > >> mirror.version
> > >> > > or something similar)?
> > >> > >
> > >> > > When running the kafka-mirrors.sh tool to list the mirrors, other 
> > >> > > than
> > >> > > showing the SOURCE-BOOTSTRAP, it could be useful to have also the
> > >> clusterId
> > >> > > which, as a unique identifier, could be helpful in automated systems
> > >> using
> > >> > > the cluster mirroring. Of course, it would be important to have in 
> > >> > > the
> > >> > > ListMirrorsResponse as well as an additional field.
> > >> > >
> > >> > > What happens in case of a Kafka downgrade from a version supporting
> > >> > > mirroring to an older one not supporting it?
> > >> > > The mirror won't be running but the topic configuration will still
> > >> have
> > >> > > config parameters like mirror.name and so on, right? Are they just
> > >> > > ignored by the older Kafka version and the cluster will work without
> > >> issues?
> > >> > >
> > >> > > Thanks,
> > >> > > Paolo
> > >> > >
> > >> > > On Fri, 6 Mar 2026 at 10:43, Luke Chen <[email protected]> wrote:
> > >> > >
> > >> > >> Hi Andrew and all,
> > >> > >>
> > >> > >> About AS5, yes, I've created a sub-document
> > >> > >> <https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring>
> > >> > >> to explain the algorithm to support unclean leader election in cluster
> > >> > >> mirroring.
> > >> > >> Thanks for your comments, I'm inspired by that! :)
> > >> > >>
> > >> > >> About your idea to store the owner of the leader epoch when leadership
> > >> > >> changes, I think it might not be needed because the most important thing
> > >> > >> should be this:
> > >> > >> > you might find that both ends have declared a local epoch N, but someone
> > >> > >> > has to win.
> > >> > >>
> > >> > >> That is, as long as we have a way to declare who is the owner of leader
> > >> > >> epoch N, then the 2 clusters can sync up successfully.
> > >> > >> And that's why I proposed the "last mirrored leader epoch" semantic in
> > >> > >> the sub-proposal
> > >> > >> <https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring>,
> > >> > >> which draws a line between the 2 clusters to declare who owns the
> > >> > >> records beyond the "last mirrored leader epoch" N. I think this should
> > >> > >> work well, as long as all replicas in the cluster can truncate the log
> > >> > >> correctly.
> > >> > >>
> > >> > >> What do you think?
> > >> > >>
> > >> > >> Any feedback is appreciated.
> > >> > >>
> > >> > >> Thank you,
> > >> > >> Luke
> > >> > >>
> > >> > >> On Fri, Mar 6, 2026 at 6:02 PM Andrew Schofield <
> > >> [email protected]>
> > >> > >> wrote:
> > >> > >>
> > >> > >> > Hi Fede,
> > >> > >> > Thanks for your response.
> > >> > >> >
> > >> > >> > AS1: Thanks for the clarification.
> > >> > >> >
> > >> > >> > AS2: I expect you'll include a version bump of
> > >> AlterShareGroupOffsets in
> > >> > >> > this KIP, but that's a small matter compared with the rest of the
> > >> > >> protocol
> > >> > >> > changes.
> > >> > >> >
> > >> > >> > AS3: OK.
> > >> > >> >
> > >> > >> > AS4: Thanks for the details. My only comment is that it might be a bit
> > >> > >> > laborious when you want to failover all topics. I suggest adding
> > >> > >> > `--all-topics` so you could do:
> > >> > >> >
> > >> > >> > $ bin/kafka-mirrors.sh --bootstrap-server :9094 --remove --all-topics
> > >> > >> > --mirror my-mirror
> > >> > >> >
> > >> > >> > AS5: Thanks for the response. I understand there are good reasons
> > >> for
> > >> > >> the
> > >> > >> > way epochs are handled in the KIP. I see that there is a
> > >> sub-document
> > >> > >> for
> > >> > >> > the KIP about unclean leader election. I'll spend some time
> > >> reviewing
> > >> > >> that.
> > >> > >> >
> > >> > >> > Thanks,
> > >> > >> > Andrew
> > >> > >> >
> > >> > >> > On 2026/02/18 13:27:07 Federico Valeri wrote:
> > >> > >> > > Hi Andrew, thanks for the review.
> > >> > >> > >
> > >> > >> > > Let me try to answer your questions and then other authors can
> > >> join
> > >> > >> > > the discussion.
> > >> > >> > >
> > >> > >> > > AS1
> > >> > >> > > ------
> > >> > >> > >
> > >> > >> > > Destination topics are created with the same topic IDs using the
> > >> > >> > > extended CreateTopics API. Then, data is replicated starting from
> > >> > >> > > offset 0 with byte-for-byte batch copying, so destination offsets
> > >> > >> > > always match source offsets. When failing over, we record the last
> > >> > >> > > mirrored offset (LMO) in the destination cluster. When failing back,
> > >> > >> > > the LMO is used for truncating before mirroring the delta;
> > >> > >> > > otherwise we start mirroring from scratch by truncating to zero.
> > >> > >> > >
> > >> > >> > > Retention: If the mirror leader attempts to fetch an offset that is
> > >> > >> > > below the current log start offset of the source leader (e.g. fetching
> > >> > >> > > offset 50 when log start offset is 100), the source broker returns an
> > >> > >> > > OffsetOutOfRangeException that the mirror leader handles by truncating
> > >> > >> > > to the source's current log start offset and resuming fetching from
> > >> > >> > > that point. Compaction: The mirror leader replicates compacted
> > >> > >> > > log segments exactly as they exist in the source cluster, maintaining
> > >> > >> > > the same offset assignments and gaps.
> > >> > >> > >
> > >> > >> > > Do you have any specific corner case in mind?
> > >> > >> > >
> > >> > >> > > AS2
> > >> > >> > > ------
> > >> > >> > >
> > >> > >> > > Agreed. The current AlterShareGroupOffsetsRequest (v0) only
> > >> includes
> > >> > >> > > PartitionIndex and StartOffset with no epoch field. When
> > >> mirroring
> > >> > >> > > share group offsets across clusters, the epoch is needed to
> > >> ensure the
> > >> > >> > > offset alteration targets the correct leader generation.
> > >> > >> > >
> > >> > >> > > AS3
> > >> > >> > > ------
> > >> > >> > >
> > >> > >> > > Right, the enum is now fixed. Yes, we will parse from the right and
> > >> > >> > > apply the same naming rules used for topic names ;)
> > >> > >> > >
> > >> > >> > > AS4
> > >> > >> > > -------
> > >> > >> > >
> > >> > >> > > Agreed. I'll try to improve those paragraphs because they are
> > >> crucial
> > >> > >> > > from an operational point of view.
> > >> > >> > >
> > >> > >> > > Let me briefly explain how it is supposed to work:
> > >> > >> > >
> > >> > >> > > 9091 (source) -----> 9094 (destination)
> > >> > >> > >
> > >> > >> > > The single operation that allows an operator to switch all topics at
> > >> > >> > > once in case of disaster is the following:
> > >> > >> > >
> > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9094 --remove --topic .*
> > >> > >> > > --mirror my-mirror
> > >> > >> > >
> > >> > >> > > 9091 (source) --x--> 9094 (destination)
> > >> > >> > >
> > >> > >> > > After that, all mirror topics become detached from the source cluster
> > >> > >> > > and start accepting writes (the two clusters are allowed to diverge).
> > >> > >> > >
> > >> > >> > > When the source cluster is back, the operator can failback by creating
> > >> > >> > > a mirror with the same name on the source cluster (new destination):
> > >> > >> > >
> > >> > >> > > echo "bootstrap.servers=localhost:9094" > /tmp/my-mirror.properties
> > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091 --create --mirror
> > >> > >> > > my-mirror --mirror-config /tmp/my-mirror.properties
> > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091 --add --topic .*
> > >> > >> > > --mirror my-mirror
> > >> > >> > >
> > >> > >> > > 9091 (destination) <----- 9094 (source)
> > >> > >> > >
> > >> > >> > > AS5
> > >> > >> > > -------
> > >> > >> > >
> > >> > >> > > This is the core of our design and we reached that empirically 
> > >> > >> > > by
> > >> > >> > > trying out different options. We didn't want to change local
> > >> > >> > > replication, and this is something you need to do when
> > >> preserving the
> > >> > >> > > source leader epoch. The current design is simple and keeps the
> > >> epoch
> > >> > >> > > domains entirely separate. Destination cluster is in charge of
> > >> the
> > >> > >> > > leader epoch for its own log. The source epoch is only used
> > >> during the
> > >> > >> > > fetch protocol to validate responses and detect divergence.
> > >> > >> > >
> > >> > >> > > The polarity idea of tracking whether an epoch bump originated
> > >> from
> > >> > >> > > replication vs. local leadership change is interesting, but adds
> > >> > >> > > significant complexity and coupling between source and
> > >> destination
> > >> > >> > > epochs. Could you clarify what specific scenario polarity
> > >> tracking
> > >> > >> > > would address that the current separation doesn't handle? One
> > >> case we
> > >> > >> > > don't support is unclean leader election reconciliation across
> > >> > >> > > clusters, is that the gap you're aiming at?
> > >> > >> > >
> > >> > >> > > I tried to rewrite the unclean leader election paragraph in the
> > >> > >> > > rejected alternatives to be easier to digest. Let me know if it
> > >> works.
> > >> > >> > >
> > >> > >> > > On Tue, Feb 17, 2026 at 2:57 PM Andrew Schofield
> > >> > >> > > <[email protected]> wrote:
> > >> > >> > > >
> > >> > >> > > > Hi Fede and friends,
> > >> > >> > > > Thanks for the KIP.
> > >> > >> > > >
> > >> > >> > > > It’s a comprehensive design, easy to read and has clearly taken
> > >> > >> > > > a lot of work.
> > >> > >> > > > The principle of integrating mirroring into the brokers makes
> > >> > >> > > > total sense to me.
> > >> > >> > > >
> > >> > >> > > > The main comment I have is that mirroring like this cannot
> > >> > >> > > > handle situations in which multiple topic-partitions are
> > >> > >> > > > logically related, such as transactions, with total fidelity.
> > >> > >> > > > Each topic-partition is being replicated as a separate entity.
> > >> > >> > > > The KIP calls this out and describes the behaviour thoroughly.
> > >> > >> > > >
> > >> > >> > > > A few initial comments.
> > >> > >> > > >
> > >> > >> > > > AS1) Is it true that offsets are always preserved by this KIP?
> > >> > >> > > > I *think* so but not totally sure that it’s true in all cases.
> > >> > >> > > > It would certainly be nice.
> > >> > >> > > >
> > >> > >> > > > AS2) I think you need to add epoch information to
> > >> > >> > > > AlterShareGroupOffsetsRequest. It really should already be
> > >> > >> > > > there in hindsight, but I think this KIP requires it.
> > >> > >> > > >
> > >> > >> > > > AS3) The CoordinatorType enum for MIRROR will need to be 3
> > >> > >> > > > because 2 is SHARE. I’m sure you’ll parse the keys from the
> > >> > >> > > > right ;)
> > >> > >> > > >
> > >> > >> > > > AS4) The procedure for achieving a failover could be clearer.
> > >> > >> > > > Let’s say that I am using cluster mirroring to achieve DR
> > >> > >> > > > replication. My source cluster is utterly lost due to a
> > >> > >> > > > disaster. What’s the single operation that I perform to switch
> > >> > >> > > > all of the topics mirrored from the lost source cluster to
> > >> > >> > > > become the active topics? Similarly for failback.
> > >> > >> > > >
> > >> > >> > > > AS5) The only piece that I’m really unsure of is the epoch
> > >> > >> > > > management. I would have thought that the cluster which
> > >> > >> > > > currently has the writable topic-partition would be in charge
> > >> > >> > > > of the leader epoch and it would not be necessary to perform
> > >> > >> > > > all of the gymnastics described in the section on epoch
> > >> > >> > > > rewriting. I have read the Rejected Alternatives section too,
> > >> > >> > > > but I don’t fully grasp why it was necessary to reject it.
> > >> > >> > > >
> > >> > >> > > > I wonder if we could store the “polarity” of an epoch,
> > >> > >> > > > essentially whether the epoch bump was observed by replication
> > >> > >> > > > from a source cluster, or whether it was bumped by a local
> > >> > >> > > > leadership change when the topic is locally writable. When a
> > >> > >> > > > topic-partition switches from read-only to writable, we should
> > >> > >> > > > definitely bump the epoch, and we could record the fact that it
> > >> > >> > > > was a local epoch. When connectivity is re-established, you
> > >> > >> > > > might find that both ends have declared a local epoch N, but
> > >> > >> > > > someone has to win.
> > >> > >> > > >
> > >> > >> > > > Thanks,
> > >> > >> > > > Andrew
> > >> > >> > > >
> > >> > >> > > > > On 14 Feb 2026, at 07:17, Federico Valeri <[email protected]> wrote:
> > >> > >> > > > >
> > >> > >> > > > > Hi, we would like to start a discussion thread about
> > >> > >> > > > > KIP-1279: Cluster Mirroring.
> > >> > >> > > > >
> > >> > >> > > > > Cluster Mirroring is a new Kafka feature that enables native,
> > >> > >> > > > > broker-level topic replication across clusters. Unlike
> > >> > >> > > > > MirrorMaker 2 (which runs as an external Connect-based tool),
> > >> > >> > > > > Cluster Mirroring is built into the broker itself, allowing
> > >> > >> > > > > tighter integration with the controller, coordinator, and
> > >> > >> > > > > partition lifecycle.
> > >> > >> > > > >
> > >> > >> > > > >
> > >> > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1279%3A+Cluster+Mirroring
> > >> > >> > > > >
> > >> > >> > > > > There are a few missing bits, but most of the design is
> > >> > >> > > > > there, so we think it is the right time to involve the
> > >> > >> > > > > community and get feedback. Please help validate our
> > >> > >> > > > > approach.
> > >> > >> > > > >
> > >> > >> > > > > Thanks
> > >> > >> > > > > Fede
> > >> > >> > > >
> > >> > >> > >
> > >> > >> >
> > >> > >>
> > >> > >
> > >> > >
> > >> > > --
> > >> > > Paolo Patierno
> > >> > >
> > >> > > *Senior Principal Software Engineer @ IBM**CNCF Ambassador*
> > >> > >
> > >> > > Twitter : @ppatierno <http://twitter.com/ppatierno>
> > >> > > Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
> > >> > > GitHub : ppatierno <https://github.com/ppatierno>
> > >> > >
> > >> >
> > >> >
> > >> >
> > >>
> > >