Chris Egerton created KAFKA-14799:
-------------------------------------

             Summary: Source tasks fail if connector attempts to abort empty transaction
                 Key: KAFKA-14799
                 URL: https://issues.apache.org/jira/browse/KAFKA-14799
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Chris Egerton
            Assignee: Chris Egerton


If a source task invokes 
[TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()]
 while the current transaction is empty, and then returns an empty batch of 
records from the next (or current) invocation of {{SourceTask::poll}}, the 
task will fail.

This is because the Connect framework will honor the transaction abort request 
by invoking 
[KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()],
 but without having first invoked 
[KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()]
 (since no records had been received from the task), which leads to an 
{{IllegalStateException}}.
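
The failure mode described above can be reproduced in a small, self-contained model. This is only a sketch: {{ToyProducer}} and {{ToyWorker}} are hypothetical stand-ins, not Kafka's {{TransactionManager}} or the Connect runtime, and the assumption that a transaction is begun only once records arrive is taken from the description above.

```java
import java.util.List;

/** Simplified, hypothetical model of a transactional producer's state machine. */
class ToyProducer {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION }

    private State state = State.READY;

    void beginTransaction() {
        transitionTo(State.IN_TRANSACTION, State.READY);
    }

    void abortTransaction() {
        // Aborting is only legal while a transaction is open.
        transitionTo(State.ABORTING_TRANSACTION, State.IN_TRANSACTION);
        state = State.READY; // in this model the abort completes immediately
    }

    State state() {
        return state;
    }

    private void transitionTo(State target, State requiredSource) {
        if (state != requiredSource) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }
}

/** Hypothetical stand-in for the framework's per-batch handling. */
class ToyWorker {
    private final ToyProducer producer = new ToyProducer();

    /** A transaction is begun only when the task actually returned records. */
    void handleBatch(List<String> records, boolean abortRequested) {
        if (!records.isEmpty() && producer.state() == ToyProducer.State.READY) {
            producer.beginTransaction();
        }
        if (abortRequested) {
            // Honors the abort request unconditionally; throws if nothing was begun.
            producer.abortTransaction();
        }
    }
}
```

Calling {{handleBatch}} with an empty list and {{abortRequested = true}} throws an {{IllegalStateException}} in this model, while the same call with a non-empty list succeeds.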

An example stack trace for this scenario:
{quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
    at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
    at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{quote}
 

As far as a fix goes, we have a few options:
 # Gracefully handle this case by translating the call to 
{{TransactionContext::abortTransaction}} into a no-op
 # Throw an exception (probably an {{IllegalStateException}}) from 
{{TransactionContext::abortTransaction}}, which may fail the task, but 
would give it the option to swallow the exception and continue processing if it 
would like
 # Forcibly fail the task without giving it the chance to swallow an exception, 
using a similar strategy to how we fail tasks that request that a transaction 
be committed and aborted for the same record (see 
[here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
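
To make the trade-offs between the three options concrete, here is a rough sketch of how each could behave. Everything in it is hypothetical: {{ToyTransactionContext}}, {{EmptyAbortPolicy}}, {{recordDispatched}} and {{maybeAbort}} are illustrative names, not the actual {{WorkerTransactionContext}} API, and the model tracks only whether the current transaction has any records.

```java
/** Hypothetical model of the three candidate behaviors; not Connect's actual API. */
class ToyTransactionContext {
    enum EmptyAbortPolicy { NO_OP, THROW_TO_TASK, FAIL_TASK }

    private final EmptyAbortPolicy policy;
    private boolean transactionOpen = false;
    private boolean abortRequested = false;
    private boolean taskMustFail = false;

    ToyTransactionContext(EmptyAbortPolicy policy) {
        this.policy = policy;
    }

    /** Called by the framework model when a record joins the current transaction. */
    void recordDispatched() {
        transactionOpen = true;
    }

    /** Called by the task to request an abort. */
    void abortTransaction() {
        if (transactionOpen) {
            abortRequested = true;
            return;
        }
        switch (policy) {
            case NO_OP:
                // Option 1: silently ignore the request.
                break;
            case THROW_TO_TASK:
                // Option 2: the task sees the exception and may catch it.
                throw new IllegalStateException("No transaction to abort");
            case FAIL_TASK:
                // Option 3: remember the violation; the framework fails the task later.
                taskMustFail = true;
                break;
        }
    }

    /** Called by the framework model after poll; returns true if a transaction was aborted. */
    boolean maybeAbort() {
        if (taskMustFail) {
            // Thrown outside the task's own code, so the task cannot swallow it.
            throw new RuntimeException("Task requested abort of an empty transaction");
        }
        if (!abortRequested) {
            return false;
        }
        abortRequested = false;
        transactionOpen = false;
        return true;
    }
}
```

The main difference between options 2 and 3 in this sketch is where the exception surfaces: inside the task's own call to {{abortTransaction}} (catchable by the task), or later in framework code that the task does not control.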



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
