[ 
https://issues.apache.org/jira/browse/KAFKA-14799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14799:
----------------------------------
    Affects Version/s: 3.3.2
                       3.3.1
                       3.4.0
                       3.3.0

> Source tasks fail if connector attempts to abort empty transaction
> ------------------------------------------------------------------
>
>                 Key: KAFKA-14799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14799
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> If a source task invokes 
> [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()]
>  while the current transaction is empty, and then returns an empty batch of 
> records from the next (or current) invocation of {{SourceTask::poll}}, 
> the task will fail.
> This is because the Connect framework will honor the transaction abort 
> request by invoking 
> [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()],
>  but without having first invoked 
> [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()]
>  (since no records had been received from the task), which leads to an 
> {{IllegalStateException}}.
> An example stack trace for this scenario:
> {quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
> java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {quote}
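
The failure sequence described above can be illustrated with a small self-contained model. This is only a sketch: {{ToyProducer}}, {{ToyWorker}} and {{EmptyAbortDemo}} are hypothetical names invented for illustration and are not part of Kafka or Connect. The two-state machine is a deliberate simplification of the producer's real transaction states, assuming only what the stack trace shows (an abort requested while the state is READY is rejected).

```java
import java.util.List;

/** Hypothetical stand-in for a transactional producer; not the real KafkaProducer. */
final class ToyProducer {
    enum State { READY, IN_TRANSACTION }

    private State state = State.READY;

    State state() {
        return state;
    }

    void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException("Transaction already in progress");
        }
        state = State.IN_TRANSACTION;
    }

    void abortTransaction() {
        // Aborting is only legal once a transaction has been begun.
        if (state != State.IN_TRANSACTION) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state
                    + " to state ABORTING_TRANSACTION");
        }
        state = State.READY;
    }
}

/** Hypothetical stand-in for the framework logic that dispatches a polled batch. */
final class ToyWorker {
    private final ToyProducer producer = new ToyProducer();

    void batchDispatched(List<String> records, boolean abortRequested) {
        // A transaction is begun only once the task has returned at least one record.
        if (!records.isEmpty() && producer.state() == ToyProducer.State.READY) {
            producer.beginTransaction();
        }
        // The abort request is honored unconditionally, which models the reported bug.
        if (abortRequested) {
            producer.abortTransaction();
        }
    }
}

final class EmptyAbortDemo {
    /** Runs the empty-batch-plus-abort scenario and returns the resulting message. */
    static String run() {
        try {
            new ToyWorker().batchDispatched(List.of(), true);
            return "no failure";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With an empty batch and an abort request, the model never calls {{beginTransaction}}, so {{abortTransaction}} is rejected from the READY state, as in the trace above.
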
>  
> There are a few options for a fix:
>  # Gracefully handle this case by translating the call to 
> {{TransactionContext::abortTransaction}} into a no-op
>  # Throw an exception (probably an {{IllegalStateException}}) from 
> {{TransactionContext::abortTransaction}}. This may fail the task, but it 
> gives the task the option to catch the exception and continue processing
>  # Forcibly fail the task without giving it the chance to catch an 
> exception, using a strategy similar to how we fail tasks that request that a 
> transaction be both committed and aborted for the same record (see 
> [here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
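
The first two options above can be contrasted in a short sketch. Everything here is an assumption made for illustration: {{SketchTransactionContext}}, {{EmptyAbortPolicy}} and {{recordDispatched}} are hypothetical names, and the record counter is invented bookkeeping, not something the real {{TransactionContext}} interface is known to expose. The third option (failing the task from the framework side) is not modeled.

```java
/** Hypothetical model of a transaction context that tracks whether the transaction is empty. */
final class SketchTransactionContext {
    /** How an abort request on an empty transaction is handled. */
    enum EmptyAbortPolicy { NO_OP, THROW }

    private final EmptyAbortPolicy policy;
    private int recordsInTransaction = 0;
    private boolean abortRequested = false;

    SketchTransactionContext(EmptyAbortPolicy policy) {
        this.policy = policy;
    }

    /** Called (in this sketch) whenever a record joins the current transaction. */
    void recordDispatched() {
        recordsInTransaction++;
    }

    void abortTransaction() {
        if (recordsInTransaction == 0) {
            if (policy == EmptyAbortPolicy.THROW) {
                // Option 2: surface the problem; the caller may catch this and continue.
                throw new IllegalStateException("Cannot abort an empty transaction");
            }
            // Option 1: silently ignore the request.
            return;
        }
        abortRequested = true;
    }

    boolean abortRequested() {
        return abortRequested;
    }
}
```

Under the NO_OP policy an empty abort leaves {{abortRequested()}} false, so nothing would later be forwarded to the producer. Under THROW the caller sees an {{IllegalStateException}} immediately and can decide whether to catch it.
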



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
