[
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643477#comment-17643477
]
Justine Olshan commented on KAFKA-12671:
----------------------------------------
KIP-890 should be able to help with this.
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-890:+Transactions+Server-Side+Defense]
> Out of order processing with a transactional producer can lead to a stuck
> LastStableOffset
> ------------------------------------------------------------------------------------------
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
> Reporter: Travis Bischel
> Priority: Major
> Labels: Transactions
>
> If incoming produce requests and EndTxn requests are processed out of order,
> the LastStableOffset can get stuck, which blocks consuming in READ_COMMITTED
> mode.
> To produce transactionally, the standard flow is to issue InitProducerId and
> then loop AddPartitionsToTxn -> Produce+ -> EndTxn. AddPartitionsToTxn is
> responsible for fencing and for adding partitions to a transaction, and
> EndTxn is responsible for finishing the transaction. Producing itself is
> mostly uninvolved in the proper fencing / ending flow, but produce requests
> must be issued after AddPartitionsToTxn and before EndTxn.
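> For illustration only (this sketch is mine, not the client used in the logs
> below, which is kgo): the same request flow expressed with the standard Java
> producer API, where each client call maps onto the protocol requests above.
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class TxnFlowSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>
>         try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>             producer.initTransactions();   // InitProducerId
>             producer.beginTransaction();   // local bookkeeping only
>             // The first send to a partition triggers AddPartitionsToTxn, then Produce.
>             producer.send(new ProducerRecord<>("example-topic", "key", "value"));
>             producer.commitTransaction();  // EndTxn (committed=true); abortTransaction() sends committed=false
>         }
>     }
> }
> {code}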
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager
> to loosely track transactions. The ProducerStateManager is completely
> independent of the TxnCoordinator, and its guarantees are relatively weak.
> The ProducerStateManager handles two types of "batches" being added: a data
> batch and a transaction marker. When a data batch is added, a "transaction"
> is begun and tied to the producer ID that produced the batch. When a
> transaction marker is handled, the ProducerStateManager removes the
> transaction for that producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the
> ProducerStateManager. In essence, EndTxn is the one part of the transactional
> producer flow that talks across both the TxnCoordinator and the
> ProducerStateManager.
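> To make the bookkeeping concrete, here is a deliberately simplified,
> hypothetical model of what the partition leader tracks; the real logic lives
> in ProducerStateManager and is more involved, so treat the names below as
> invented:
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Hypothetical sketch of the per-partition transaction bookkeeping.
> class OngoingTxnTracker {
>     // producerId -> first offset of that producer's currently open transaction
>     private final Map<Long, Long> ongoing = new ConcurrentHashMap<>();
>
>     // A transactional data batch implicitly "begins" a transaction for its producer.
>     void onDataBatch(long producerId, long baseOffset) {
>         ongoing.putIfAbsent(producerId, baseOffset);
>     }
>
>     // A COMMIT/ABORT marker (written via WriteTxnMarkers after EndTxn) ends it.
>     void onTxnMarker(long producerId) {
>         ongoing.remove(producerId);
>     }
>
>     // The LastStableOffset cannot advance past the earliest still-open transaction.
>     long lastStableOffset(long highWatermark) {
>         return ongoing.values().stream().min(Long::compare).orElse(highWatermark);
>     }
> }
> {code}
> In this model, a data batch that arrives after the marker re-inserts an entry
> that nothing will ever remove, which is exactly the stuck LastStableOffset
> described next.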
> If a ProduceRequest is issued before EndTxn but handled internally in Kafka
> after EndTxn, then the ProduceRequest will begin a new transaction in the
> ProducerStateManager. If the client is disconnecting and the EndTxn was the
> final request issued, the new transaction created in the ProducerStateManager
> is orphaned and nothing can clean it up. The LastStableOffset then hangs on
> this orphaned transaction.
> The same problem can be triggered by a produce request that is issued with a
> transactional ID outside of the context of a transaction at all (no
> AddPartitionsToTxn). It cannot be triggered by producing for so long that the
> transaction expires; the difference there is that the transaction coordinator
> bumps the epoch for the producer ID, so producing again with the old epoch is
> rejected.
> Theoretically, we are supposed to have unlimited retries on produce requests,
> but in the context of wanting to abort everything and shut down, this is not
> always feasible. As it currently stands, I'm not sure there's a truly safe
> way to shut down _without_ flushing and receiving responses for every record
> produced, even if I want to abort everything and quit. The safest approach I
> can think of is to avoid issuing an EndTxn at all, and instead rely on Kafka
> to time the transaction out internally after a period of time.
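> As a sketch of the "wait for every produce response before EndTxn" workaround
> (expressed with the Java producer API purely for illustration; the client in
> the logs below is kgo):
> {code:java}
> import org.apache.kafka.clients.producer.Producer;
>
> // Drain all in-flight ProduceRequests before sending EndTxn, so a late
> // produce response cannot land after the abort on the broker side.
> final class AbortingShutdown {
>     static void shutdownAborting(Producer<?, ?> producer) {
>         try {
>             producer.flush();             // wait for every outstanding send to complete
>             producer.abortTransaction();  // only now issue EndTxn (committed=false)
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}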
> —
> For some context, here are my request logs from the client. Note that I write
> two ProduceRequests, read one, and then issue EndTxn (because I know I want
> to quit). The second ProduceRequest is read successfully before shutdown, but
> I ignore the results because I am shutting down. I've taken out logs related
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic:
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: <nil>
> [DEBUG] read FindCoordinator v3; err: <nil>
> [DEBUG] wrote InitProducerID v4; err: <nil>
> [DEBUG] read InitProducerID v4; err: <nil>
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: <nil>
> [DEBUG] read AddPartitionsToTxn v2; err: <nil>
> [DEBUG] wrote Produce v8; err: <nil>
> [DEBUG] read Produce v8; err: <nil>
> [DEBUG] produced; to:
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: <nil>
> [DEBUG] wrote EndTxn v2; err: <nil>
> [DEBUG] read EndTxn v2; err: <nil>
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092,
> id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err: <nil>
> [DEBUG] produced; to:
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
> {noformat}
> And here is the brokers' point of view. Across the two brokers involved, the
> second ProduceRequest is completed after EndTxn is handled (and after the
> WriteTxnMarkers request is handled, which is the important one that hooks
> into the ProducerStateManager):
> {noformat}
> /// Broker 3: init producer ID
> [2021-04-15 00:56:40,030] DEBUG Completed
> request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo,
> correlationId=3) --
> {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,transaction_timeout_ms=60000,producer_id=-1,producer_epoch=-1,_tagged_fields={}},response:{throttle_time_ms=0,error_code=0,producer_id=1463,producer_epoch=0,_tagged_fields={}}
> from connection
> 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:2.255,requestQueueTime:0.077,localTime:0.74,remoteTime:0.095,throttleTime:0,responseQueueTime:1.005,sendTime:0.336,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
> softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 3: add partitions to txn
> [2021-04-15 00:56:40,071] DEBUG Completed
> request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=2,
> clientId=kgo, correlationId=4) --
> {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[1]}]},response:{throttle_time_ms=0,results=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,results=[{partition_index=1,error_code=0}]}]}
> from connection
> 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:1.247,requestQueueTime:0.133,localTime:0.71,remoteTime:0.136,throttleTime:0,responseQueueTime:0.087,sendTime:0.178,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
> softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 2: first produce
> [2021-04-15 00:56:40,223] DEBUG Completed
> request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo,
> correlationId=1) --
> {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=15589,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0}
> from connection
> 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:2.705,requestQueueTime:0.055,localTime:2.435,remoteTime:0.058,throttleTime:0,responseQueueTime:0.055,sendTime:0.1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
> softwareVersion=0.1.0),temporaryMemoryBytes:324898 (kafka.request.logger)
> /// Broker 3: end txn
> [2021-04-15 00:56:40,350] DEBUG Completed
> request:RequestHeader(apiKey=END_TXN, apiVersion=2, clientId=kgo,
> correlationId=5) --
> {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,committed=false},response:{throttle_time_ms=0,error_code=0}
> from connection
> 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:3.484,requestQueueTime:0.052,localTime:0.318,remoteTime:0.06,throttleTime:0,responseQueueTime:2.92,sendTime:0.133,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
> softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 2: txn markers
> [2021-04-15 00:56:40,357] DEBUG Completed
> request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0,
> clientId=broker-3-txn-marker-sender, correlationId=66708) --
> {markers=[{producer_id=1463,producer_epoch=0,transaction_result=false,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_indexes=[1]}],coordinator_epoch=0}]},response:{markers=[{producer_id=1463,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[{partition_index=1,error_code=0}]}]}]}
> from connection
> 127.0.0.1:9094-127.0.0.1:38966-676;totalTime:3.507,requestQueueTime:1.957,localTime:0.34,remoteTime:0.031,throttleTime:0,responseQueueTime:0.324,sendTime:0.853,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=unknown,
> softwareVersion=unknown) (kafka.request.logger)
> /// Broker 2: second produce
> [2021-04-15 00:56:40,374] DEBUG Completed
> request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo,
> correlationId=2) --
> {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=19687,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0}
> from connection
> 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:4.45,requestQueueTime:0.603,localTime:2.721,remoteTime:0.051,throttleTime:0,responseQueueTime:0.043,sendTime:1.031,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
> softwareVersion=0.1.0),temporaryMemoryBytes:356824 (kafka.request.logger)
> {noformat}
> —
> I believe that one fix for this would be to only allow transactions to start
> in the ProducerStateManager if a transaction has actually begun through
> AddPartitionsToTxn, and to reject produce requests to partitions that have
> not been added to a txn. An alternative fix would be to just wait for all
> produce requests to finish before issuing EndTxn, but this seems less
> desirable when wanting to shut down and abort progress. Another alternative
> is to avoid issuing EndTxn and to just shut down, but this also seems
> undesirable and will block consumers until the transaction timeout expires.
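> A hypothetical sketch of what the first proposed fix could look like on the
> partition leader (invented names, not actual broker code; KIP-890, linked in
> the comment above, goes in this server-side verification direction):
> {code:java}
> import org.apache.kafka.common.TopicPartition;
>
> // Hypothetical view, fed by the coordinator, of which partitions are part of
> // a producer's currently open transaction.
> interface VerifiedTxnState {
>     boolean isPartitionInTransaction(long producerId, short producerEpoch, TopicPartition tp);
> }
>
> // Hypothetical check run by the partition leader before appending a
> // transactional data batch.
> final class TxnProduceValidator {
>     private final VerifiedTxnState verifiedState;
>
>     TxnProduceValidator(VerifiedTxnState verifiedState) {
>         this.verifiedState = verifiedState;
>     }
>
>     boolean accept(long producerId, short producerEpoch, TopicPartition tp) {
>         // Reject the batch instead of implicitly opening a new transaction in
>         // the ProducerStateManager for a partition that was never added to the txn.
>         return verifiedState.isPartitionInTransaction(producerId, producerEpoch, tp);
>     }
> }
> {code}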
> This may be the cause of KAFKA-5880.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)