This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 50254ac558e1ab26321a67411b6baee402dece57
Author: peacewong <[email protected]>
AuthorDate: Fri Sep 22 14:55:54 2023 +0800

    optimize consumer code logic
---
 .../apache/linkis/scheduler/queue/Consumer.scala   |  1 -
 .../linkis/scheduler/queue/GroupFactory.scala      |  5 +++
 .../linkis/scheduler/queue/LoopArrayQueue.scala    |  7 +++-
 .../queue/fifoqueue/FIFOUserConsumer.scala         | 45 +++++++++++++++++++---
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala
index 50dce2ca1..165a27436 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala
@@ -41,7 +41,6 @@ abstract class Consumer(schedulerContext: SchedulerContext, executeService: Exec
   def start(): Unit
 
   def shutdown(): Unit = {
-    logger.info(s"$toString is ready to stop!")
     terminate = true
     logger.info(s"$toString stopped!")
   }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala
index f3471b07d..be1716f23 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala
@@ -19,6 +19,11 @@ package org.apache.linkis.scheduler.queue
 
 abstract class GroupFactory {
 
+  /**
+   * Create a Group and set the concurrency limit of the group
+   * @param event
+   * @return
+   */
   def getOrCreateGroup(event: SchedulerEvent): Group
 
   def getGroup(groupName: String): Group
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
index b0bbfd3c2..8bea7e52b 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
@@ -40,7 +40,12 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
 
   override def getWaitingEvents: Array[SchedulerEvent] = {
     eventQueue synchronized {
-      toIndexedSeq.filter(x => x.getState.equals(SchedulerEventState.Inited)).toArray
+      toIndexedSeq
+        .filter(x =>
+          x.getState.equals(SchedulerEventState.Inited) || x.getState
+            .equals(SchedulerEventState.Scheduled)
+        )
+        .toArray
     }
   }
 
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index 2a40c2517..d541d8a2e 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -73,6 +73,8 @@ class FIFOUserConsumer(
   override def getRunningEvents: Array[SchedulerEvent] =
     getEvents(e => e.isRunning || e.isWaitForRetry)
 
+  protected def getSchedulerContext: SchedulerContext = schedulerContext
+
   private def getEvents(op: SchedulerEvent => Boolean): Array[SchedulerEvent] = {
     val result = ArrayBuffer[SchedulerEvent]()
     runningJobs.filter(_ != null).filter(x => op(x)).foreach(result += _)
@@ -82,16 +84,28 @@ class FIFOUserConsumer(
   override def run(): Unit = {
     Thread.currentThread().setName(s"${toString}Thread")
     logger.info(s"$toString thread started!")
-    while (!terminate) {
-      Utils.tryAndError(loop())
-      Utils.tryAndError(Thread.sleep(10))
+    while (!terminate) Utils.tryAndError {
+      loop()
+      Thread.sleep(10)
     }
     logger.info(s"$toString thread stopped!")
   }
 
   protected def askExecutorGap(): Unit = {}
 
+  /**
+   * Task scheduling interception is used to judge the rules of task operation, and to judge other
+   * task rules based on Group. For example, Entrance makes Creator-level task judgment.
+   */
+  protected def runScheduleIntercept(): Boolean = {
+    true
+  }
+
   protected def loop(): Unit = {
+    if (!runScheduleIntercept()) {
+      Utils.tryQuietly(Thread.sleep(1000))
+      return
+    }
     var isRetryJob = false
     def getWaitForRetryEvent: Option[SchedulerEvent] = {
       val waitForRetryJobs = runningJobs.filter(job => job != null && job.isJobCanRetry)
@@ -110,7 +124,7 @@ class FIFOUserConsumer(
     if (event.isEmpty) {
       val completedNums = runningJobs.filter(job => job == null || job.isCompleted)
       if (completedNums.length < 1) {
-        Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化
+        Utils.tryQuietly(Thread.sleep(1000))
         return
       }
       while (event.isEmpty) {
@@ -119,7 +133,12 @@ class FIFOUserConsumer(
           if (
               takeEvent.exists(e =>
                 Utils.tryCatch(e.turnToScheduled()) { t =>
-                  takeEvent.get.asInstanceOf[Job].onFailure("Job状态翻转为Scheduled失败!", t)
+                  takeEvent.get
+                    .asInstanceOf[Job]
+                    .onFailure(
+                      "Failed to change the job status to Scheduled(Job状态翻转为Scheduled失败)",
+                      t
+                    )
                   false
                 }
               )
@@ -174,7 +193,7 @@ class FIFOUserConsumer(
             )
           )
         case error: Throwable =>
-          job.onFailure("请求引擎失败,可能是由于后台进程错误!请联系管理员", error)
+          job.onFailure("Failed to request EngineConn", error)
           if (job.isWaitForRetry) {
             logger.warn(s"Ask executor for Job $job failed, wait for the next retry!", error)
             if (!isRetryJob) putToRunningJobs(job)
@@ -190,6 +209,20 @@ class FIFOUserConsumer(
 
   override def shutdown(): Unit = {
     future.cancel(true)
+    val waitEvents = queue.getWaitingEvents
+    if (waitEvents.nonEmpty) {
+      waitEvents.foreach {
+        case job: Job =>
+          job.onFailure("Your job will be marked as canceled because the consumer be killed", null)
+        case _ =>
+      }
+    }
+
+    this.runningJobs.foreach { job =>
+      if (job != null && !job.isCompleted) {
+        job.onFailure("Your job will be marked as canceled because the consumer be killed", null)
+      }
+    }
     super.shutdown()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to