k8slight opened a new issue, #4949: URL: https://github.com/apache/seatunnel/issues/4949
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When "semantics=EXACTLY_ONCE" is configured in kafkaSink, the data cannot be written successfully. The exception is as follows.

[org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.]

### SeaTunnel Version

2.3.2

### SeaTunnel Config

```conf
kafka {
  topic = "mysql_user"
  bootstrap.servers = "node2:9092,node3:9092,node4:9092"
  format = json
  kafka.request.timeout.ms = 60000
  semantics = EXACTLY_ONCE
  kafka.config = {
    acks = "all"
    request.timeout.ms = 60000
    buffer.memory = 33554432
  }
}
```

### Running Command

```shell
./bin/seatunnel.sh -c /data/config/zeta-mysql-to-kafka-batch.conf -m local
```

### Error Exception

```log
2023-06-19 15:53:01,640 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel6884-1, transactionalId=SeaTunnel6884-1] Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-06-19 15:53:01,641 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation - [localhost]:5802 [seatunnel-686191] [5.1] org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
2023-06-19 15:53:01,664 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - complete checkpoint failed
java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_361]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_361]
    at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1300) ~[?:1.8.0_361]
    at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284) ~[?:1.8.0_361]
    at java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1034) ~[?:1.8.0_361]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_361]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_361]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
    ... 5 more
```

### Flink or Spark Version

_No response_

### Java or Scala Version

java version "1.8.0_361"

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
