Hisoka-X commented on code in PR #7693:
URL: https://github.com/apache/seatunnel/pull/7693#discussion_r1793377332
##########
seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java:
##########
@@ -173,6 +176,137 @@ public void canNotSubmitJobWhenHaveNoWorkerNode() {
}
}
+ @SneakyThrows
+ @Test
+ public void enterPendingWhenResourcesNotEnough() {
+ HazelcastInstanceImpl masterNode = null;
+ String testClusterName = "Test_enterPendingWhenResourcesNotEnough";
+ SeaTunnelClient seaTunnelClient = null;
+
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ // set job pending
+ EngineConfig engineConfig = seaTunnelConfig.getEngineConfig();
+ engineConfig.setScheduleStrategy(ScheduleStrategy.WAIT);
+ engineConfig.getSlotServiceConfig().setDynamicSlot(false);
+ engineConfig.getSlotServiceConfig().setSlotNum(3);
+ seaTunnelConfig
+ .getHazelcastConfig()
+ .setClusterName(TestUtils.getClusterName(testClusterName));
+
+ // submit job
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/client_test.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("Test_enterPendingWhenResourcesNotEnough");
+
+ try {
+ // master node must start first in ci
+ masterNode =
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+
+ HazelcastInstanceImpl finalMasterNode = masterNode;
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 1,
finalMasterNode.getCluster().getMembers().size()));
+
+ // new seatunnel client and submit job
+ seaTunnelClient = createSeaTunnelClient(testClusterName);
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath,
jobConfig, seaTunnelConfig);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ clientJobProxy.getJobStatus(),
JobStatus.PENDING));
+ // start two worker nodes
+
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+
+ // There are already resources available, wait for job enter
running or complete
+ Awaitility.await()
+ .atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
clientJobProxy.getJobStatus()));
+ System.out.println("1234:" + clientJobProxy.getJobStatus());
Review Comment:
Please remove this log.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +216,82 @@ public CoordinatorService(
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+ isJobPending =
+
engineConfig.getScheduleStrategy().equals(ScheduleStrategy.WAIT) &&
!dynamicSlot;
+ if (isJobPending) {
+ logger.info("Start pending job schedule thread");
+ // start pending job schedule thread
+ startPendingJobScheduleThread();
+ }
+ }
+
+ private void startPendingJobScheduleThread() {
+ Runnable pendingJobScheduleTask =
+ () -> {
+
Thread.currentThread().setName("pending-job-schedule-runner");
+ while (true) {
+ try {
+ // sleep 5s , not too frequent
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ pendingJobSchedule();
+ }
+ };
+ new Thread(pendingJobScheduleTask).start();
Review Comment:
The thread should be daemon. Why not schedule this thread using
https://github.com/apache/seatunnel/blob/3078d78f3388b8ed2e12d82460ca93a0115d9234/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L162
?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -195,6 +216,82 @@ public CoordinatorService(
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
+ isJobPending =
+
engineConfig.getScheduleStrategy().equals(ScheduleStrategy.WAIT) &&
!dynamicSlot;
+ if (isJobPending) {
+ logger.info("Start pending job schedule thread");
+ // start pending job schedule thread
+ startPendingJobScheduleThread();
+ }
+ }
+
+ private void startPendingJobScheduleThread() {
+ Runnable pendingJobScheduleTask =
+ () -> {
+
Thread.currentThread().setName("pending-job-schedule-runner");
+ while (true) {
+ try {
+ // sleep 5s , not too frequent
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ pendingJobSchedule();
Review Comment:
Looks like we would sleep 5s each job, if we have 10 jobs should be
scheduled, we must sleep 50s? Meaning ours jobs will delay to finish 50s. We
should schedule all job ASAP, I think only when slot not enough, we can sleep
2 or 3 s.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -173,10 +187,17 @@ public class CoordinatorService {
private PassiveCompletableFuture restoreAllJobFromMasterNodeSwitchFuture;
+ private LinkedList<Tuple2<Long, JobMaster>> pendingJob = new
LinkedList<>();
Review Comment:
LinkedList not threadsafe.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]