sxiongzhang opened a new issue, #6291:
URL: https://github.com/apache/seatunnel/issues/6291

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   In a Kafka-to-Kafka streaming job, the first message is consumed normally, 
but errors are reported starting with the second message, and the 
synchronization job then stops on its own. Kafka version: 3.2.0.
   
   ### SeaTunnel Version
   
   2.3.3
   
   ### SeaTunnel Config
   
   ```conf
   {
       "env" : {
           "job.mode" : "STREAMING"
       },
       "source" : [
           {
               "consumer.group" : "SeaTunnel-Consumer-Group1",
               "commit_on_checkpoint" : "false",
               "schema" : {
                   "fields" : {
                       "age" : "int",
                       "name" : "string",
                       "id" : "int"
                   }
               },
               "parallelism" : 1,
               "start_mode" : "latest",
               "format" : "JSON",
               "topic" : "test_gojson",
               "bootstrap.servers" : "10.91.125.24:9092",
               "result_table_name" : "Table12227790020321",
               "plugin_name" : "Kafka",
               "partition-discovery.interval-millis" : -1,
               "debezium_record_include_schema" : "true"
           }
       ],
       "transform" : [],
       "sink" : [
           {
               "format" : "JSON",
               "topic" : "test",
               "bootstrap.servers" : "10.91.125.24:9092",
               "source_table_name" : "Table12227790020321",
               "semantics" : "EXACTLY_ONCE",
               "plugin_name" : "Kafka"
           }
       ]
   }
   ```
   
   
   ### Running Command
   
   ```shell
   Submitted through the REST API submit-job endpoint: 
http://xxx:5801/hazelcast/rest/maps/submit-job?jobId=797744266423042050&jobName=rest_api_test3&isStartWithSavePoint=false
   ```
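
For anyone trying to reproduce the submission, here is a minimal sketch of how the request above could be assembled. The endpoint path and the three query parameters are taken from the URL in this report; the helper names `build_submit_url` and `build_request`, the use of POST, and the JSON body carrying the job config are assumptions for illustration, not confirmed behaviour of the SeaTunnel REST API.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_submit_url(host, port, job_id, job_name, start_with_savepoint=False):
    """Build a submit-job URL in the same shape as the one in this report."""
    query = urlencode({
        "jobId": job_id,
        "jobName": job_name,
        # The report passes a lowercase boolean string.
        "isStartWithSavePoint": str(start_with_savepoint).lower(),
    })
    return f"http://{host}:{port}/hazelcast/rest/maps/submit-job?{query}"


def build_request(url, job_config):
    """Prepare (but do not send) the request.

    Assumption: the job config is sent as a JSON body with POST.
    """
    body = json.dumps(job_config).encode("utf-8")
    return Request(url, data=body,
                   headers={"Content-Type": "application/json"},
                   method="POST")


# "xxx" is the placeholder host from the report, not a real address.
url = build_submit_url("xxx", 5801, 797744266423042050, "rest_api_test3")
request = build_request(url, {"env": {"job.mode": "STREAMING"}})
print(url)
```

Passing `request` to `urllib.request.urlopen` would send it; that step is left out here because the host in the report is redacted.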
   
   
   ### Error Exception
   
   ```log
   2024-01-25 16:36:59,688 INFO  org.apache.kafka.clients.NetworkClient - 
[AdminClient clientId=seatunnel-enumerator-admin-client-1745698351] Node 0 
disconnected.
   2024-01-25 16:37:02,109 INFO  
org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
clientId=seatunnel-consumer-704390609, groupId=SeaTunnel-Consumer-Group] 
Subscribed to partition(s): test_gojson-0
   2024-01-25 16:37:02,109 INFO  
org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
clientId=seatunnel-consumer-704390609, groupId=SeaTunnel-Consumer-Group] 
Seeking to offset 2 for partition test_gojson-0
   2024-01-25 16:37:06,043 INFO  
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait 
checkpoint completed: 3
   2024-01-25 16:37:06,060 INFO  
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Closing the 
Kafka producer with timeoutMillis = 9223372036854775807 ms.
   2024-01-25 16:37:06,061 INFO  
org.apache.kafka.clients.producer.internals.Sender - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Aborting 
incomplete transaction due to shutdown
   2024-01-25 16:37:06,062 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Invoking 
InitProducerId with current producer ID and epoch 
ProducerIdAndEpoch(producerId=0, epoch=1) in order to bump the epoch
   2024-01-25 16:37:06,069 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Transiting 
to fatal error state due to 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one.
   2024-01-25 16:37:06,074 INFO  org.apache.kafka.common.metrics.Metrics - 
Metrics scheduler closed
   2024-01-25 16:37:06,074 INFO  org.apache.kafka.common.metrics.Metrics - 
Closing reporter org.apache.kafka.common.metrics.JmxReporter
   2024-01-25 16:37:06,074 INFO  org.apache.kafka.common.metrics.Metrics - 
Metrics reporters closed
   2024-01-25 16:37:06,074 INFO  org.apache.kafka.common.utils.AppInfoParser - 
App info kafka.producer for producer-SeaTunnel6691-3 unregistered
   2024-01-25 16:37:06,076 INFO  
org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
           acks = -1
           batch.size = 16384
           bootstrap.servers = [10.91.125.24:9092]
           buffer.memory = 33554432
           client.dns.lookup = use_all_dns_ips
           client.id = producer-SeaTunnel6691-4
           compression.type = none
           connections.max.idle.ms = 540000
           delivery.timeout.ms = 120000
           enable.idempotence = true
           interceptor.classes = []
           key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
           linger.ms = 0
           max.block.ms = 60000
           max.in.flight.requests.per.connection = 5
           max.request.size = 1048576
           metadata.max.age.ms = 300000
           metadata.max.idle.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
           receive.buffer.bytes = 32768
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 30000
           retries = 2147483647
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.connect.timeout.ms = null
           sasl.login.read.timeout.ms = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.login.retry.backoff.max.ms = 10000
           sasl.login.retry.backoff.ms = 100
           sasl.mechanism = GSSAPI
           sasl.oauthbearer.clock.skew.seconds = 30
           sasl.oauthbearer.expected.audience = null
           sasl.oauthbearer.expected.issuer = null
           sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
           sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
           sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
           sasl.oauthbearer.jwks.endpoint.url = null
           sasl.oauthbearer.scope.claim.name = scope
           sasl.oauthbearer.sub.claim.name = sub
           sasl.oauthbearer.token.endpoint.url = null
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           socket.connection.setup.timeout.max.ms = 30000
           socket.connection.setup.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2]
           ssl.endpoint.identification.algorithm = https
           ssl.engine.factory.class = null
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.certificate.chain = null
           ssl.keystore.key = null
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.2
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.certificates = null
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           transaction.timeout.ms = 60000
           transactional.id = SeaTunnel6691-4
           value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
   
   2024-01-25 16:37:06,077 INFO  
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-SeaTunnel6691-4, transactionalId=SeaTunnel6691-4] 
Instantiated a transactional producer.
   2024-01-25 16:37:06,080 INFO  org.apache.kafka.common.utils.AppInfoParser - 
Kafka version: 3.2.0
   2024-01-25 16:37:06,080 INFO  org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId: 38103ffaa962ef50
   2024-01-25 16:37:06,080 INFO  org.apache.kafka.common.utils.AppInfoParser - 
Kafka startTimeMs: 1706171826080
   2024-01-25 16:37:06,084 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-4, transactionalId=SeaTunnel6691-4] Invoking 
InitProducerId for the first time in order to acquire a producer ID
   2024-01-25 16:37:06,090 INFO  org.apache.kafka.clients.Metadata - [Producer 
clientId=producer-SeaTunnel6691-4, transactionalId=SeaTunnel6691-4] Cluster ID: 
FyLi3MaFQ2eMau11fwnbKQ
   2024-01-25 16:37:06,092 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-4, transactionalId=SeaTunnel6691-4] Discovered 
transaction coordinator 10.91.125.24:9092 (id: 0 rack: null)
   2024-01-25 16:37:06,200 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-4, transactionalId=SeaTunnel6691-4] ProducerId 
set to 1 with epoch 0
   2024-01-25 16:37:06,229 INFO  
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending 
checkpoint(3/1@797744266423042049) notify finished!
   2024-01-25 16:37:06,229 INFO  
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start 
notify checkpoint completed, 
checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@df470a
   2024-01-25 16:37:06,233 INFO  
org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
           acks = -1
           batch.size = 16384
           bootstrap.servers = [10.91.125.24:9092]
           buffer.memory = 33554432
           client.dns.lookup = use_all_dns_ips
           client.id = producer-SeaTunnel6691-3
           compression.type = none
           connections.max.idle.ms = 540000
           delivery.timeout.ms = 120000
           enable.idempotence = true
           interceptor.classes = []
           key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
           linger.ms = 0
           max.block.ms = 60000
           max.in.flight.requests.per.connection = 5
           max.request.size = 1048576
           metadata.max.age.ms = 300000
           metadata.max.idle.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
           receive.buffer.bytes = 32768
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 30000
           retries = 2147483647
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.connect.timeout.ms = null
           sasl.login.read.timeout.ms = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.login.retry.backoff.max.ms = 10000
           sasl.login.retry.backoff.ms = 100
           sasl.mechanism = GSSAPI
           sasl.oauthbearer.clock.skew.seconds = 30
           sasl.oauthbearer.expected.audience = null
           sasl.oauthbearer.expected.issuer = null
           sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
           sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
           sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
           sasl.oauthbearer.jwks.endpoint.url = null
           sasl.oauthbearer.scope.claim.name = scope
           sasl.oauthbearer.sub.claim.name = sub
           sasl.oauthbearer.token.endpoint.url = null
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           socket.connection.setup.timeout.max.ms = 30000
           socket.connection.setup.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2]
           ssl.endpoint.identification.algorithm = https
           ssl.engine.factory.class = null
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.certificate.chain = null
           ssl.keystore.key = null
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.2
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.certificates = null
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           transaction.timeout.ms = 60000
           transactional.id = SeaTunnel6691-3
           value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
   
   2024-01-25 16:37:06,234 INFO  
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] 
Instantiated a transactional producer.
   2024-01-25 16:37:06,238 INFO  org.apache.kafka.common.utils.AppInfoParser - 
Kafka version: 3.2.0
   2024-01-25 16:37:06,238 INFO  org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId: 38103ffaa962ef50
   2024-01-25 16:37:06,238 INFO  org.apache.kafka.common.utils.AppInfoParser - 
Kafka startTimeMs: 1706171826238
   2024-01-25 16:37:06,239 INFO  
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer - 
Attempting to resume transaction SeaTunnel6691-3 with producerId 0 and epoch 1
   2024-01-25 16:37:06,245 INFO  org.apache.kafka.clients.Metadata - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Cluster ID: 
FyLi3MaFQ2eMau11fwnbKQ
   2024-01-25 16:37:06,246 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Discovered 
transaction coordinator 10.91.125.24:9092 (id: 0 rack: null)
   2024-01-25 16:37:06,351 INFO  
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-SeaTunnel6691-3, transactionalId=SeaTunnel6691-3] Transiting 
to fatal error state due to 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one.
   2024-01-25 16:37:06,351 ERROR 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation
 - [localhost]:5801 [seatunnel] [5.1] 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one.
   
   org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one.
   
           at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97)
 ~[seatunnel-starter.jar:2.3.3]
           at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
 ~[seatunnel-starter.jar:2.3.3]
           at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) 
~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
 ~[seatunnel-starter.jar:2.3.3]
   2024-01-25 16:37:06,355 ERROR 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - notify 
checkpoint completed failed
   java.util.concurrent.CompletionException: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one.
   
           at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_181]
           at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_181]
           at 
java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1286) 
~[?:1.8.0_181]
           at 
java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270)
 ~[?:1.8.0_181]
           at 
java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1020)
 ~[?:1.8.0_181]
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_181]
           at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_181]
           at 
com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
 ~[seatunnel-starter.jar:2.3.3]
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one.
   
           at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97)
 ~[seatunnel-starter.jar:2.3.3]
           at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
 ~[seatunnel-starter.jar:2.3.3]
           at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) 
~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
 ~[seatunnel-starter.jar:2.3.3]
           at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
 ~[seatunnel-starter.jar:2.3.3]
           ... 5 more
   2024-01-25 16:37:06,356 INFO  
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start 
clean pending checkpoint cause Checkpoint notify complete failed
   2024-01-25 16:37:06,359 INFO  
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn 
checkpoint_state_797744266423042049_1 state from RUNNING to FAILED
   2024-01-25 16:37:06,361 WARN  
org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job rest_api_test2 
(797744266423042049), Pipeline: [(1/1)] checkpoint have error, cancel the 
pipeline
   2024-01-25 16:37:06,361 WARN  
org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job 
rest_api_test2 (797744266423042049), Pipeline: [(1/1)] count = 0
   2024-01-25 16:37:06,363 INFO  
org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job rest_api_test2 
(797744266423042049), Pipeline: [(1/1)] turn from state RUNNING to CANCELING.
   2024-01-25 16:37:06,366 INFO  
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Try to update 
the task Job rest_api_test2 (797744266423042049), Pipeline: [(1/1)], task: 
[pipeline-1 [Source[0]-Kafka-Table12227790020320]-SplitEnumerator (1/1)] state 
from CREATED to CANCELED
   2024-01-25 16:37:06,366 WARN  
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job 
rest_api_test2 (797744266423042049), Pipeline: [(1/1)], task: [pipeline-1 
[Source[0]-Kafka-Table12227790020320]-SplitEnumerator (1/1)] state in Imap is 
RUNNING, not equals expected state CREATED
   2024-01-25 16:37:06,366 INFO  
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Try to update 
the task Job rest_api_test2 (797744266423042049), Pipeline: [(1/1)], task: 
[pipeline-1 [Source[0]-Kafka-Table12227790020320]-SplitEnumerator (1/1)] state 
from SCHEDULED to CANCELED
   2024-01-25 16:37:06,366 WARN  
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job 
rest_api_test2 (797744266423042049), Pipeline: [(1/1)], task: [pipeline-1 
[Source[0]-Kafka-Table12227790020320]-SplitEnumerator (1/1)] state in Imap is 
RUNNING, not equals expected state SCHEDULED
   2024-01-25 16:37:06,366 INFO  
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Try to update 
the task Job rest_api_test2 (797744266423042049), Pipeline: [(1/1)], task: 
[pipeline-1 [Source[0]-Kafka-Table12227790020320]-SplitEnumerator (1/1)] state 
from DEPLOYING to CANCELED
   ```
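
As background for reading the log above, here is a toy model of the epoch-based fencing that Kafka applies per `transactional.id`. It is a simplified sketch, not Kafka's implementation and not a diagnosis of this job: `ToyCoordinator`, `ProducerFenced`, and the method names are hypothetical. It only shows why a producer that resumes with a saved identity (producerId 0, epoch 1, as in the "Attempting to resume transaction" line) is rejected once the epoch for that ID has been bumped again. It does not establish which component bumped the epoch in the reported run.

```python
class ProducerFenced(Exception):
    """Stand-in for Kafka's ProducerFencedException (toy model only)."""


class ToyCoordinator:
    """Tracks one (producer_id, epoch) pair per transactional.id."""

    def __init__(self):
        self._next_pid = 0
        self._state = {}  # transactional.id -> (producer_id, epoch)

    def init_producer_id(self, txn_id):
        """First call assigns an ID at epoch 0; later calls bump the epoch."""
        if txn_id not in self._state:
            self._state[txn_id] = (self._next_pid, 0)
            self._next_pid += 1
        else:
            pid, epoch = self._state[txn_id]
            self._state[txn_id] = (pid, epoch + 1)
        return self._state[txn_id]

    def commit(self, txn_id, producer_id, epoch):
        """Accept the commit only from the current identity."""
        if self._state.get(txn_id) != (producer_id, epoch):
            raise ProducerFenced(
                f"{txn_id}: ({producer_id}, {epoch}) is not the current identity")
        return True


coord = ToyCoordinator()
txn_id = "SeaTunnel6691-3"          # transactional.id seen in the log

coord.init_producer_id(txn_id)       # producerId 0, epoch 0
saved = coord.init_producer_id(txn_id)  # producerId 0, epoch 1 (saved for resume)
current = coord.init_producer_id(txn_id)  # some later init bumps the epoch to 2

try:
    coord.commit(txn_id, *saved)     # resume with the stale identity
    fenced = False
except ProducerFenced:
    fenced = True

print(saved, current, fenced)
```

In this model the commit succeeds only if nothing has called `init_producer_id` for the same `transactional.id` between saving the identity and resuming it.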
   
   
   ### Zeta or Flink or Spark Version
   
   zeta
   
   ### Java or Scala Version
   
   java8
   
   ### Screenshots
   
   <img width="1094" alt="e0d9a6b0845a9244825f602ee0f8082" 
src="https://github.com/apache/seatunnel/assets/33976943/f41609f6-82ed-475f-9887-3573b4b31e45">
   <img width="1110" alt="e3aadde0e9a252fac90e62167be1bff" 
src="https://github.com/apache/seatunnel/assets/33976943/9f24f6ba-adf9-4191-a098-893a883b5c23">
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
