Chris Egerton created KAFKA-14799:
-------------------------------------

Summary: Source tasks fail if connector attempts to abort empty transaction
Key: KAFKA-14799
URL: https://issues.apache.org/jira/browse/KAFKA-14799
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton

If a source task invokes [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()] while the current transaction is empty, and then returns an empty batch of records from the next (or current) invocation of {{SourceTask::poll}}, the task will fail.

This is because the Connect framework will honor the transaction abort request by invoking [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()], but without having first invoked [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()] (since no records had been received from the task), which leads to an {{IllegalStateException}}.

An example stack trace for this scenario:
{quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
    at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
    at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{quote}

As far as a fix goes, we have a few options:
 # Gracefully handle this case by translating the call to {{TransactionContext::abortTransaction}} into a no-op
 # Throw an exception (probably an {{IllegalStateException}}) from {{TransactionContext::abortTransaction}}, which may fail the task, but would give it the option to swallow the exception and continue processing if it would like
 # Forcibly fail the task without giving it the chance to swallow an exception, using a similar strategy to how we fail tasks that request that a transaction be committed and aborted for the same record (see [here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
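
For illustration only, the failure and the first option (treating the abort as a no-op) can be modelled without any Kafka dependency. This is a rough sketch under stated assumptions, not the actual Connect runtime code: {{SketchProducer}}, {{SketchWorker}}, and the {{guardEmptyAbort}} flag are hypothetical names invented here, and the real producer state machine has more states than the two shown.

```java
import java.util.List;

// Hypothetical, dependency-free model of the scenario; none of these types are Kafka APIs.
class AbortEmptyTransactionSketch {

    // Only the two states needed to show the invalid transition are modelled.
    enum State { READY, IN_TRANSACTION }

    // Stand-in for a transactional producer: aborting is only legal once a transaction has begun.
    static class SketchProducer {
        State state = State.READY;

        void beginTransaction() {
            if (state != State.READY) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state " + state + " to state IN_TRANSACTION");
            }
            state = State.IN_TRANSACTION;
        }

        void abortTransaction() {
            if (state != State.IN_TRANSACTION) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state " + state + " to state ABORTING_TRANSACTION");
            }
            state = State.READY;
        }
    }

    // Stand-in for the framework side: it begins a transaction lazily, on the first record it sees.
    static class SketchWorker {
        final SketchProducer producer = new SketchProducer();
        final boolean guardEmptyAbort; // true models option 1 (abort of an empty transaction is a no-op)
        boolean abortRequested = false;
        boolean transactionOpen = false;

        SketchWorker(boolean guardEmptyAbort) {
            this.guardEmptyAbort = guardEmptyAbort;
        }

        // Models the task asking for the current transaction to be aborted.
        void requestAbort() {
            abortRequested = true;
        }

        // Models handling one batch returned by the task.
        // Returns true only if a producer transaction was actually aborted.
        boolean dispatch(List<String> batch) {
            if (!batch.isEmpty() && !transactionOpen) {
                producer.beginTransaction();
                transactionOpen = true;
            }
            if (!abortRequested) {
                return false;
            }
            abortRequested = false;
            if (guardEmptyAbort && !transactionOpen) {
                return false; // nothing was begun, so there is nothing to abort
            }
            producer.abortTransaction(); // throws if no transaction was ever begun
            transactionOpen = false;
            return true;
        }
    }

    public static void main(String[] args) {
        SketchWorker unguarded = new SketchWorker(false);
        unguarded.requestAbort();
        try {
            unguarded.dispatch(List.of()); // empty batch, so no transaction was begun
        } catch (IllegalStateException e) {
            System.out.println("unguarded: " + e.getMessage());
        }

        SketchWorker guarded = new SketchWorker(true);
        guarded.requestAbort();
        System.out.println("guarded aborted a transaction: " + guarded.dispatch(List.of()));
    }
}
```

The guard lives on the framework side in this sketch, so the task never observes the empty abort. Options 2 and 3 would instead surface it to the task, either as an exception it can catch or as an unconditional task failure.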