[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930027#comment-17930027
 ] 

Hongshun Wang edited comment on FLINK-37356 at 2/25/25 6:22 AM:
----------------------------------------------------------------

What's more, it seems that the new KafkaSink, when no transactional id prefix is 
set, falls back to the default prefix "kafka-sink", which makes transaction 
conflicts between different jobs easy to hit. The old FlinkKafkaProducer, when no 
transaction prefix is set, tries to avoid reusing the same prefix as shown in the 
code below (though that may still conflict: 
https://issues.apache.org/jira/browse/FLINK-22452).

 

{color:#de350b}I do hope transactionalIdPrefix becomes a required option when 
EXACTLY_ONCE is used!{color}
{code:java}
        String actualTransactionalIdPrefix;
        if (this.transactionalIdPrefix != null) {
            actualTransactionalIdPrefix = this.transactionalIdPrefix;
        } else {
            String taskName = getRuntimeContext().getTaskName();
            // Kafka transactional IDs are limited in length to be less than the max value of
            // a short, so we truncate here if necessary to a more reasonable length string.
            if (taskName.length() > maxTaskNameSize) {
                taskName = taskName.substring(0, maxTaskNameSize);
                LOG.warn(
                        "Truncated task name for Kafka TransactionalId from {} to {}.",
                        getRuntimeContext().getTaskName(),
                        taskName);
            }
            actualTransactionalIdPrefix =
                    taskName
                            + "-"
                            + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
        } {code}
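
Until then, jobs can avoid the shared default by always setting the prefix 
explicitly. A minimal sketch (the broker address, topic, and prefix values here 
are placeholders, not from this issue):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExplicitPrefixExample {
    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder address
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic") // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Unique per job, so two jobs never collide on transactional ids.
                .setTransactionalIdPrefix("my-job-kafka-sink")
                .build();
    }
} {code}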



> Reusing a Kafka producer whose commit failed may send data without an 
> AddPartitionsToTxnRequest
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37356
>                 URL: https://issues.apache.org/jira/browse/FLINK-37356
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>
> In my production environment, a READ_COMMITTED consumer could no longer consume 
> any records, and I found that the LSO of the partition had not changed for a 
> long time. I looked through all the logs in the Kafka cluster and found a 
> transaction lacking an AddPartitionsToTxnRequest.
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363, because my Kafka cluster 
> logs contain InvalidTxnStateException. However, although that transaction is in 
> an invalid state, no data is written to any Kafka topic partition in it 
> (in this case the transaction is empty), so it does not affect any partition's 
> LSO and would not block the consumer.
>  
> Then I checked the Kafka client code; it seems there is no way to produce data 
> before the AddPartitionsToTxnRequest is done, because the `Sender` will refuse 
> to dequeue batches from the accumulator until they have been added to the 
> transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
>     // ... ignore code
>     // Add the partition to the transaction (if in progress) after it has been successfully
>     // appended to the accumulator. We cannot do it before because the partition may be
>     // unknown or the initially selected partition may be changed when the batch is closed
>     // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
>     // batches from the accumulator until they have been added to the transaction.
>     if (transactionManager != null) {
>         transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
>     }
>     // ... ignore code
> }{code}
> {code:java}
> // org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request size due to
>     // compression; in this case we will still eventually send this batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp))
>         break;
> }
> {code}
>  
> Then it occurred to me that if a TransactionManager whose partitionsInTransaction 
> set was never cleared is reused, the AddPartitionsToTxnRequest won't be sent 
> again. This can happen in the Flink Kafka connector (a toy sketch of this 
> short-circuit follows the steps below):
>  
>       1. The Flink Kafka connector reuses and recycles the KafkaProducer: 
> KafkaCommitter recycles the producer back to the producerPool after the 
> transaction completes or fails, and KafkaWriter then takes it from the 
> producerPool again.
> {code:java}
> // org.apache.flink.connector.kafka.sink.KafkaCommitter#commit
> @Override
> public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
>         throws IOException, InterruptedException {
>     for (CommitRequest<KafkaCommittable> request : requests) {
>         // ... ignore code
>         try {
>             producer.commitTransaction();
>             producer.flush();
>             recyclable.ifPresent(Recyclable::close);
>         } catch (RetriableException e) {
>             request.retryLater();
>         } catch (ProducerFencedException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (InvalidTxnStateException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (UnknownProducerIdException e) {
>             // LOG.error(...) elided
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (Exception e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithUnknownReason(e);
>         }
>     }
> } {code}
>      2. If KafkaCommitter hits an exception and does not successfully commit the 
> transaction, the partitionsInTransaction set in TransactionManager won't be 
> cleared (see 
> org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction).
>  
>    3. If KafkaWriter reuses the same producer and sends data to the same 
> partitions in the next transaction, the AddPartitionsToTxnRequest won't be sent.
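>  
> To make the mechanism concrete, here is a toy model (not Kafka code; a minimal 
> sketch with hypothetical names) of why a stale partition set suppresses the 
> request when the producer is reused:
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
> 
> // Toy stand-in for the Kafka client's partition bookkeeping.
> class ToyTransactionManager {
>     private final Set<String> partitionsInTransaction = new HashSet<>();
> 
>     void maybeAddPartition(String topicPartition) {
>         // A partition already in the set is skipped, i.e. no
>         // AddPartitionsToTxnRequest is enqueued for it.
>         if (partitionsInTransaction.add(topicPartition)) {
>             System.out.println("send AddPartitionsToTxnRequest for " + topicPartition);
>         } else {
>             System.out.println("skip AddPartitionsToTxnRequest for " + topicPartition);
>         }
>     }
> 
>     // Only runs when a transaction completes; after a failed commit the
>     // set survives, so the next transaction on this producer skips the request.
>     void completeTransaction() {
>         partitionsInTransaction.clear();
>     }
> 
>     public static void main(String[] args) {
>         ToyTransactionManager tm = new ToyTransactionManager();
>         tm.maybeAddPartition("topic-0"); // first txn: request sent
>         // commit fails -> completeTransaction() is never called
>         tm.maybeAddPartition("topic-0"); // next txn: request skipped
>     }
> } {code}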
>  
> Thus, in FlinkKafkaInternalProducer#setTransactionId, we should also clear the 
> partition information of the TransactionManager (currently we only set the 
> transactionalId and the currentState):
> {code:java}
> // org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer#setTransactionId
> public void setTransactionId(String transactionalId) {
>     if (!transactionalId.equals(this.transactionalId)) {
>         checkState(
>                 !inTransaction,
>                 String.format("Another transaction %s is still open.", transactionalId));
>         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
>         Object transactionManager = getTransactionManager();
>         synchronized (transactionManager) {
>             setField(transactionManager, "transactionalId", transactionalId);
>             setField(
>                     transactionManager,
>                     "currentState",
>                     getTransactionManagerState("UNINITIALIZED"));
>             this.transactionalId = transactionalId;
>         }
>     }
> } {code}
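>  
> A minimal sketch of what the proposed change could look like (assuming the 
> Kafka client's TransactionManager keeps the set in a field named 
> partitionsInTransaction, which varies across client versions, and that a 
> reflective getField counterpart of the setField helper above is available):
> {code:java}
> synchronized (transactionManager) {
>     setField(transactionManager, "transactionalId", transactionalId);
>     setField(
>             transactionManager,
>             "currentState",
>             getTransactionManagerState("UNINITIALIZED"));
>     // Proposed addition: drop the partitions recorded for the previous
>     // transaction so the next one re-sends AddPartitionsToTxnRequest.
>     ((java.util.Set<?>) getField(transactionManager, "partitionsInTransaction")).clear();
>     this.transactionalId = transactionalId;
> } {code}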



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
