kim-up opened a new issue, #4706:
URL: https://github.com/apache/incubator-seatunnel/issues/4706

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   When I run a `Pulsar Batch Task ` using below config : 
   ```
   env {
     execution.parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     Pulsar {
       topic = "test_topic_source"
       subscription.name = "seatunnel"
       client.service-url = "pulsar://host:6650"
       admin.service-url = "http://host:8080";
       result_table_name = "pulsar_table"
       format_error_handle_way = skip
       cursor.startup.mode = "EARLIEST"
       cursor.stop.mode = "LATEST"
       schema = {
         fields {
           id = bigint
           c_map = "map<string, smallint>"
           c_array = "array<tinyint>"
           c_string = string
           c_boolean = boolean
           c_tinyint = tinyint
           c_smallint = smallint
           c_int = int
           c_bigint = bigint
           c_float = float
           c_double = double
           c_decimal = "decimal(2, 1)"
           c_bytes = bytes
           c_date = date
           c_timestamp = timestamp
         }
       }
     }
   }
   
   transform {
   }
   
   sink {
     Console {
       source_table_name = "pulsar_table"
     }
   }
   ```
   Following error occurred:
   ```
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:188)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 
CheckpointCoordinator inside have error.
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:218)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:214)
        at 
org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$startTriggerPendingCheckpoint$7(CheckpointCoordinator.java:411)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.util.concurrent.CompletionException: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException:
 ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative 
failed] - pulsar consumer acknowledgeCumulative failed.
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245)
        at java.util.HashMap.forEach(HashMap.java:1290)
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224)
        at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356)
        at 
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
        at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.pulsar.client.api.PulsarClientException: 
java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: 
Closed
        at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377)
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126)
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230)
        ... 29 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: 
Closed
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372)
        ... 31 more
   Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not 
ready. State: Closed
        at 
org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506)
        at 
org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488)
        ... 32 more
   
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at 
java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1298)
        at 
java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at 
com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
        at 
com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
        at 
com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
        at 
com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
        at 
com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException:
 ErrorCode:[PULSAR-07], ErrorDescription:[Pulsar consumer acknowledgeCumulative 
failed] - pulsar consumer acknowledgeCumulative failed.
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:245)
        at java.util.HashMap.forEach(HashMap.java:1290)
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.notifyCheckpointComplete(PulsarSourceReader.java:224)
        at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:356)
        at 
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:370)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:370)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:356)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
        at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at 
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at 
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
   Caused by: org.apache.pulsar.client.api.PulsarClientException: 
java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: 
Closed
        at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:377)
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSplitReaderThread.committingCursor(PulsarSplitReaderThread.java:126)
        at 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader.lambda$notifyCheckpointComplete$3(PulsarSourceReader.java:230)
        ... 29 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.PulsarClientException: Consumer not ready. State: 
Closed
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulative(ConsumerBase.java:372)
        ... 31 more
   Caused by: org.apache.pulsar.client.api.PulsarClientException: Consumer not 
ready. State: Closed
        at 
org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:506)
        at 
org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:543)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:503)
        at 
org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:488)
        ... 32 more
   
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97)
        at 
org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
        at 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
        at 
com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at 
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        ... 5 more
   
        at 
org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:123)
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
        ... 2 more
   ```
   
   ### SeaTunnel Version
   
   2.3.1-SNAPSHOT
   
   ### SeaTunnel Config
   
   ```conf
   *
   ```
   
   
   ### Running Command
   
   ```shell
   *
   ```
   
   
   ### Error Exception
   
   ```log
   *
   ```
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   
![image](https://user-images.githubusercontent.com/9264192/236398541-a66abae9-b89a-4054-8c12-629650541b82.png)
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to