TyrantLucifer commented on code in PR #4038:
URL: 
https://github.com/apache/incubator-seatunnel/pull/4038#discussion_r1115446471


##########
docs/en/connector-v2/source/LocalFile.md:
##########
@@ -200,6 +200,9 @@ connector will generate data as the following:
 
 The schema information of upstream data.
 
+<<<<<<< HEAD

Review Comment:
   revert



##########
docs/en/connector-v2/source/LocalFile.md:
##########
@@ -200,6 +200,9 @@ connector will generate data as the following:
 
 The schema information of upstream data.
 
+<<<<<<< HEAD
+============

Review Comment:
   revert



##########
docs/en/connector-v2/source/LocalFile.md:
##########
@@ -214,7 +217,9 @@ The file type supported column projection as the following 
shown:
 
 **Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
 
-### common options
+>>>>>>> apache/dev

Review Comment:
   revert



##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java:
##########
@@ -88,7 +91,11 @@ public void registerReader(int subtaskId) {
 
     @Override
     public FakeSourceState snapshotState(long checkpointId) throws Exception {
-        return new FakeSourceState(assignedSplits);
+        log.debug("get lock, begin snapshot fakesource split enumerator...");

Review Comment:
   ```suggestion
           log.debug("Get lock, begin snapshot fakesource split enumerator...");
   ```



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
             node1.shutdown();
             node2.shutdown();
 
+            System.out.println(

Review Comment:
   ```suggestion
               log.info(
   ```



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
             node1.shutdown();
             node2.shutdown();
 
+            System.out.println(
+                    "==========================================All node is 
done========================================");
             Thread.sleep(10000);
 
             node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
+            System.out.println(
+                    "==========================================All node is 
start, begin check node size ========================================");
             // waiting all node added to cluster
             HazelcastInstanceImpl restoreFinalNode = node1;
             Awaitility.await()
-                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .atMost(60000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () ->
                                     Assertions.assertEquals(
                                             2, 
restoreFinalNode.getCluster().getMembers().size()));
 
+            System.out.println(

Review Comment:
   ```suggestion
               log.info(
   ```



##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java:
##########
@@ -88,7 +91,11 @@ public void registerReader(int subtaskId) {
 
     @Override
     public FakeSourceState snapshotState(long checkpointId) throws Exception {
-        return new FakeSourceState(assignedSplits);
+        log.debug("get lock, begin snapshot fakesource split enumerator...");
+        synchronized (lock) {
+            log.debug("begin snapshot fakesource split enumerator...");

Review Comment:
   ```suggestion
               log.debug("Begin snapshot fakesource split enumerator...");
   ```



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
             node1.shutdown();
             node2.shutdown();
 
+            System.out.println(
+                    "==========================================All node is 
done========================================");
             Thread.sleep(10000);
 
             node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
+            System.out.println(
+                    "==========================================All node is 
start, begin check node size ========================================");
             // waiting all node added to cluster
             HazelcastInstanceImpl restoreFinalNode = node1;
             Awaitility.await()
-                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .atMost(60000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () ->
                                     Assertions.assertEquals(
                                             2, 
restoreFinalNode.getCluster().getMembers().size()));
 
+            System.out.println(
+                    "==========================================All node is 
running========================================");
+            engineClient = new SeaTunnelClient(clientConfig);
+            ClientJobProxy newClientJobProxy = 
engineClient.createJobClient().getJobProxy(jobId);
+            CompletableFuture<JobStatus> waitForJobCompleteFuture =
+                    
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
             Awaitility.await()
-                    .atMost(360000, TimeUnit.MILLISECONDS)
+                    .atMost(600000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () -> {
                                 // Wait job write all rows in file
                                 Thread.sleep(2000);
                                 System.out.println(
                                         FileUtils.getFileLineNumberFromDir(
                                                 testResources.getLeft()));
+                                JobStatus jobStatus = null;
+                                try {
+                                    jobStatus = 
newClientJobProxy.getJobStatus();
+                                } catch (Exception e) {
+                                    log.error(ExceptionUtils.getMessage(e));
+                                }
+
                                 Assertions.assertTrue(
-                                        
JobStatus.RUNNING.equals(clientJobProxy.getJobStatus())
+                                        JobStatus.RUNNING.equals(jobStatus)
                                                 && testRowNumber * 
testParallelism
                                                         == 
FileUtils.getFileLineNumberFromDir(
                                                                 
testResources.getLeft()));
                             });
 
             // sleep 10s and expect the job don't write more rows.
             Thread.sleep(10000);
-            clientJobProxy.cancelJob();
+            System.out.println(
+                    "==========================================Cancel 
Job========================================");
+            newClientJobProxy.cancelJob();
 
             Awaitility.await()
-                    .atMost(360000, TimeUnit.MILLISECONDS)
+                    .atMost(600000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () ->
                                     Assertions.assertTrue(
-                                            objectCompletableFuture.isDone()
+                                            waitForJobCompleteFuture.isDone()
                                                     && 
JobStatus.CANCELED.equals(
-                                                            
objectCompletableFuture.get())));
-
+                                                            
waitForJobCompleteFuture.get())));
             // prove that the task was restarted
             Long fileLineNumberFromDir =
                     
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
             Assertions.assertEquals(testRowNumber * testParallelism, 
fileLineNumberFromDir);
 
         } finally {
+            System.out.println(

Review Comment:
   ```suggestion
               log.info(
   ```



##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java:
##########
@@ -70,6 +72,7 @@ public void close() throws IOException {
 
     @Override
     public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
+        log.debug("Fake source add splits back========{}, subtaskId:{}", 
splits, subtaskId);

Review Comment:
   ```suggestion
           log.debug("Fake source add splits back {}, subtaskId:{}", splits, 
subtaskId);
   ```



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
             node1.shutdown();
             node2.shutdown();
 
+            System.out.println(
+                    "==========================================All node is 
done========================================");
             Thread.sleep(10000);
 
             node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
             node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
 
+            System.out.println(
+                    "==========================================All node is 
start, begin check node size ========================================");
             // waiting all node added to cluster
             HazelcastInstanceImpl restoreFinalNode = node1;
             Awaitility.await()
-                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .atMost(60000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () ->
                                     Assertions.assertEquals(
                                             2, 
restoreFinalNode.getCluster().getMembers().size()));
 
+            System.out.println(
+                    "==========================================All node is 
running========================================");
+            engineClient = new SeaTunnelClient(clientConfig);
+            ClientJobProxy newClientJobProxy = 
engineClient.createJobClient().getJobProxy(jobId);
+            CompletableFuture<JobStatus> waitForJobCompleteFuture =
+                    
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
             Awaitility.await()
-                    .atMost(360000, TimeUnit.MILLISECONDS)
+                    .atMost(600000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () -> {
                                 // Wait job write all rows in file
                                 Thread.sleep(2000);
                                 System.out.println(
                                         FileUtils.getFileLineNumberFromDir(
                                                 testResources.getLeft()));
+                                JobStatus jobStatus = null;
+                                try {
+                                    jobStatus = 
newClientJobProxy.getJobStatus();
+                                } catch (Exception e) {
+                                    log.error(ExceptionUtils.getMessage(e));
+                                }
+
                                 Assertions.assertTrue(
-                                        
JobStatus.RUNNING.equals(clientJobProxy.getJobStatus())
+                                        JobStatus.RUNNING.equals(jobStatus)
                                                 && testRowNumber * 
testParallelism
                                                         == 
FileUtils.getFileLineNumberFromDir(
                                                                 
testResources.getLeft()));
                             });
 
             // sleep 10s and expect the job don't write more rows.
             Thread.sleep(10000);
-            clientJobProxy.cancelJob();
+            System.out.println(

Review Comment:
   ```suggestion
               log.info(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to