wu-a-ge opened a new issue, #5294: URL: https://github.com/apache/seatunnel/issues/5294
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

If the Kafka sink is configured with exactly-once semantics, an exception is thrown when the producer is closed.

### SeaTunnel Version

2.3.2

### SeaTunnel Config

```conf
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  execution.parallelism = 1
  job.mode = "BATCH"

  #spark config
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, smallint>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  Replace {
    source_table_name = "fake"
    result_table_name = "fake1"
    replace_field = "c_string"
    pattern = ".+"
    replacement = "test_extract_topic"
    is_regex = true
    replace_first = true
  }
}

sink {
  Kafka {
    source_table_name = "fake1"
    bootstrap.servers = "127.0.0.1:9092"
    topic = "${c_string}"
    format = json
    partition_key_fields = ["c_map", "c_string"]
    semantics = EXACTLY_ONCE
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}
```

### Running Command

```shell
no
```

### Error Exception

```log
023-08-13 22:59:50,762 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@743121554388287489) notify finished!
2023-08-13 22:59:50,762 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@6e03df92
2023-08-13 22:59:50,766 DEBUG org.apache.seatunnel.engine.server.checkpoint.CheckpointManager - Sead Operation : CheckpointFinishedOperation to [localhost]:5801 for task group:TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=1}
2023-08-13 22:59:50,769 DEBUG org.apache.seatunnel.engine.server.checkpoint.CheckpointManager - Sead Operation : CheckpointFinishedOperation to [localhost]:5801 for task group:TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=30000}
2023-08-13 22:59:50,769 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-942691] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=1}
2023-08-13 22:59:50,770 DEBUG org.apache.seatunnel.engine.server.checkpoint.CheckpointManager - Sead Operation : CheckpointFinishedOperation to [localhost]:5801 for task group:TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=30000}
2023-08-13 22:59:50,771 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-942691] [5.1] Task TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=1} complete with state FINISHED
2023-08-13 22:59:50,771 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-942691] [5.1] Received task end from execution TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=1}, state FINISHED
2023-08-13 22:59:50,771 DEBUG org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkCommitter - Committing transaction SeaTunnel7230-1
2023-08-13 22:59:50,772 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
	acks = -1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-SeaTunnel7230-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 60000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = SeaTunnel7230-1
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2023-08-13 22:59:50,774 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Instantiated a transactional producer.
2023-08-13 22:59:50,777 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job extractTopic_fake_to_kafka.conf (743121554388287489), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=1}] turn to end state FINISHED.
2023-08-13 22:59:50,777 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job extractTopic_fake_to_kafka.conf (743121554388287489), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=743121554388287489, pipelineId=1, taskGroupId=1}] end with state FINISHED
2023-08-13 22:59:50,782 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.2.0
2023-08-13 22:59:50,782 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 38103ffaa962ef50
2023-08-13 22:59:50,782 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1691938790782
2023-08-13 22:59:50,782 DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Kafka producer started
2023-08-13 22:59:50,782 INFO org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer - Attempting to resume transaction SeaTunnel7230-1 with producerId 2032 and epoch 0
2023-08-13 22:59:50,785 DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Starting Kafka producer I/O thread.
2023-08-13 22:59:50,787 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Transition from state UNINITIALIZED to INITIALIZING
2023-08-13 22:59:50,788 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Transition from state INITIALIZING to READY
2023-08-13 22:59:50,788 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Transition from state READY to IN_TRANSACTION
2023-08-13 22:59:50,788 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2023-08-13 22:59:50,788 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Enqueuing transactional request EndTxnRequestData(transactionalId='SeaTunnel7230-1', producerId=2032, producerEpoch=0, committed=true)
2023-08-13 22:59:50,789 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Enqueuing transactional request FindCoordinatorRequestData(key='SeaTunnel7230-1', keyType=1, coordinatorKeys=[])
2023-08-13 22:59:50,789 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Enqueuing transactional request EndTxnRequestData(transactionalId='SeaTunnel7230-1', producerId=2032, producerEpoch=0, committed=true)
2023-08-13 22:59:50,789 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Initialize connection to node 127.0.0.1:9092 (id: -1 rack: null) for sending metadata request
2023-08-13 22:59:50,789 DEBUG org.apache.kafka.clients.ClientUtils - Resolved host 127.0.0.1 as 127.0.0.1
2023-08-13 22:59:50,789 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Initiating connection to node 127.0.0.1:9092 (id: -1 rack: null) using address /127.0.0.1
2023-08-13 22:59:50,791 DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2023-08-13 22:59:50,791 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Completed connection to node -1. Fetching API versions.
2023-08-13 22:59:50,791 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Initiating API versions fetch from node -1.
2023-08-13 22:59:50,791 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-SeaTunnel7230-1, correlationId=0) and timeout 60000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.2.0')
2023-08-13 22:59:50,793 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-SeaTunnel7230-1, correlationId=0): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=6), ApiVersion(apiKey=5, minVersion=0, maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=7), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=3), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=2), ApiVersion(apiKey=30, minVersion=0, maxVersion=2), ApiVersion(apiKey=31, minVersion=0, maxVersion=2), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=3), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=2), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=2), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=1), ApiVersion(apiKey=57, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[])
2023-08-13 22:59:50,793 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Node -1 has finalized features epoch: 0, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): 0 to 7 [usable: 7], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 6 [usable: 6], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 7], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 3 [usable: 3], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 3 [usable: 3], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], AlterPartition(56): 0 to 1 [usable: 1], UpdateFeatures(57): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0]).
2023-08-13 22:59:50,794 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 127.0.0.1:9092 (id: -1 rack: null)
2023-08-13 22:59:50,794 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-SeaTunnel7230-1, correlationId=1) and timeout 60000 to node -1: MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
2023-08-13 22:59:50,794 DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending transactional request FindCoordinatorRequestData(key='SeaTunnel7230-1', keyType=1, coordinatorKeys=[]) to node 127.0.0.1:9092 (id: -1 rack: null) with correlation ID 2
2023-08-13 22:59:50,794 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-SeaTunnel7230-1, correlationId=2) and timeout 60000 to node -1: FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[SeaTunnel7230-1])
2023-08-13 22:59:50,795 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-SeaTunnel7230-1, correlationId=1): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='windows10.microdone.', port=9092, rack=null)], clusterId='jI90xKvsTYuS17krF4ubeg', controllerId=0, topics=[], clusterAuthorizedOperations=-2147483648)
2023-08-13 22:59:50,795 INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Cluster ID: jI90xKvsTYuS17krF4ubeg
2023-08-13 22:59:50,795 DEBUG org.apache.kafka.clients.Metadata - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='jI90xKvsTYuS17krF4ubeg', nodes={0=windows10.microdone.:9092 (id: 0 rack: null)}, partitions=[], controller=windows10.microdone.:9092 (id: 0 rack: null)}
2023-08-13 22:59:50,796 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Received FIND_COORDINATOR response from node -1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-SeaTunnel7230-1, correlationId=2): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='SeaTunnel7230-1', nodeId=0, host='windows10.microdone.', port=9092, errorCode=0, errorMessage='')])
2023-08-13 22:59:50,797 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Discovered transaction coordinator windows10.microdone.:9092 (id: 0 rack: null)
2023-08-13 22:59:50,797 DEBUG org.apache.kafka.clients.ClientUtils - Resolved host windows10.microdone. as 172.20.0.1,192.168.223.1,172.23.96.1,192.168.64.1,172.17.96.1,192.168.209.1,172.31.208.1,192.168.3.2,172.18.192.1,172.21.32.1,172.22.224.1,172.30.32.1,172.20.48.1,172.19.240.1,172.30.160.1,172.17.32.1,172.31.160.1,172.25.208.1,172.24.192.1,172.23.0.1,172.28.224.1
2023-08-13 22:59:50,797 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Initiating connection to node windows10.microdone.:9092 (id: 0 rack: null) using address windows10.microdone./172.20.0.1
2023-08-13 22:59:50,799 DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0
2023-08-13 22:59:50,799 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Completed connection to node 0. Fetching API versions.
2023-08-13 22:59:50,799 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Initiating API versions fetch from node 0.
2023-08-13 22:59:50,799 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-SeaTunnel7230-1, correlationId=3) and timeout 60000 to node 0: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.2.0')
2023-08-13 22:59:50,801 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Received API_VERSIONS response from node 0 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-SeaTunnel7230-1, correlationId=3): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=6), ApiVersion(apiKey=5, minVersion=0, maxVersion=3), ApiVersion(apiKey=6, minVersion=0, maxVersion=7), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=3), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=2), ApiVersion(apiKey=30, minVersion=0, maxVersion=2), ApiVersion(apiKey=31, minVersion=0, maxVersion=2), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=3), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=2), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=2), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=1), ApiVersion(apiKey=57, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[])
2023-08-13 22:59:50,802 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Node 0 has finalized features epoch: 0, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): 0 to 7 [usable: 7], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 6 [usable: 6], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 7], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 3 [usable: 3], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 3 [usable: 3], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], AlterPartition(56): 0 to 1 [usable: 1], UpdateFeatures(57): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0]).
2023-08-13 22:59:50,922 DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending transactional request EndTxnRequestData(transactionalId='SeaTunnel7230-1', producerId=2032, producerEpoch=0, committed=true) to node windows10.microdone.:9092 (id: 0 rack: null) with correlation ID 4
2023-08-13 22:59:50,922 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Sending END_TXN request with header RequestHeader(apiKey=END_TXN, apiVersion=3, clientId=producer-SeaTunnel7230-1, correlationId=4) and timeout 60000 to node 0: EndTxnRequestData(transactionalId='SeaTunnel7230-1', producerId=2032, producerEpoch=0, committed=true)
2023-08-13 22:59:50,924 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Received END_TXN response from node 0 for request with header RequestHeader(apiKey=END_TXN, apiVersion=3, clientId=producer-SeaTunnel7230-1, correlationId=4): EndTxnResponseData(throttleTimeMs=0, errorCode=48)
2023-08-13 22:59:50,924 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-08-13 22:59:50,925 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel7230-1, transactionalId=SeaTunnel7230-1] Transition from state COMMITTING_TRANSACTION to error state FATAL_ERROR
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-08-13 22:59:50,925 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation - [localhost]:5801 [seatunnel-942691] [5.1] org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97) ~[classes/:?]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[classes/:?]
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) ~[classes/:?]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [hazelcast-5.1.jar:5.1]
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [hazelcast-5.1.jar:5.1]
2023-08-13 22:59:50,929 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - notify checkpoint completed failed
java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_281]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_281]
    at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1300) ~[?:1.8.0_281]
    at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284) ~[?:1.8.0_281]
    at java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1034) ~[?:1.8.0_281]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_281]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_281]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[hazelcast-5.1.jar:5.1]
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97) ~[classes/:?]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[classes/:?]
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) ~[classes/:?]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[hazelcast-5.1.jar:5.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[hazelcast-5.1.jar:5.1]
    ... 5 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

1.8

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
