sanghyeok An created KAFKA-20029:
------------------------------------
Summary: Disallow partition count increase for __transaction_state.
Key: KAFKA-20029
URL: https://issues.apache.org/jira/browse/KAFKA-20029
Project: Kafka
Issue Type: Improvement
Reporter: sanghyeok An
Assignee: sanghyeok An
The routing logic for the Transaction Coordinator relies heavily on the
partition count of the internal topic {{__transaction_state}}. The mapping
is determined by:
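{code:java}
// Maps a transactional ID to its __transaction_state partition; that partition's leader is the coordinator
int partition = Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount;
{code}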
Consequently, changing the number of partitions for {{__transaction_state}}
(e.g., expanding from 50 to 150) remaps most transactional IDs to a different
partition, and therefore to a different coordinator. While the Kafka
documentation advises against modifying internal topics, there is currently no
hard guardrail preventing this via the Admin API.
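A minimal sketch of the mapping above that runs without Kafka on the classpath. It assumes Kafka's {{Utils.abs}} returns 0 for {{Integer.MIN_VALUE}} (and {{Math.abs}} otherwise) and reimplements that locally; {{TxnPartitionMapping}}, its methods, and the {{txn-<i>}} IDs are hypothetical. It counts how many IDs land on a different partition when the count grows from 50 to 150.

```java
import java.util.ArrayList;
import java.util.List;

final class TxnPartitionMapping {
    // Local stand-in for Kafka's Utils.abs (assumed semantics): unlike Math.abs,
    // Integer.MIN_VALUE maps to 0, so the result is never negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // transactionId -> __transaction_state partition for a given partition count.
    static int partitionFor(String transactionId, int partitionCount) {
        return abs(transactionId.hashCode()) % partitionCount;
    }

    // Returns the IDs whose partition (and therefore, usually, coordinator)
    // changes when the partition count goes from oldCount to newCount.
    static List<String> remapped(List<String> ids, int oldCount, int newCount) {
        List<String> moved = new ArrayList<>();
        for (String id : ids) {
            if (partitionFor(id, oldCount) != partitionFor(id, newCount)) {
                moved.add(id);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            ids.add("txn-" + i); // hypothetical transactional IDs
        }
        int moved = remapped(ids, 50, 150).size();
        System.out.println(moved + " of " + ids.size()
                + " IDs map to a different partition after 50 -> 150");
    }
}
```

For instance, {{"A".hashCode()}} is 65, so "A" maps to partition 15 with 50 partitions but to partition 65 with 150. Because 150 is a multiple of 50, an ID keeps its partition only when {{hash % 150 < 50}}, so with a well-spread hash roughly two thirds of IDs move.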
IMHO, allowing this operation can lead to a split-brain scenario during a
rolling upgrade or cluster expansion, resulting in orphan and hanging
transactions.
*Scenario & Analysis*
Here is a breakdown of the race condition and inconsistency issues:
# *Expansion:* An admin expands {{__transaction_state}} partitions from 50 to
150. New partitions are created and leaders are elected.
# *Metadata Inconsistency:* While the metadata cache is being updated, the
{{transactionTopicPartitionCount}} used by the {{partitionsFor}} logic might not
be updated atomically across all nodes.
# *Loading Failure:* A coordinator elected for a new partition might fail to
load the transaction state in {{TransactionCoordinator#onElection}} due to a
mismatch between the cached partition count and the actual partition count,
and throw an exception.
# *Split-Brain View:*
** *New Broker B* (starts up or receives the update): sees 150 partitions.
** *Old Broker A*: still operates with logic based on 50 partitions.
# *Routing Divergence:*
** A client sends {{FindCoordinator}} for transactional ID "A".
** *Broker B* calculates {{hash("A") % 150}} -> partition *130*.
** *Broker A* calculates {{hash("A") % 50}} -> partition *30* (because 150 is a
multiple of 50, {{130 % 50 = 30}}).
# *Non-deterministic Behavior:*
** If the producer sends {{FindCoordinator}} for transactional ID "A" to
*Broker B*, it receives partition *130*; if it sends the request to *Broker A*,
it receives partition *30*.
** The producer therefore sends {{InitProducerId}} to the leader of either
partition *130* or partition *30*. (In this scenario, the leaders of both
partitions are old brokers.)
** Suppose the leader of partition 130 (an old broker, Coordinator A) receives
the {{InitProducerId}} request. It calls *partitionsFor(...)* to look up the
transaction state, but its *transactionTopicPartitionCount* is still *50*, so
the recomputed partition is {{hash("A") % 50}}, a partition in *0-49* rather
than 130 (here, partition 30). This means that:
*** {{Errors.COORDINATOR_LOAD_IN_PROGRESS}} can occur.
*** The transaction state may be written to an unexpected
{{__transaction_state}} partition (for example, partition 30 instead of the
expected partition 130).
**** This causes orphan transactions. For example, when the brokers are rolled,
they restore transaction state expecting the state for "A" to be in partition
130, but it is actually in partition 30. Data topics may then encounter hanging
transactions.
** In addition, since the leaders of partitions 130 and 30 are likely
different, two coordinators may now manage the same transactional ID without
knowledge of each other.
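The routing divergence described above can be reproduced with a toy model in which each broker evaluates the same formula with its own cached partition count. {{SplitBrainRouting}}, {{BrokerView}}, and {{agreesWithRouting}} are hypothetical names, and the model deliberately ignores everything else a real coordinator does. With the real {{String.hashCode}}, "A" is routed to partition 65 by the 150-partition view and to partition 15 by the 50-partition view, rather than the illustrative numbers in the scenario; an old leader of partition 65 recomputes 15, not 65.

```java
final class SplitBrainRouting {
    // Hypothetical model of one broker: only its cached __transaction_state
    // partition count matters for routing.
    static final class BrokerView {
        final String name;
        final int cachedPartitionCount;

        BrokerView(String name, int cachedPartitionCount) {
            this.name = name;
            this.cachedPartitionCount = cachedPartitionCount;
        }

        // The issue's formula, evaluated with this broker's cached count.
        int partitionFor(String transactionId) {
            int h = transactionId.hashCode();
            int abs = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h); // assumed Utils.abs semantics
            return abs % cachedPartitionCount;
        }
    }

    // True if the coordinator, recomputing with its own cached count,
    // lands on the same partition the client was routed to.
    static boolean agreesWithRouting(BrokerView coordinator, String transactionId,
                                     int routedPartition) {
        return coordinator.partitionFor(transactionId) == routedPartition;
    }

    public static void main(String[] args) {
        BrokerView brokerA = new BrokerView("Broker A (old)", 50);
        BrokerView brokerB = new BrokerView("Broker B (new)", 150);
        String txnId = "A";

        int viaB = brokerB.partitionFor(txnId);
        int viaA = brokerA.partitionFor(txnId);
        System.out.println("FindCoordinator via " + brokerB.name + " -> partition " + viaB);
        System.out.println("FindCoordinator via " + brokerA.name + " -> partition " + viaA);

        // Assume the leader of partition viaB is an old broker still using 50.
        BrokerView oldLeader = new BrokerView("old leader of partition " + viaB, 50);
        System.out.println(oldLeader.name + " recomputes partition "
                + oldLeader.partitionFor(txnId)
                + "; agrees with routing: " + agreesWithRouting(oldLeader, txnId, viaB));
    }
}
```

Because 150 is a multiple of 50, the old leader's recomputation is always the routed partition modulo 50, so it agrees with the routing only for IDs that happen to land in partitions 0-49.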
*Impact*
This non-deterministic behavior causes:
* *Orphan/Hanging Transactions:* The producer interacts with a new coordinator
that has no history of the transaction.
* *Availability Issues:* Clients may receive {{COORDINATOR_LOAD_IN_PROGRESS}}
indefinitely if the coordinator fails to load the state due to the
partition-count mismatch.
* *Correctness Risk (worst case, not guaranteed):* Depending on timing, the
transition could increase the risk of unexpected fencing or coordination
behavior.
To ensure cluster stability and data integrity, how about enforcing a guardrail
against modifying the partition count of {{__transaction_state}}?
*1. Enforce validation in Controller/AdminManager*
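It would be good to reject partition-count increase requests
({{CreatePartitions}}, which is what {{Admin#createPartitions}} and
{{kafka-topics.sh --alter --partitions}} send) for internal topics by default.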
* Introduce a mechanism to check if the topic is internal during partition
expansion.
* If {{topic.isInternal()}} is true, return an {{InvalidRequestException}}
with a clear error message stating that internal topic partition counts cannot
be changed dynamically.
* Introduce a safety configuration on the controller side (e.g.,
{{internal.topic.modification.enable}}, default {{false}}) for advanced users
who genuinely need to override this behavior, although doing so is strongly
discouraged.
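A minimal sketch of the validation proposed in item 1, assuming the controller has the topic name and both partition counts available when it handles the request. {{InternalTopicPartitionGuard}}, {{validatePartitionIncrease}}, the hard-coded internal topic set, and the local exception class are all hypothetical; {{internal.topic.modification.enable}} is the config name proposed above, not an existing Kafka setting.

```java
import java.util.Set;

final class InternalTopicPartitionGuard {
    // Assumed set of internal topics for this sketch; Kafka's own notion of
    // "internal" may include more topics.
    static final Set<String> INTERNAL_TOPICS = Set.of("__consumer_offsets", "__transaction_state");

    // Local stand-in for org.apache.kafka.common.errors.InvalidRequestException,
    // so the sketch compiles without Kafka on the classpath.
    static final class InvalidRequestException extends RuntimeException {
        InvalidRequestException(String message) {
            super(message);
        }
    }

    // Value of the proposed (hypothetical) internal.topic.modification.enable config.
    private final boolean internalTopicModificationEnable;

    InternalTopicPartitionGuard(boolean internalTopicModificationEnable) {
        this.internalTopicModificationEnable = internalTopicModificationEnable;
    }

    // Called before a partition-count increase is applied; throws to reject it.
    void validatePartitionIncrease(String topic, int currentCount, int requestedCount) {
        if (!INTERNAL_TOPICS.contains(topic) || internalTopicModificationEnable) {
            return; // regular topic, or the operator explicitly opted in
        }
        throw new InvalidRequestException("The partition count of internal topic " + topic
                + " cannot be changed dynamically (requested " + currentCount + " -> "
                + requestedCount + "). Set internal.topic.modification.enable=true to override"
                + " (strongly discouraged).");
    }
}
```

Placing the check on the controller side means it applies to every client, not only to {{kafka-topics.sh}}.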
*2. CLI Warning*
Add a warning message or require a *{{--force}}* flag in the
*{{kafka-topics.sh}}* CLI when attempting to modify internal topics. Note that
this is a soft guardrail and does not prevent programmatic changes via the
Admin API (for example, from tools written in Go, Rust, or Python).
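A sketch of the client-side check proposed in item 2, assuming the CLI already knows the target topic and whether {{--force}} was passed. {{TopicCliGuard}} and {{confirmAlterPartitions}} are hypothetical, the internal topic set is an assumption, and the {{--force}} behavior shown is the one proposed here rather than documented {{kafka-topics.sh}} behavior.

```java
import java.io.PrintStream;
import java.util.Set;

final class TopicCliGuard {
    // Assumed internal topic names for this sketch.
    static final Set<String> INTERNAL_TOPICS = Set.of("__consumer_offsets", "__transaction_state");

    // Returns true if the CLI should go on to send the partition-count change.
    // Without --force, an attempt on an internal topic prints an error and is aborted;
    // with --force, it prints a warning and proceeds.
    static boolean confirmAlterPartitions(String topic, boolean force, PrintStream err) {
        if (!INTERNAL_TOPICS.contains(topic)) {
            return true;
        }
        if (force) {
            err.println("WARNING: changing the partition count of internal topic " + topic
                    + " can break coordinator routing; proceeding because --force was given.");
            return true;
        }
        err.println("ERROR: refusing to change the partition count of internal topic " + topic
                + ". Re-run with --force to override (strongly discouraged).");
        return false;
    }
}
```

Since this runs only inside the CLI, it complements rather than replaces the controller-side validation.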
--
This message was sent by Atlassian Jira
(v8.20.10#820010)