sanghyeok An created KAFKA-20029:
------------------------------------

             Summary: Disallow partition count increase for __transaction_state.
                 Key: KAFKA-20029
                 URL: https://issues.apache.org/jira/browse/KAFKA-20029
             Project: Kafka
          Issue Type: Improvement
            Reporter: sanghyeok An
            Assignee: sanghyeok An


The routing logic for the Transaction Coordinator relies heavily on the 
partition count of the internal topic {{{}__transaction_state{}}}. The mapping 
is determined by
{code:java}
Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount {code}
Consequently, changing the number of partitions of {{__transaction_state}} 
(e.g., expanding from 50 to 150) changes the mapping for existing transactional 
IDs. While the Kafka documentation advises against modifying internal topics, 
there is currently no hard guardrail preventing this via the Admin API.
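
For illustration, here is a minimal, self-contained sketch (not the actual Kafka source; the {{abs}} helper below only approximates {{Utils.abs}}) showing that the same transactional ID maps to different coordinator partitions once the count changes from 50 to 150:
{code:java}
// Minimal illustration: the same transactional ID maps to different
// __transaction_state partitions once the partition count changes.
public class TxnPartitionMappingDemo {

    // Approximates Utils.abs(...): non-negative result for any int, including Integer.MIN_VALUE.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    static int partitionFor(String transactionalId, int transactionTopicPartitionCount) {
        return abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
    }

    public static void main(String[] args) {
        String txnId = "my-transactional-id";
        // Old brokers still compute with 50 partitions, new brokers with 150,
        // so the two views can disagree on the coordinator partition.
        System.out.println("50 partitions  -> partition " + partitionFor(txnId, 50));
        System.out.println("150 partitions -> partition " + partitionFor(txnId, 150));
    }
}
{code}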

IMHO, allowing this operation can lead to a split-brain scenario during a 
rolling upgrade or cluster expansion, resulting in orphan and hanging 
transactions.

 

*Scenario & Analysis*

Here is a breakdown of the race condition and inconsistency issues:
 # *Expansion:* An admin expands {{__transaction_state}} partitions from 50 to 
150. New partitions are created and leaders are elected.
 # *Metadata Inconsistency:* While the metadata cache updates, the 
{{transactionTopicPartitionCount}} used by the {{partitionsFor}} logic might not 
be updated atomically across all nodes.
 # *Loading Failure:* A coordinator elected for a new partition might fail to 
load the transaction state in {{TransactionCoordinator#onElection}} due to a 
mismatch between the cached partition count and the actual partition count, 
throwing an exception.
 # *Split-Brain View:*
 ** *New Broker B* (starts up or receives update): Sees 150 partitions.
 ** {*}Old Broker A{*}: Still operates with the logic based on 50 partitions.
 # *Routing Divergence:*
 ** A client sends {{FindCoordinator}} for Transaction ID "A".
 ** *Broker B* calculates: {{hash("A") % 150}} -> Partition {*}130{*}.
 ** *Broker A* calculates: {{hash("A") % 50}} -> Partition {*}27{*}.
 # *Non-deterministic behavior occurs:*
 ** If the producer sends {{FindCoordinator}} for Transaction ID "A" to *Broker 
B*, it receives the coordinator for partition {*}130{*}. However, if it sends 
the same request to {*}Broker A{*}, it receives the coordinator for partition 
{*}27{*}.
 ** This means the producer will send {{InitProducerId}} to the leader of 
partition *130* or {*}27{*}. (In either case, the leader nodes of partitions 27 
and 130 will be among the old brokers.)
 ** If the TransactionCoordinator on old Broker A receives the 
{{InitProducerId}} request for partition 130, it calls *partitionsFor(...)* to 
look up the transaction state. Because its *transactionTopicPartitionCount* is 
still {*}50{*}, the re-calculated partition can be anything in {*}(0 ~ 49){*} 
(see the sketch after this list). This means that:
 *** {{Errors.COORDINATOR_LOAD_IN_PROGRESS}} can occur.
 *** Transaction state may be written to an unexpected {{__transaction_state}} 
partition (for example, partition 11 instead of the expected partition 130 – 
the exact value is only an illustration of the re-calculation).
 **** This causes orphan transactions. For example, when brokers are rolled, 
they try to restore transaction state and expect the transaction for "A" to be 
in partition 130, while it actually lives in partition 11. As a result, data 
topics may encounter hanging transactions.
 ** In addition, since the leaders for partition 130 and partition 27 are 
likely different, we now have two coordinators potentially managing the same 
Transaction ID without knowledge of each other.
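
The following self-contained sketch (hypothetical names, not Kafka code) illustrates the last step: an old coordinator that still caches 50 partitions re-derives a state partition for the transactional ID that differs from the partition the request was routed to:
{code:java}
// Hypothetical illustration of an old coordinator re-deriving the state partition
// from its stale transactionTopicPartitionCount (50), while the request was routed
// using the new count (150).
public class StaleCoordinatorRecalculationDemo {

    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    static int partitionFor(String transactionalId, int partitionCount) {
        return abs(transactionalId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        String txnId = "my-transactional-id";

        // The client was routed by a new broker that already sees 150 partitions.
        int routedPartition = partitionFor(txnId, 150);

        // The old coordinator re-computes the owning partition with its cached count of 50.
        int recomputedPartition = partitionFor(txnId, 50);

        System.out.println("Routed to partition:         " + routedPartition);
        System.out.println("Old coordinator recomputes:  " + recomputedPartition);
        // Whenever these differ, transaction state can be written to (or looked up in)
        // a partition that the rest of the cluster does not expect.
    }
}
{code}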

 

*Impact*

This non-deterministic behavior causes:
 * *Orphan/Hanging Transactions:* The producer interacts with a new coordinator 
that has no history of the transaction.
 * *Availability Issues:* Clients may receive {{COORDINATOR_LOAD_IN_PROGRESS}} 
indefinitely if the coordinator fails to load the state due to count mismatch.
 * *Potential worst-case correctness risk (not asserted as guaranteed):* 
depending on the timing, the transition could increase the risk of unexpected 
fencing/coordination behavior.

 

 

To ensure cluster stability and data integrity, how about enforcing a guardrail 
against modifying the partition count of {{__transaction_state}}?

*1. Enforce validation in Controller/AdminManager* 

It would be good to reject partition-increase requests ({{CreatePartitions}}) 
for internal topics by default; a rough sketch of the check follows the list 
below.
 * Introduce a mechanism to check whether the topic is internal during partition 
expansion.
 * If {{topic.isInternal()}} is true, return an {{InvalidRequestException}} with 
a clear error message stating that internal topic partition counts cannot be 
changed dynamically.
 * Introduce a safety configuration on the controller side (e.g., 
{{{}internal.topic.modification.enable{}}}, default {{{}false{}}}) for advanced 
users who strictly need to override this behavior, although doing so is strongly 
discouraged.
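
A minimal sketch of the proposed check (the class name, method, and placement are hypothetical and not the actual controller code path; only {{InvalidRequestException}} and the internal topic names are existing Kafka identifiers):
{code:java}
import org.apache.kafka.common.errors.InvalidRequestException;

import java.util.Set;

// Hypothetical guard: reject partition-count increases for internal topics
// unless the proposed override config is enabled.
public class InternalTopicPartitionGuard {

    private static final Set<String> INTERNAL_TOPICS =
        Set.of("__consumer_offsets", "__transaction_state");

    // Proposed config: internal.topic.modification.enable (default false).
    private final boolean internalTopicModificationEnable;

    public InternalTopicPartitionGuard(boolean internalTopicModificationEnable) {
        this.internalTopicModificationEnable = internalTopicModificationEnable;
    }

    public void validatePartitionIncrease(String topic, int oldCount, int newCount) {
        if (newCount > oldCount
                && INTERNAL_TOPICS.contains(topic)
                && !internalTopicModificationEnable) {
            throw new InvalidRequestException("Partition count of internal topic '" + topic
                + "' cannot be changed dynamically. Set internal.topic.modification.enable=true"
                + " to override (strongly discouraged).");
        }
    }
}
{code}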

*2. CLI Warning*

Add a warning message or require a *{{--force}}* flag in the 
*{{kafka-topics.sh}}* CLI when attempting to modify internal topics. Note that 
this is only a soft guardrail and does not prevent programmatic changes via the 
Admin API or non-Java clients (for example, Go, Rust, or Python clients).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
