Hi Federico and team,

Thank you for your detailed response on May 11, 2026. I greatly appreciate
the collaborative effort over the past few months to harden the KIP-1279
architecture.

* 1. Resolved Architectural & Stability Vulnerabilities*

I am pleased to confirm that the KIP has successfully integrated fixes for
the critical vulnerabilities I flagged, thereby protecting the cluster's
state machine, memory bounds, and control plane. Consider the following
items fully resolved on my end:

   -

   *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously
   identified that the proposed -(sourceProducerId + 2) PID rewriting
   formula would fundamentally break hasProducerId() in
   AbstractRecordBatch.java, causing the broker to bypass
   ProducerStateManager.update() and default the Last Stable Offset (LSO)
   to the High Watermark. I am glad to see the KIP authors abandoned PID
   mapping entirely and adopted the deterministic MIRROR_PID_RESET control
   record barrier. This perfectly protects Kafka's exactly-once semantics.

   -

   *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for officially
   confirming that the TransactionIndex is strictly rebuilt locally during
   log appends rather than copied byte-for-byte. This resolves my concern
   regarding PID mismatches inducing consumer read anomalies.
   -

   *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification that
   fetcher threads multiplex partitions meaning the memory footprint is
   strictly bounded by num_fetcher_threads * response_max_bytes rather than
   a concurrent 1MB buffer per partition fully resolves my concern regarding
   50GB broker-wide OOM spikes during mass partition wake-ups.
   -

   *VK3 (Control Plane Hotspots):* I had flagged the severe risk of
   metadata saturation on a single broker during a "link flap" event. Your
   confirmation that the __mirror_state topic utilizes a compound hash
of (mirrorName,
   topicId, partition number) mathematically ensures that the 50,000+ state
   transitions will be safely distributed across the cluster, neutralizing the
   single-node hotspot risk.

2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)

Regarding VK12, there is a fundamental misunderstanding in your previous
reply. You stated: *"The scenario described can only occur if offsets are
force-written to an active group, which the design prevents."*

My concern has absolutely nothing to do with overwriting active groups. My
concern applies strictly to the normal synchronization of inactive/dead
groups, and is based directly on the race condition currently documented in
the official KIP-1279 text:

*"During offset synchronization, the committed offset in the destination
cluster may temporarily exceed the current log end offset (LEO) of the
mirror topic... consumers attempting to resume from offset 100 will receive
an OffsetOutOfRangeException. To handle this gracefully, consumers should
configure auto.offset.reset=latest..."*

If a failover happens during this documented divergence window, a
reconnecting consumer will hit the OffsetOutOfRangeException. If the
downstream consumer follows the KIP's official advice and relies on
auto.offset.reset=latest, the consumer will jump to the absolute newest
offset on the partition. *This completely skips any newly produced records
that arrived between the failover and the consumer reconnecting, resulting
in silent, unrecoverable data loss for the downstream application.*
*Proposed Resolution: Double-Clamped Offset Safety Invariant* Instead of
requiring consumers to use auto.offset.reset=latest and endorsing a known
data loss vector, I propose that the ClusterMirrorCoordinator enforce a
pre-persist offset validation invariant during offset synchronization.
Before any translated offset is committed to the destination cluster, the
coordinator must apply a double-clamp:

*syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*

Where destinationLSO is the Log Start Offset (earliest readable position
post-retention) and destinationLEO is the Log End Offset (latest replicated
position) of the destination partition.

This guarantees that every persisted offset falls within the physically
valid range  eliminating both the OffsetOutOfRangeException caused by
exceeding the LEO during replication lag, and the expired-offset rewind
caused by falling below the LSO due to destination retention policies.

The worst-case trade-off under this invariant is bounded re-processing
proportional to the replication lag at failover time not total partition
depth which is perfectly consistent with Kafka's documented at-least-once
delivery guarantees. Broad enterprise production evidence from large-scale
cross-cluster failovers confirms that state checks alone (preventing active
group overwrites) are insufficient; strict offset bounds clamping is
required to achieve enterprise-grade data integrity.

I look forward to your thoughts on implementing this final offset capping
logic. Once VK12 is patched, I believe this architecture will be
exceptionally robust and ready for enterprise deployment.

Best Regards,

Viquar Khan


On Mon, 18 May 2026 at 14:23, Rajini Sivaram <[email protected]>
wrote:

