kim-up opened a new issue, #4706: URL: https://github.com/apache/incubator-seatunnel/issues/4706
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened When I run a `Pulsar Batch Task ` using below config : ``` env { execution.parallelism = 1 job.mode = "BATCH" } source { Pulsar { topic = "test_topic_source" subscription.name = "seatunnel" client.service-url = "pulsar://host:6650" admin.service-url = "http://host:8080" result_table_name = "pulsar_table" format_error_handle_way = skip cursor.startup.mode = "EARLIEST" cursor.stop.mode = "LATEST" schema = { fields { id = bigint c_map = "map<string, smallint>" c_array = "array<tinyint>" c_string = string c_boolean = boolean c_tinyint = tinyint c_smallint = smallint c_int = int c_bigint = bigint c_float = float c_double = double c_decimal = "decimal(2, 1)" c_bytes = bytes c_date = date c_timestamp = timestamp } } } } transform { } sink { Console { source_table_name = "pulsar_table" } } ``` Following error occurred: ``` Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:188) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error. at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:218) at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:214) at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$startTriggerPendingCheckpoint$7(CheckpointCoordinator.java:411) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException: ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative failed] - pulsar consumer acknowledgeCumulative failed. at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245) at java.util.HashMap.forEach(HashMap.java:1290) at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224) at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356) at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356) at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91) at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377) at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126) at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230) ... 29 more Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372) ... 31 more Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506) at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488) ... 32 more at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1298) at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243) at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234) at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223) at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680) at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386) at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330) at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230) at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException: ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative failed] - pulsar consumer acknowledgeCumulative failed. at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245) at java.util.HashMap.forEach(HashMap.java:1290) at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224) at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356) at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356) at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91) at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377) at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126) at org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230) ... 29 more Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372) ... 31 more Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: Closed at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506) at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503) at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488) ... 32 more at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97) at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ... 5 more at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:123) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181) ... 2 more ``` ### SeaTunnel Version 2.3.1-SNAPSHOT ### SeaTunnel Config ```conf * ``` ### Running Command ```shell * ``` ### Error Exception ```log * ``` ### Flink or Spark Version _No response_ ### Java or Scala Version _No response_ ### Screenshots  ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
