Xuzhengz opened a new issue, #7839:
URL: https://github.com/apache/seatunnel/issues/7839

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   The Kafka source fails to commit offsets and cannot consume data.
   
   ### SeaTunnel Version
   
   2.3.8-release
   
   ### SeaTunnel Config
   
   ```conf
   {
     "env": {
       "parallelism": 1,
       "job.mode": "STREAMING",
       "checkpoint.interval": "5000",
       "job.retry.times": 1
     },
     "source": [
       {
         "plugin_name": "Kafka",
         "parallelism": 1,
         "bootstrap.servers": "172.16.8.200:19092,172.16.8.201:19092,172.16.8.202:19092",
         "topic": "ocean",
         "schema": {
           "fields": {
             "name": "string",
             "id": "int"
           }
         },
         "start_mode": "latest",
         "format_error_handle_way": "skip",
         "format": "json",
         "result_table_name": "jsonPath"
       }
     ],
     "transform": [
       {
         "plugin_name": "JsonPath",
         "columns": [
           {
             "src_field": "name",
             "path": "$.name1",
             "dest_field": "name1"
           },
           {
             "src_field": "name",
             "path": "$.name2",
             "dest_field": "name2"
           },
           {
             "src_field": "name",
             "path": "$.name3",
             "dest_field": "name3",
             "dest_type": "double"
           },
           {
             "src_field": "name",
             "path": "$.name4",
             "dest_field": "name4",
             "dest_type": "boolean"
           },
           {
             "src_field": "name",
             "path": "$.name5.name55",
             "dest_field": "name55"
           }
         ],
         "result_table_name": "console",
         "source_table_name": [
           "jsonPath"
         ]
       }
     ],
     "sink": [
       {
         "source_table_name": "console",
         "plugin_name": "Console"
       }
     ]
   }
   ```
   
   
   ### Running Command
   
   ```shell
   bin/seatunnel.sh -c demo.json
   ```
   
   
   ### Error Exception
   
   ```log
   [898453163625938945] 2024-10-15 14:12:33,886 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 5
   [898453163625938945] 2024-10-15 14:12:33,897 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(5/1@898453163625938945) notify finished!
   [898453163625938945] 2024-10-15 14:12:33,897 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:5
   [898453163625938945] 2024-10-15 14:12:38,898 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 6
   [898453163625938945] 2024-10-15 14:12:38,942 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(6/1@898453163625938945) notify finished!
   [898453163625938945] 2024-10-15 14:12:38,942 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:6
   [] 2024-10-15 14:12:38,951 INFO  org.apache.kafka.clients.NetworkClient - [Consumer clientId=seatunnel-consumer-0, groupId=SeaTunnel-Consumer-Group] Disconnecting from node 2147483644 due to socket connection setup timeout. The timeout value is 10756 ms.
   [] 2024-10-15 14:12:38,956 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=seatunnel-consumer-0, groupId=SeaTunnel-Consumer-Group] Group coordinator 172.16.8.202:19092 (id: 2147483644 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
   [898453163625938945] 2024-10-15 14:12:43,908 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 7
   [898453163625938945] 2024-10-15 14:12:43,918 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(7/1@898453163625938945) notify finished!
   [898453163625938945] 2024-10-15 14:12:43,918 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 898453163625938945, pipeline id: 1, checkpoint id:7
   ```
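
The two Kafka client lines in the log above report a socket connection setup timeout to node 2147483644, which the next line identifies as the group coordinator at 172.16.8.202:19092. The Kafka consumer registers its coordinator connection under the synthetic id `Integer.MAX_VALUE - brokerId`, so that id would correspond to broker 3. As a rough diagnostic sketch (not part of SeaTunnel or the Kafka client; all function names here are hypothetical), the following checks plain TCP reachability of each configured broker from the machine running the job. It assumes the `bootstrap.servers` value from the config above and only tests whether a TCP connection can be opened, which may or may not be the actual cause of the failed offset commit.

```python
import socket

INT_MAX = 2**31 - 1  # Java Integer.MAX_VALUE


def coordinator_broker_id(connection_node_id):
    """Invert the consumer's coordinator connection id (Integer.MAX_VALUE - broker id)."""
    return INT_MAX - connection_node_id


def parse_bootstrap(servers):
    """Split a 'host:port,host:port' string into (host, port) tuples."""
    pairs = []
    for item in servers.split(","):
        host, _, port = item.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Values copied from the config and log in this report.
    servers = "172.16.8.200:19092,172.16.8.201:19092,172.16.8.202:19092"
    print("coordinator broker id:", coordinator_broker_id(2147483644))
    for host, port in parse_bootstrap(servers):
        status = "reachable" if reachable(host, port) else "NOT reachable"
        print(f"{host}:{port} {status}")
```

If the coordinator address is reported as not reachable from the SeaTunnel host while the other brokers are, that would be consistent with the "Group coordinator ... is unavailable" message; if all three are reachable, the cause likely lies elsewhere.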
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   1.8
   
   ### Screenshots
   
   
![image](https://github.com/user-attachments/assets/4be21f12-7ff8-4682-aaac-e811a9510b10)
   
![image](https://github.com/user-attachments/assets/7eb9b686-8fd4-4215-b412-e0d56c60091b)
   
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

