TyrantLucifer commented on code in PR #4038:
URL:
https://github.com/apache/incubator-seatunnel/pull/4038#discussion_r1115446471
##########
docs/en/connector-v2/source/LocalFile.md:
##########
@@ -200,6 +200,9 @@ connector will generate data as the following:
The schema information of upstream data.
+<<<<<<< HEAD
Review Comment:
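Please revert this change: the added line `<<<<<<< HEAD` is a leftover merge-conflict marker and should not be committed to the documentation.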
##########
docs/en/connector-v2/source/LocalFile.md:
##########
@@ -200,6 +200,9 @@ connector will generate data as the following:
The schema information of upstream data.
+<<<<<<< HEAD
+============
Review Comment:
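Please revert this change: the added lines `<<<<<<< HEAD` and `============` are leftover merge-conflict markers and should not be committed to the documentation.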
##########
docs/en/connector-v2/source/LocalFile.md:
##########
@@ -214,7 +217,9 @@ The file type supported column projection as the following
shown:
**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
-### common options
+>>>>>>> apache/dev
Review Comment:
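Please revert this change: the added line `>>>>>>> apache/dev` is a leftover merge-conflict marker, and the `### common options` heading it replaced should be restored.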
##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java:
##########
@@ -88,7 +91,11 @@ public void registerReader(int subtaskId) {
@Override
public FakeSourceState snapshotState(long checkpointId) throws Exception {
- return new FakeSourceState(assignedSplits);
+ log.debug("get lock, begin snapshot fakesource split enumerator...");
Review Comment:
```suggestion
log.debug("Get lock, begin snapshot fakesource split enumerator...");
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
node1.shutdown();
node2.shutdown();
+ System.out.println(
Review Comment:
```suggestion
log.info(
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
node1.shutdown();
node2.shutdown();
+ System.out.println(
+ "==========================================All node is
done========================================");
Thread.sleep(10000);
node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ System.out.println(
+ "==========================================All node is
start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
- .atMost(10000, TimeUnit.MILLISECONDS)
+ .atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2,
restoreFinalNode.getCluster().getMembers().size()));
+ System.out.println(
Review Comment:
```suggestion
log.info(
```
##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java:
##########
@@ -88,7 +91,11 @@ public void registerReader(int subtaskId) {
@Override
public FakeSourceState snapshotState(long checkpointId) throws Exception {
- return new FakeSourceState(assignedSplits);
+ log.debug("get lock, begin snapshot fakesource split enumerator...");
+ synchronized (lock) {
+ log.debug("begin snapshot fakesource split enumerator...");
Review Comment:
```suggestion
log.debug("Begin snapshot fakesource split enumerator...");
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
node1.shutdown();
node2.shutdown();
+ System.out.println(
+ "==========================================All node is
done========================================");
Thread.sleep(10000);
node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ System.out.println(
+ "==========================================All node is
start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
- .atMost(10000, TimeUnit.MILLISECONDS)
+ .atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2,
restoreFinalNode.getCluster().getMembers().size()));
+ System.out.println(
+ "==========================================All node is
running========================================");
+ engineClient = new SeaTunnelClient(clientConfig);
+ ClientJobProxy newClientJobProxy =
engineClient.createJobClient().getJobProxy(jobId);
+ CompletableFuture<JobStatus> waitForJobCompleteFuture =
+
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
Awaitility.await()
- .atMost(360000, TimeUnit.MILLISECONDS)
+ .atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
System.out.println(
FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
+ JobStatus jobStatus = null;
+ try {
+ jobStatus =
newClientJobProxy.getJobStatus();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getMessage(e));
+ }
+
Assertions.assertTrue(
-
JobStatus.RUNNING.equals(clientJobProxy.getJobStatus())
+ JobStatus.RUNNING.equals(jobStatus)
&& testRowNumber *
testParallelism
==
FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
});
// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
- clientJobProxy.cancelJob();
+ System.out.println(
+ "==========================================Cancel
Job========================================");
+ newClientJobProxy.cancelJob();
Awaitility.await()
- .atMost(360000, TimeUnit.MILLISECONDS)
+ .atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
- objectCompletableFuture.isDone()
+ waitForJobCompleteFuture.isDone()
&&
JobStatus.CANCELED.equals(
-
objectCompletableFuture.get())));
-
+
waitForJobCompleteFuture.get())));
// prove that the task was restarted
Long fileLineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
Assertions.assertEquals(testRowNumber * testParallelism,
fileLineNumberFromDir);
} finally {
+ System.out.println(
Review Comment:
```suggestion
log.info(
```
##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java:
##########
@@ -70,6 +72,7 @@ public void close() throws IOException {
@Override
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
+ log.debug("Fake source add splits back========{}, subtaskId:{}",
splits, subtaskId);
Review Comment:
```suggestion
log.debug("Fake source add splits back {}, subtaskId:{}", splits,
subtaskId);
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java:
##########
@@ -810,56 +830,77 @@ public void testStreamJobRestoreInAllNodeDown()
node1.shutdown();
node2.shutdown();
+ System.out.println(
+ "==========================================All node is
done========================================");
Thread.sleep(10000);
node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ System.out.println(
+ "==========================================All node is
start, begin check node size ========================================");
// waiting all node added to cluster
HazelcastInstanceImpl restoreFinalNode = node1;
Awaitility.await()
- .atMost(10000, TimeUnit.MILLISECONDS)
+ .atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
2,
restoreFinalNode.getCluster().getMembers().size()));
+ System.out.println(
+ "==========================================All node is
running========================================");
+ engineClient = new SeaTunnelClient(clientConfig);
+ ClientJobProxy newClientJobProxy =
engineClient.createJobClient().getJobProxy(jobId);
+ CompletableFuture<JobStatus> waitForJobCompleteFuture =
+
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
Awaitility.await()
- .atMost(360000, TimeUnit.MILLISECONDS)
+ .atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
// Wait job write all rows in file
Thread.sleep(2000);
System.out.println(
FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
+ JobStatus jobStatus = null;
+ try {
+ jobStatus =
newClientJobProxy.getJobStatus();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getMessage(e));
+ }
+
Assertions.assertTrue(
-
JobStatus.RUNNING.equals(clientJobProxy.getJobStatus())
+ JobStatus.RUNNING.equals(jobStatus)
&& testRowNumber *
testParallelism
==
FileUtils.getFileLineNumberFromDir(
testResources.getLeft()));
});
// sleep 10s and expect the job don't write more rows.
Thread.sleep(10000);
- clientJobProxy.cancelJob();
+ System.out.println(
Review Comment:
```suggestion
log.info(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]