[
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-37356:
-----------------------------------
Labels: pull-request-available (was: )
> Recycled Kafka producer (whose commit failed) may send data without an
> AddPartitionsToTxnRequest
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-37356
> URL: https://issues.apache.org/jira/browse/FLINK-37356
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: kafka-3.4.0
> Reporter: Hongshun Wang
> Assignee: Arvid Heise
> Priority: Major
> Labels: pull-request-available
>
> In my production environment, a READ_COMMITTED consumer could no longer consume
> any records. I found that the LSO (last stable offset) of the partition had not
> changed for a long time. After searching all the logs in the Kafka cluster, I
> found a transaction lacking an AddPartitionsToTxnRequest.
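>
> To make the symptom concrete, here is a minimal sketch of the affected consumer
> side. The broker address, group id, and topic are illustrative assumptions; the
> point is that isolation.level=read_committed caps consumption at the LSO, so a
> hanging transaction stalls the consumer indefinitely.
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class ReadCommittedProbe {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "lso-debug");            // illustrative
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         // read_committed: poll() never returns records beyond the partition's LSO
>         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
>
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("my-topic")); // illustrative topic
>             // With the LSO stuck behind a hanging transaction, this returns nothing,
>             // even though the log end offset keeps growing.
>             consumer.poll(Duration.ofSeconds(5));
>         }
>     }
> }
> {code}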
>
> At first, I thought the problem was caused by
> https://issues.apache.org/jira/browse/FLINK-31363, because my Kafka cluster
> logs contained InvalidTxnStateException. However, although that transaction is
> in an invalid state, no data is written into the Kafka topic partition by it
> (in that case the transaction is empty). It therefore does not affect any
> partition's LSO, so consumers are not blocked by it.
>
> Then I checked the Kafka client code: there seems to be no way to produce data
> before the AddPartitionsToTxnRequest is done, because the `Sender` refuses to
> dequeue batches from the accumulator until they have been added to the
> transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
>     // ... code omitted ...
>
>     // Add the partition to the transaction (if in progress) after it has been successfully
>     // appended to the accumulator. We cannot do it before because the partition may be
>     // unknown or the initially selected partition may be changed when the batch is closed
>     // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
>     // batches from the accumulator until they have been added to the transaction.
>     if (transactionManager != null) {
>         transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
>     }
>     // ... code omitted ...
> }
> {code}
> {code:java}
> // org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request size due to
>     // compression; in this case we will still eventually send this batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp))
>         break;
> }
> {code}
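>
> The missing piece is the bookkeeping that decides whether a partition still
> needs to be registered. Roughly, it looks like this (a simplified paraphrase of
> the 3.x client's TransactionManager, not verbatim Kafka source):
> {code:java}
> // Simplified sketch of TransactionManager's partition registration. A partition
> // already recorded as part of the transaction is never re-registered.
> synchronized void maybeAddPartition(TopicPartition topicPartition) {
>     if (partitionsInTransaction.contains(topicPartition)
>             || newPartitionsInTransaction.contains(topicPartition)
>             || pendingPartitionsInTransaction.contains(topicPartition)) {
>         // A stale entry left over from a previous, failed transaction lands here:
>         // the partition counts as "already added", so no AddPartitionsToTxnRequest
>         // is enqueued, yet the Sender treats its batches as safe to drain.
>         return;
>     }
>     newPartitionsInTransaction.add(topicPartition); // triggers AddPartitionsToTxnRequest
> }
> {code}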
>
> Then I had an idea: if a producer whose TransactionManager hasn't cleared
> partitionsInTransaction is reused, the AddPartitionsToTxnRequest won't be sent
> again for those partitions. This can happen in the Flink Kafka connector:
>
> 1. The Flink Kafka connector reuses and recycles the KafkaProducer:
> KafkaCommitter recycles the producer back to the producerPool after the
> transaction completes or fails, and KafkaWriter later takes it from the
> producerPool for reuse.
> {code:java}
> // org.apache.flink.connector.kafka.sink.KafkaCommitter#commit (abridged)
> @Override
> public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
>         throws IOException, InterruptedException {
>     for (CommitRequest<KafkaCommittable> request : requests) {
>         // ... obtain producer and recyclable for this committable ...
>         try {
>             producer.commitTransaction();
>             producer.flush();
>             recyclable.ifPresent(Recyclable::close);
>         } catch (RetriableException e) {
>             request.retryLater();
>         } catch (ProducerFencedException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (InvalidTxnStateException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (UnknownProducerIdException e) {
>             // LOG.error(...) — message elided in the original report
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (Exception e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithUnknownReason(e);
>         }
>     }
> } {code}
> 2. If KafkaCommitter hits an exception and does not succeed in committing the
> transaction, the partitionsInTransaction in the TransactionManager won't be
> cleared (clearing happens in
> org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction).
>
> 3. When KafkaWriter reuses the same producer and sends data to the same
> partitions in the next transaction, the AddPartitionsToTxnRequest won't be
> sent (the full failure sequence is sketched below).
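>
> Putting the three steps together, the failure sequence looks roughly like
> this. This is an illustrative sketch, not a runnable reproduction: the pool
> handling is simplified, and the record and transaction names are assumptions.
> {code:java}
> // Transaction A on a pooled producer
> FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
> producer.beginTransaction();
> producer.send(recordForPartitionP);   // AddPartitionsToTxnRequest sent for P;
>                                       // partitionsInTransaction = {P}
> try {
>     producer.commitTransaction();     // fails, e.g. with a RetriableException
> } catch (Exception e) {
>     producerPool.add(producer);       // recycled; completeTransaction() never ran,
>                                       // so partitionsInTransaction still = {P}
> }
>
> // Next checkpoint: transaction B reuses the same producer
> FlinkKafkaInternalProducer<byte[], byte[]> reused = producerPool.poll();
> reused.setTransactionId("txn-B");     // only swaps transactionalId and currentState
> reused.beginTransaction();
> reused.send(recordForPartitionP);     // maybeAddPartition(P) short-circuits: no
>                                       // AddPartitionsToTxnRequest, but the Sender
>                                       // still drains the batch to the broker
> reused.commitTransaction();           // broker never registered P for transaction B
> {code}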
>
> Thus, in FlinkKafkaInternalProducer#setTransactionId, we should also clear the
> partition information of the TransactionManager (currently we only set
> transactionalId and currentState):
> {code:java}
> // org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer#setTransactionId
> public void setTransactionId(String transactionalId) {
>     if (!transactionalId.equals(this.transactionalId)) {
>         checkState(
>                 !inTransaction,
>                 String.format("Another transaction %s is still open.", transactionalId));
>         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
>         Object transactionManager = getTransactionManager();
>         synchronized (transactionManager) {
>             setField(transactionManager, "transactionalId", transactionalId);
>             setField(
>                     transactionManager,
>                     "currentState",
>                     getTransactionManagerState("UNINITIALIZED"));
>             this.transactionalId = transactionalId;
>         }
>     }
> } {code}
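>
> A minimal sketch of the proposed clearing step, assuming the field names of
> the Kafka 3.x client's TransactionManager (newPartitionsInTransaction,
> pendingPartitionsInTransaction, partitionsInTransaction) and reusing the
> reflection helpers already present in FlinkKafkaInternalProducer. This
> illustrates the idea only; it is not the merged patch.
> {code:java}
> // Hypothetical addition inside the synchronized block of setTransactionId:
> // clear the per-partition bookkeeping so that the next transaction registers
> // its partitions again via AddPartitionsToTxnRequest.
> // Assumption: these java.util.Set<TopicPartition> fields exist in the 3.x client.
> ((Set<?>) getField(transactionManager, "newPartitionsInTransaction")).clear();
> ((Set<?>) getField(transactionManager, "pendingPartitionsInTransaction")).clear();
> ((Set<?>) getField(transactionManager, "partitionsInTransaction")).clear();
> {code}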
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)