Japson0 opened a new issue, #5555:
URL: https://github.com/apache/seatunnel/issues/5555

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   I ran a long-running SeaTunnel task, and every time it ran past the timeout 
point, it threw an error.
   
   I looked into it and found that the configuration below generates 3 
pipelineSubtasks, but CheckpointCoordinator only sends the barrier trigger to 
the tasks returned by getStartingSubtasks() each time. The remaining tasks wait 
until the timeout is reached.
   Is this because the TODOs in the following code are not implemented yet?
   
   
   ```java
   // CheckpointCoordinator.class
   private Set<Long> getNotYetAcknowledgedTasks() {
       // TODO: some tasks have completed and don't need to be ack
       return plan.getPipelineSubtasks().stream()
               .map(TaskLocation::getTaskID)
               .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
   }

   public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
       // TODO: some tasks have completed and don't need to trigger
       return plan.getStartingSubtasks().stream()
               .map(
                       taskLocation ->
                               new CheckpointBarrierTriggerOperation(
                                       checkpointBarrier, taskLocation))
               .map(checkpointManager::sendOperationToMemberNode)
               .toArray(InvocationFuture[]::new);
   }
   ```
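
To make the question concrete, here is a toy model of the mismatch I suspect. It is only a sketch and not SeaTunnel code: the class `CheckpointAckSketch`, the method `pendingAfterTrigger`, and the `barrierPropagates` flag are hypothetical names made up for illustration. Whether the barrier is actually forwarded from the starting subtasks to the rest of the pipeline is exactly the part I am unsure about, so the model shows both cases.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only; none of these names exist in SeaTunnel.
class CheckpointAckSketch {

    /** Returns the task IDs that are still expected to acknowledge a checkpoint. */
    static Set<Long> pendingAfterTrigger(
            List<Long> pipelineSubtasks,
            List<Long> startingSubtasks,
            boolean barrierPropagates) {
        // Like getNotYetAcknowledgedTasks(): every pipeline subtask must ack.
        Set<Long> notYetAcked = new LinkedHashSet<>(pipelineSubtasks);
        // Like triggerCheckpoint(): only starting subtasks get the barrier directly,
        // so only they are assumed to ack right away.
        notYetAcked.removeAll(startingSubtasks);
        if (barrierPropagates) {
            // Assumption: downstream subtasks ack once the barrier reaches them.
            notYetAcked.clear();
        }
        return notYetAcked;
    }

    public static void main(String[] args) {
        List<Long> pipeline = Arrays.asList(1L, 2L, 3L);
        List<Long> starting = Arrays.asList(1L);
        // Barrier never reaches the other subtasks: they stay pending until timeout.
        System.out.println(pendingAfterTrigger(pipeline, starting, false)); // [2, 3]
        // Barrier is forwarded downstream: nothing is left pending.
        System.out.println(pendingAfterTrigger(pipeline, starting, true));  // []
    }
}
```

If the first case matches what the engine does for this job, the two non-starting subtasks would never acknowledge and the checkpoint would always hit the timeout, which is the behavior I am seeing.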
   
   
   ### SeaTunnel Version
   
   2.3.3
   
   ### SeaTunnel Config
   
   ```conf
   {
     "env": {
       "job.mode": "BATCH"
     },
     "source": [
       {
         "result_table_name": "jdbc_temp_table",
         "url": "jdbc:mysql://****:3306/test_src",
         "driver": "com.mysql.cj.jdbc.Driver",
         "user": "***",
         "password": "***",
         "query": "SELECT id, col1, col2, col3, col4, col5, col6, col7, col8, 
col9, col10, col11, col12, col13, col14, col15 FROM test_data_5",
         "plugin_name": "JDBC"
       }
     ],
     "sink": [
       {
         "source_table": "jdbc_temp_table",
         "url": "jdbc:mysql://***:3306/test_dst",
         "driver": "com.mysql.cj.jdbc.Driver",
         "user": "***",
         "password": "***",
         "query": "INSERT INTO 
test_data_5(id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
         "plugin_name": "JDBC"
       }
     ]
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config conf.json
   ```
   
   
   ### Error Exception
   
   ```log
   
![image](https://github.com/apache/seatunnel/assets/33944862/90cefdb0-8a42-4e43-8cce-86d984992407)
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   2.3.3
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
