[ https://issues.apache.org/jira/browse/KAFKA-14799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton updated KAFKA-14799:
----------------------------------
    Fix Version/s: 3.4.1
                   3.3.3

> Source tasks fail if connector attempts to abort empty transaction
> ------------------------------------------------------------------
>
>                 Key: KAFKA-14799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14799
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> If a source task invokes
> [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()]
> while the current transaction is empty, and then returns an empty batch of
> records from the next (or current) invocation of {{SourceTask::poll}},
> the task will fail.
> This is because the Connect framework will honor the transaction abort
> request by invoking
> [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()],
> but without having first invoked
> [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()]
> (since no records had been received from the task), which leads to an
> {{IllegalStateException}}.
> An example stack trace for this scenario:
> {quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0]
> ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an
> uncaught and unrecoverable exception.
> Task is being killed and will not recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask:210)
> java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
> at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
> at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
> at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
> at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
> at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
> at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
> at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
> at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
> at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> {quote}
>
> As far as a fix goes, we have a few options:
> # Gracefully handle this case by translating the call to {{TransactionContext::abortTransaction}} into a no-op
> # Throw an exception (probably an {{IllegalStateException}}) from {{TransactionContext::abortTransaction}}, which may fail the task, but would give it the option to swallow the exception and continue processing if it would like
> # Forcibly fail the task without giving it the chance to swallow an exception, using a similar strategy to how we fail tasks that request that a transaction be committed and aborted for the same record (see [here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
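To make the failure mode in KAFKA-14799 concrete, here is a minimal, hypothetical Java model of it. The `AbortEmptyTxnSketch`, `FakeProducer`, `State`, and `dispatchBatch` names are invented for illustration and are not Kafka or Connect APIs. The state names are borrowed from the stack trace, and the assumption that a transaction is only begun once a record arrives follows the issue's note that `beginTransaction` was never called because no records had been received from the task. The `gracefulEmptyAbort` flag sketches fix option 1, turning an abort request with no open transaction into a no-op.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of KAFKA-14799; none of these types are Kafka or Connect APIs.
public class AbortEmptyTxnSketch {

    // Subset of producer transaction states, named after those in the stack trace.
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION }

    // Stand-in for a transactional producer: aborting is only legal inside a transaction.
    static final class FakeProducer {
        State state = State.READY;

        void beginTransaction() {
            if (state != State.READY) {
                throw new IllegalStateException(
                        "Invalid transition attempted from state " + state + " to state IN_TRANSACTION");
            }
            state = State.IN_TRANSACTION;
        }

        void send(String record) {
            if (state != State.IN_TRANSACTION) {
                throw new IllegalStateException("Cannot send outside of a transaction");
            }
            // A real producer would buffer the record here; the model only checks the state.
        }

        void abortTransaction() {
            if (state != State.IN_TRANSACTION) {
                throw new IllegalStateException(
                        "Invalid transition attempted from state " + state + " to state ABORTING_TRANSACTION");
            }
            state = State.ABORTING_TRANSACTION;
            state = State.READY; // the model treats the abort as completing immediately
        }
    }

    // Stand-in for the framework dispatching one polled batch. A transaction is
    // begun lazily, only when the first record arrives. When gracefulEmptyAbort is
    // true, an abort request with no open transaction becomes a no-op (fix option 1).
    static void dispatchBatch(FakeProducer producer, List<String> batch,
                              boolean abortRequested, boolean gracefulEmptyAbort) {
        for (String record : batch) {
            if (producer.state == State.READY) {
                producer.beginTransaction();
            }
            producer.send(record);
        }
        if (abortRequested) {
            if (gracefulEmptyAbort && producer.state != State.IN_TRANSACTION) {
                return; // nothing was sent, so there is nothing to abort
            }
            producer.abortTransaction();
        }
    }

    public static void main(String[] args) {
        // Unguarded path: abort requested with an empty batch, so no transaction was begun.
        try {
            dispatchBatch(new FakeProducer(), new ArrayList<>(), true, false);
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            System.out.println("task fails: " + e.getMessage());
        }

        // Option 1: the same request is ignored and the producer stays usable.
        FakeProducer producer = new FakeProducer();
        dispatchBatch(producer, new ArrayList<>(), true, true);
        System.out.println("option 1: empty abort ignored, producer state = " + producer.state);
    }
}
```

In this model the guard hides the connector's misuse entirely, which is the trade-off of option 1; options 2 and 3 would instead surface it to the task or fail the task outright.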