Xuzhengz opened a new issue, #7927:
URL: https://github.com/apache/seatunnel/issues/7927

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   Restarting a job that uses the Kafka source plugin fails in STREAMING job mode with the following error: ErrorCode:[KAFKA-04], ErrorDescription:[Add a split back to the split enumerator failed,it will only happen when a SourceReader failed]
   
   ### SeaTunnel Version
   
   2.3.8
   
   ### SeaTunnel Config
   
   ```conf
   {
     "env": {
       "execution.parallelism": 1,
       "job.mode": "STREAMING",
       "checkpoint.interval": 5000
     },
     "source": [
       {
         "plugin_name": "Kafka",
         "bootstrap.servers": "172.16.11.151:9092,172.16.11.152:9092,172.16.11.153:9092",
         "topic": "ocean",
         "schema": {
           "fields": {
             "id": "BIGINT",
             "name": "STRING",
             "age": "STRING",
             "rate": "DOUBLE"
           }
         },
         "start_mode": "latest",
         "format_error_handle_way": "fail",
         "format": "json",
         "result_table_name": "ocean"
       }
     ],
     "sink": [
       {
         "plugin_name": "Console",
         "source_table_name": "ocean"
       }
     ]
   }
   ```
   
   
   ### Running Command
   
   ```shell
   bin/seatunnel.sh -c demo.json
   ```
   
   
   ### Error Exception
   
   ```log
   [903106942201233409] 2024-10-28 10:31:50,130 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-109] - Job 实时质检-kafka (903106942201233409), Pipeline: [(2/2)], task: [pipeline-2 [Source[1]-Kafka]-SplitEnumerator (1/1)] turned from state CREATED to DEPLOYING.
   [] 2024-10-28 10:31:50,134 ERROR [s.t.o.s.RestoredSplitOperation] [hz.main.generic-operation.thread-4] - [localhost]:5801 [seatunnel] [5.1] ErrorCode:[KAFKA-04], ErrorDescription:[Add a split back to the split enumerator failed,it will only happen when a SourceReader failed]
   org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException: ErrorCode:[KAFKA-04], ErrorDescription:[Add a split back to the split enumerator failed,it will only happen when a SourceReader failed]
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.convertToNextSplit(KafkaSourceSplitEnumerator.java:227) ~[?:?]
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.addSplitsBack(KafkaSourceSplitEnumerator.java:202) ~[?:?]
           at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.addSplitsBack(SourceSplitEnumeratorTask.java:216) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation.lambda$runInternal$0(RestoredSplitOperation.java:103) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation.runInternal(RestoredSplitOperation.java:87) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.8]
   Caused by: java.lang.NullPointerException
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.lambda$convertToNextSplit$4(KafkaSourceSplitEnumerator.java:222) ~[?:?]
           at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_212]
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.convertToNextSplit(KafkaSourceSplitEnumerator.java:219) ~[?:?]
           ... 14 more
   [903106942201233409] 2024-10-28 10:31:50,137 WARN  [.s.e.s.t.f.SourceFlowLifeCycle] [hz.main.seaTunnel.task.thread-192] - source request split failed.
   java.util.concurrent.ExecutionException: null
           at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:375) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426) ~[seatunnel-starter.jar:2.3.8]
           at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_212]
           at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_212]
           at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_212]
           at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_212]
           at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_212]
           at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_212]
           at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_212]
           at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_212]
           at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_212]
           at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_212]
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107) ~[seatunnel-starter.jar:2.3.8]
           at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ~[?:1.8.0_212]
           at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39) ~[seatunnel-starter.jar:2.3.8]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
   [903106942201233409] 2024-10-28 10:31:50,138 ERROR [.s.e.s.c.CheckpointCoordinator] [hz.main.generic-operation.thread-6] - report error from task
   org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.RuntimeException: java.util.concurrent.ExecutionException
           at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:378)
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
           at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
           at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
           at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
           at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
           at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
           at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
           at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
           at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
           at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
           at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
           at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
           at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.util.concurrent.ExecutionException
           at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121)
           at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100)
           at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617)
           at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.restoreState(SourceFlowLifeCycle.java:375)
           ... 18 more
   
           at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:390) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48) ~[seatunnel-starter.jar:2.3.8]
           at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.8]
           at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.8]
   ```
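
The root cause in the trace is a `NullPointerException` thrown from a lambda inside `convertToNextSplit` while it iterates an `ArrayList` during `addSplitsBack`. The SeaTunnel source is not shown here, so the following is only a sketch of one common way such an NPE can arise: a lookup for a restored split returns `null` because the enumerator's in-memory state is still empty right after a restart. Every name below (`RestoredSplitSketch`, `PartitionInfo`, `unsafeEndOffsets`, `guardedEndOffsets`) is hypothetical and does not come from SeaTunnel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoredSplitSketch {

    // Hypothetical stand-in for per-partition metadata held by an enumerator.
    static final class PartitionInfo {
        final long endOffset;

        PartitionInfo(long endOffset) {
            this.endOffset = endOffset;
        }
    }

    // Unsafe variant: dereferences the lookup result without a null check.
    static List<Long> unsafeEndOffsets(List<String> restored, Map<String, PartitionInfo> known) {
        List<Long> out = new ArrayList<>();
        // Throws NullPointerException when a restored partition is not in the map.
        restored.forEach(p -> out.add(known.get(p).endOffset));
        return out;
    }

    // Guarded variant: skips partitions that are missing from the map.
    // (Re-discovering them would be another option; that choice is an assumption.)
    static List<Long> guardedEndOffsets(List<String> restored, Map<String, PartitionInfo> known) {
        List<Long> out = new ArrayList<>();
        for (String p : restored) {
            PartitionInfo info = known.get(p);
            if (info != null) {
                out.add(info.endOffset);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed state right after a restart: nothing has been discovered yet.
        Map<String, PartitionInfo> known = new HashMap<>();
        List<String> restored = Arrays.asList("ocean-0");

        try {
            unsafeEndOffsets(restored, known);
        } catch (NullPointerException e) {
            System.out.println("unsafe lookup threw NullPointerException");
        }
        System.out.println("guarded result size: " + guardedEndOffsets(restored, known).size());
    }
}
```

Whether the actual failure at `KafkaSourceSplitEnumerator.java:222` follows this pattern would need to be checked against the 2.3.8 source.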
   
   
   ### Zeta or Flink or Spark Version
   
   2.3.8
   
   ### Java or Scala Version
   
   1.8
   
   ### Screenshots
   
   
![image](https://github.com/user-attachments/assets/65c69f77-039d-4fbb-84d4-8271dcc4e34e)
   
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