> Hi Federico,
>
> Thanks for the updates! The KIP is looking good. A few more small comments.
>
>
> RS13: A couple of places still refer to `kafka-mirror.sh` like under
> `Failover Process`. Can we change them to `*kafka-cluster-mirrors.sh*`?
>
> RS14: Should we change `--entity-type mirrors` for kafka-configs to be `
> --entity-type *cluster-mirrors*` to be consistent? Also,
> CLUSTER_MIRROR((byte)
> 64, "mirror"); could be `*cluster-mirror*`?
>
> RS15: It may be useful to rename `mirror.topic.num.partitions` and `
> mirror.topic.replication.factor` since they are very similar to `
> mirror.topic.properties.exclude`, but the `mirror.topic` prefix refers to
> different topics (the internal topic for the first two and actual mirror
> topics for the other one).
>
> RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but not in
> source using DeleteAcls request."
> What happens if someone creates an ACL on the destination to deny
> User:Alice access to all topics?
>
>    1. If that ACL also existed on the source cluster and then it was
>    removed, will it get removed from the destination?
>    2. If that ACL never existed on the source cluster, will it get removed
>    from the destination?
>
> RS17: The table in the Source ACLs section says: "DescribeClusterMirrors
>    MC      Read    Cluster     Log truncation"
> Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
>
> Regards,
>
> Rajini
>
>
>
> On Fri, May 15, 2026 at 8:41 AM Federico Valeri <[email protected]>
> wrote:
>
> > Hi Rajini, we finally addressed the API and tool naming refactoring as
> > you suggested in RS6. Please take a look when you have time. Thanks.
> >
> >
> > On Mon, May 11, 2026 at 6:17 PM Federico Valeri <[email protected]>
> > wrote:
> > >
> > > Hello all, I want to highlight a couple of new paragraphs:
> > >
> > > 1. Leader Epoch Invariant: Cluster mirroring enforces the invariant
> > > that the destination leader epoch must always be greater than or equal
> > > to the source leader epoch (DLE>=SLE). Without this, consumers on the
> > > destination cluster can get stuck in an infinite metadata refresh loop
> > > when they encounter committed offsets carrying source epochs higher
> > > than the local epoch. The invariant is maintained through three
> > > mechanisms: reactive bumping (epoch fencing triggered when SLE > DLE
> > > during fetch), proactive bumping (scheduled when SLE approaches DLE
> > > within a threshold), and periodic bumping (checked during coordinator
> > > metadata sync).
> > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > >
> > > 2. Group Offsets: The coordinator periodically syncs consumer and
> > > share group offsets from the source cluster to the destination for all
> > > mirrored topics. Groups are filtered by configurable include/exclude
> > > patterns, and offsets are only synced for groups that are not
> > > currently active on the destination cluster, preventing overwrites of
> > > local consumer progress. Because source and destination share the same
> > > topic offsets (no offset translation), synced offsets can be used
> > > directly without mapping.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > >
> > > These new paragraphs directly address some of your questions, but let
> > > me list them here:
> > >
> > > JR2: Yes, we removed the incorrect phrase and added more details to
> > > the paragraph.
> > >
> > > JR4: When source cluster topic has tiered storage enabled, CM works by
> > > mirroring remote and local log into destination cluster. When
> > > destination cluster topic has tiered storage enabled, CM fails in
> > > PREPARING state because the LME may be in remote storage, but works
> > > fine if already MIRRORING because no truncation is needed.
> > >
> > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > >
> > > JR13: We can't support stateful Streams application because
> > > asynchronous replication cannot preserve the transactional boundaries
> > > between input offset commits, state store mutations written to
> > > changelog topics, and intermediate records written to repartition
> > > topics. The synchronous extension of this design will be able to
> > > support them. Existing Features Integration paragraph updated.
> > >
> > > JR18: See "Group Offsets" paragraph mentioned above.
> > >
> > > IY1: See "Group Offsets" paragraph mentioned above.
> > >
> > > Thanks
> > > Fede
> > >
> > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri <[email protected]>
> > wrote:
> > > >
> > > > Hi Vaquar,
> > > >
> > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated some
> > > > time ago with the new approach based on the new PID reset control
> > > > record.
> > > >
> > > > VK11: The transaction index is always built locally during log
> append,
> > > > never copied.
> > > >
> > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the fetch
> > > > model. Fetcher threads don't allocate one buffer per partition.
> Actual
> > > > peak memory is roughly num_fetcher_threads * response_max_bytes, not
> > > > num_partitions * partition_max_bytes. With 1 fetcher thread and the
> > > > default response max, the memory footprint is modest regardless of
> > > > partition count. We are leveraging the same proven pattern used by
> the
> > > > internal replication.
> > > >
> > > > VK3: The __mirror_state topic uses hash-based partitioning based on
> > > > mirrorName, topicId and partition number. With the production default
> > > > of 50 partitions, 50,000 partition transitions distribute across ~50
> > > > partition leaders on different brokers, not a single broker. This is
> > > > the same proven pattern as __consumer_offsets, which handles millions
> > > > of commits.
> > > >
> > > > VK12: The scenario described can only occur if offsets are
> > > > force-written to an active group, which the design prevents.
> > > >
> > > > Cheers
> > > > Fede
> >
>

Reply via email to