Travis Bischel created KAFKA-12671:
--------------------------------------

             Summary: Out of order processing with a transactional producer can 
lead to a stuck LastStableOffset
                 Key: KAFKA-12671
                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
             Project: Kafka
          Issue Type: Bug
            Reporter: Travis Bischel


If incoming produce requests and EndTxn requests are processed in a pathological 
order, the LastStableOffset can get stuck, which blocks consuming in 
READ_COMMITTED mode.

To produce transactionally, the standard flow is InitProducerId, followed by a 
loop of AddPartitionsToTxn -> Produce+ -> EndTxn. AddPartitionsToTxn is 
responsible for fencing and for adding partitions to a transaction, and EndTxn 
is responsible for finishing the transaction. Producing itself is mostly 
uninvolved in the proper fencing / ending flow, but produce requests are 
required to arrive after AddPartitionsToTxn and before EndTxn.
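
For reference, this is the same flow a client drives through the official Java 
producer (a minimal sketch; the topic, transactional ID, and bootstrap address 
are placeholders). The client sends AddPartitionsToTxn for a partition the 
first time that partition is produced to within the transaction, and 
commit/abort translate to EndTxn:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("transactional.id", "example-transactional-id"); // placeholder
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();                 // InitProducerId
        try {
            producer.beginTransaction();             // AddPartitionsToTxn is sent with
                                                     // the first send() to a partition
            producer.send(new ProducerRecord<>("example-topic", "key", "value")); // Produce
            producer.commitTransaction();            // EndTxn(commit) -> WriteTxnMarkers
        } catch (Exception e) {
            producer.abortTransaction();             // EndTxn(abort)
        } finally {
            producer.close();
        }
    }
}
{code}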

When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
to loosely track transactions. The ProducerStateManager is completely 
independent of the TxnCoordinator, and its guarantees are relatively weak. It 
handles two types of "batches": data batches and transaction markers. When a 
data batch is added, a "transaction" is begun and tied to the producer ID that 
is producing the batch. When a transaction marker is handled, the 
ProducerStateManager (roughly) removes the transaction for that producer ID.
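
To make the bookkeeping concrete, here is a hypothetical illustration of that 
behavior (this is not the actual broker code, just a sketch of the state 
machine described above):

{code:java}
import java.util.HashMap;
import java.util.Map;

class ProducerStateSketch {
    // producerId -> first offset of the ongoing transaction
    private final Map<Long, Long> ongoingTxns = new HashMap<>();

    void onDataBatch(long producerId, long baseOffset) {
        // A data batch opens a transaction for this producer if one is not
        // already being tracked.
        ongoingTxns.putIfAbsent(producerId, baseOffset);
    }

    void onTransactionMarker(long producerId) {
        // A COMMIT/ABORT marker removes the tracked transaction.
        ongoingTxns.remove(producerId);
    }

    // The LastStableOffset cannot advance past the earliest ongoing transaction.
    long lastStableOffset(long highWatermark) {
        return ongoingTxns.values().stream().min(Long::compare).orElse(highWatermark);
    }
}
{code}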

EndTxn is what triggers transaction markers to be sent to the 
ProducerStateManager. In essence, EndTxn is the one part of the transactional 
producer flow that talks across both the TxnCoordinator and the 
ProducerStateManager.

If a ProduceRequest is issued before EndTxn but handled internally in Kafka 
after EndTxn, then the ProduceRequest begins a new transaction in the 
ProducerStateManager. If the client was disconnecting and the EndTxn was the 
final request issued, the new transaction created in the ProducerStateManager 
is orphaned and nothing can clean it up. The LastStableOffset then hangs on 
this orphaned transaction.

The same problem can be triggered by a produce request that is issued with a 
transactional ID entirely outside the context of a transaction (no 
InitProducerId). It cannot be triggered by producing for so long that the 
transaction expires; in that case the transaction coordinator bumps the epoch 
for the producer ID, so producing again with the old epoch does not work.

Theoretically, we are supposed to have unlimited retries on produce requests, 
but in the context of wanting to abort everything and shut down, this is not 
always feasible. As it currently stands, I'm not sure there's a truly safe way 
to shut down _without_ flushing and receiving responses for every record 
produced, even if I want to abort everything and quit. The safest approach I 
can think of is to avoid issuing an EndTxn at all and instead rely on Kafka to 
time the transaction out internally.
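
In Java-client terms, the only "safe" abort-and-quit I can see looks like the 
sketch below: flush and wait for every outstanding produce before sending 
EndTxn (the method names are the Java producer's; the same idea applies to any 
client, and it defeats the point of aborting quickly):

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;

class SafeAbortShutdown {
    static void shutdown(KafkaProducer<String, String> producer) {
        // Wait for every outstanding produce request to complete (success or
        // failure) so that no ProduceRequest can race with the EndTxn below.
        producer.flush();
        // Only now is it safe to abort: no produce can be handled after the
        // resulting WriteTxnMarkers and open an orphaned transaction.
        producer.abortTransaction();
        producer.close();
    }
}
{code}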

---

For some context, here are my request logs from the client. Note that I write 
two ProduceRequests, read one, and then issue EndTxn (because I know I want to 
quit). The second ProduceRequest is read successfully before shutdown, but I 
ignore the results because I am shutting down. I've removed the logs related to 
consuming, but the order of the remaining logs is unchanged:

{noformat}
[INFO] done waiting for unknown topic, metadata was successful; topic: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
[INFO] initializing producer id
[DEBUG] wrote FindCoordinator v3; err: <nil>
[DEBUG] read FindCoordinator v3; err: <nil>
[DEBUG] wrote InitProducerID v4; err: <nil>
[DEBUG] read InitProducerID v4; err: <nil>
[INFO] producer id initialization success; id: 1463, epoch: 0

[DEBUG] wrote AddPartitionsToTxn v2; err: <nil>
[DEBUG] read AddPartitionsToTxn v2; err: <nil>

[DEBUG] read Produce v8; err: <nil>
[DEBUG] produced; to: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
[DEBUG] wrote Produce v8; err: <nil>

[DEBUG] wrote EndTxn v2; err: <nil>
[DEBUG] read EndTxn v2; err: <nil>

[DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 
1, successful_reads: 1, err: context canceled
[DEBUG] read Produce v8; err: <nil>
[DEBUG] produced; to: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
{noformat}

And here is the broker's point of view. Across two brokers, the second 
ProduceRequest completes after EndTxn is handled (and after the WriteTxnMarkers 
request is handled, which is the important one, since it hooks into the 
ProducerStateManager):

{noformat}
/// Broker 3: init producer ID
[2021-04-15 00:56:40,030] DEBUG Completed 
request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, 
correlationId=3) -- 
{transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,transaction_timeout_ms=60000,producer_id=-1,producer_epoch=-1,_tagged_fields={}},response:{throttle_time_ms=0,error_code=0,producer_id=1463,producer_epoch=0,_tagged_fields={}}
 from connection 
127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:2.255,requestQueueTime:0.077,localTime:0.74,remoteTime:0.095,throttleTime:0,responseQueueTime:1.005,sendTime:0.336,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
 softwareVersion=0.1.0) (kafka.request.logger)


/// Broker 3: add partitions to txn
[2021-04-15 00:56:40,071] DEBUG Completed 
request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=2, clientId=kgo, 
correlationId=4) -- 
{transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[1]}]},response:{throttle_time_ms=0,results=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,results=[{partition_index=1,error_code=0}]}]}
 from connection 
127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:1.247,requestQueueTime:0.133,localTime:0.71,remoteTime:0.136,throttleTime:0,responseQueueTime:0.087,sendTime:0.178,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
 softwareVersion=0.1.0) (kafka.request.logger)


/// Broker 2: first produce
[2021-04-15 00:56:40,223] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, 
apiVersion=8, clientId=kgo, correlationId=1) -- 
{acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=15589,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0}
 from connection 
127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:2.705,requestQueueTime:0.055,localTime:2.435,remoteTime:0.058,throttleTime:0,responseQueueTime:0.055,sendTime:0.1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
 softwareVersion=0.1.0),temporaryMemoryBytes:324898 (kafka.request.logger)


/// Broker 3: end txn
[2021-04-15 00:56:40,350] DEBUG Completed request:RequestHeader(apiKey=END_TXN, 
apiVersion=2, clientId=kgo, correlationId=5) -- 
{transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,committed=false},response:{throttle_time_ms=0,error_code=0}
 from connection 
127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:3.484,requestQueueTime:0.052,localTime:0.318,remoteTime:0.06,throttleTime:0,responseQueueTime:2.92,sendTime:0.133,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
 softwareVersion=0.1.0) (kafka.request.logger)


/// Broker 2: txn markers
[2021-04-15 00:56:40,357] DEBUG Completed 
request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, 
clientId=broker-3-txn-marker-sender, correlationId=66708) -- 
{markers=[{producer_id=1463,producer_epoch=0,transaction_result=false,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_indexes=[1]}],coordinator_epoch=0}]},response:{markers=[{producer_id=1463,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[{partition_index=1,error_code=0}]}]}]}
 from connection 
127.0.0.1:9094-127.0.0.1:38966-676;totalTime:3.507,requestQueueTime:1.957,localTime:0.34,remoteTime:0.031,throttleTime:0,responseQueueTime:0.324,sendTime:0.853,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=unknown,
 softwareVersion=unknown) (kafka.request.logger)


/// Broker 2: second produce
[2021-04-15 00:56:40,374] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, 
apiVersion=8, clientId=kgo, correlationId=2) -- 
{acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=19687,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0}
 from connection 
127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:4.45,requestQueueTime:0.603,localTime:2.721,remoteTime:0.051,throttleTime:0,responseQueueTime:0.043,sendTime:1.031,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
 softwareVersion=0.1.0),temporaryMemoryBytes:356824 (kafka.request.logger)
{noformat}

---

I believe one fix for this would be to only allow transactions to start in the 
ProducerStateManager if a transaction has actually begun through 
AddPartitionsToTxn, and to reject produce requests for partitions that have not 
been added to the txn (sketched below). An alternative fix would be to wait for 
all produce requests to finish before issuing EndTxn, but this seems less 
desirable when wanting to shut down and abort progress. Another alternative is 
to avoid issuing EndTxn and to just shut down, but this also seems undesirable 
and will block consumers until the transaction timeout expires.
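
As a rough illustration of the first proposed fix, the ProducerStateManager 
would consult the partitions the TxnCoordinator has recorded for the producer 
before opening a transaction. This is hypothetical pseudocode, not actual 
broker code, and the TxnCoordinatorView lookup is an invented interface for the 
sake of the sketch:

{code:java}
import java.util.Set;

class ValidatingProducerStateSketch {
    interface TxnCoordinatorView {
        // Hypothetical lookup: partitions currently added to the producer's txn.
        Set<String> partitionsInTxn(long producerId, short producerEpoch);
    }

    private final TxnCoordinatorView coordinator;

    ValidatingProducerStateSketch(TxnCoordinatorView coordinator) {
        this.coordinator = coordinator;
    }

    void onDataBatch(long producerId, short epoch, String topicPartition) {
        if (!coordinator.partitionsInTxn(producerId, epoch).contains(topicPartition)) {
            // Reject instead of silently opening an orphaned transaction.
            throw new IllegalStateException(
                "produce for " + topicPartition + " outside of an added transaction");
        }
        // ... otherwise begin/continue tracking the transaction as today ...
    }
}
{code}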

This may be the cause of KAFKA-5880.


