Hisoka-X commented on code in PR #5450:
URL: https://github.com/apache/seatunnel/pull/5450#discussion_r1360011813


##########
pom.xml:
##########
@@ -730,6 +730,8 @@
                     <encoding>UTF-8</encoding>
                 </configuration>
             </plugin>
+            <!-- disable spotless check during release -->

Review Comment:
   But the plugin you disabled not spotless. Is there have some special reason?



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -840,15 +840,15 @@ public void testStreamJobRestoreInAllNodeDown()
             node1.shutdown();
             node2.shutdown();
 
-            log.info(
+            System.out.println(

Review Comment:
   Any reason for change this?



##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java:
##########
@@ -29,10 +29,10 @@ public enum JobStatus {
     /** Job is newly created, no task has started to run. */
     CREATED(EndState.NOT_END),
 
-    /** Job is begin schedule but some task not deploy complete. */
+    /** Job will scheduler every pipeline */

Review Comment:
   I think new comment not clearly than old one.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -315,4 +262,63 @@ public JobStatus getJobStatus() {
     public String getJobFullName() {
         return jobFullName;
     }
+
+    public void makeJobFailing(Throwable e) {
+        errorBySubPlan.compareAndSet(null, ExceptionUtils.getMessage(e));
+        updateJobState(JobStatus.FAILING);
+    }
+
+    public void startJob() {
+        isRunning = true;
+        log.info("{} state process is start", getJobFullName());
+        stateProcess();
+    }
+
+    public void stopJobStateProcess() {
+        isRunning = false;
+        log.info("{} state process is stop", getJobFullName());
+    }
+
+    private synchronized void stateProcess() {
+        if (!isRunning) {
+            log.warn(String.format("%s state process is stopped", 
jobFullName));
+            return;
+        }
+        switch (getJobStatus()) {
+            case CREATED:
+                updateJobState(JobStatus.SCHEDULED);
+                break;
+            case SCHEDULED:
+                getPipelineList()
+                        .forEach(
+                                subPlan -> {
+                                    if (PipelineStatus.CREATED.equals(
+                                            subPlan.getCurrPipelineStatus())) {
+                                        subPlan.startSubPlanStateProcess();
+                                    }
+                                });
+                updateJobState(JobStatus.RUNNING);
+                break;
+            case RUNNING:
+                try {
+                    Thread.sleep(200);
+                } catch (InterruptedException e) {
+                    makeJobFailing(e);
+                }

Review Comment:
   `sleep` seem like not unnecessary. Because this method will return with 
void, not foreach with `while(true)`.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -425,7 +425,7 @@ protected void tryTriggerPendingCheckpoint(CheckpointType 
checkpointType) {
             startTriggerPendingCheckpoint(pendingCheckpoint);
             pendingCounter.incrementAndGet();
             // if checkpoint type are final type, we don't need to trigger 
next checkpoint
-            if (checkpointType.notFinalCheckpoint() && 
checkpointType.notSchemaChangeCheckpoint()) {

Review Comment:
   PTAL @hailin0 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to