This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new 28fd5db84 refactor(linkis-scheduler): refactor some code (#2436)
28fd5db84 is described below

commit 28fd5db841002f10f861025093897f1df16a7c7b
Author: Jack Xu <[email protected]>
AuthorDate: Sat Jul 9 18:09:05 2022 +0800

    refactor(linkis-scheduler): refactor some code (#2436)
---
 .../linkis/common/listener/ListenerEventBus.scala  | 52 +++++++++++-----------
 .../org/apache/linkis/server/Knife4jConfig.scala   |  1 +
 .../linkis/scheduler/AbstractScheduler.scala       | 27 ++++++-----
 .../org/apache/linkis/scheduler/Scheduler.scala    |  5 +--
 .../apache/linkis/scheduler/SchedulerContext.scala |  4 +-
 .../apache/linkis/scheduler/event/LogEvent.scala   | 12 +++--
 .../exception/SchedulerErrorException.scala        |  2 +-
 .../linkis/scheduler/executer/ExecuteRequest.scala | 10 ++---
 .../linkis/scheduler/executer/Executor.scala       |  8 ++--
 .../apache/linkis/scheduler/future/BDPFuture.scala |  2 +-
 .../linkis/scheduler/future/BDPFutureTask.scala    | 10 ++---
 .../linkis/scheduler/queue/SchedulerEvent.scala    | 32 ++++++-------
 .../scheduler/queue/SchedulerEventState.scala      | 33 ++++----------
 .../queue/fifoqueue/FIFOConsumerManager.scala      |  2 +-
 .../queue/fifoqueue/FIFOGroupFactory.scala         |  2 +-
 .../scheduler/queue/fifoqueue/FIFOScheduler.scala  |  6 +--
 .../queue/fifoqueue/FIFOSchedulerContextImpl.scala |  2 +-
 .../queue/parallelqueue/ParallelScheduler.scala    |  6 +--
 .../scheduler/queue/SchedulerEventStateTest.scala} | 36 ++++++++-------
 .../common/launch/process/Environment.scala        |  2 +-
 20 files changed, 122 insertions(+), 132 deletions(-)

diff --git 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
index 19429c76f..742729c4b 100644
--- 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
+++ 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
@@ -19,10 +19,10 @@ package org.apache.linkis.common.listener
 
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.{ArrayBlockingQueue, CopyOnWriteArrayList, Future, 
TimeoutException}
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.commons.lang3.time.DateFormatUtils
 
-import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
-import org.apache.commons.lang.time.DateFormatUtils
-
+import java.time.Duration
 import scala.util.control.NonFatal
 
 
@@ -77,12 +77,9 @@ trait ListenerBus[L <: EventListener, E <: Event] extends 
Logging {
 }
 abstract class ListenerEventBus[L <: EventListener, E <: Event]
       (val eventQueueCapacity: Int, name: String)
-      (listenerConsumerThreadSize: Int = 5, listenerThreadMaxFreeTime: Long = 
ByteTimeUtils.timeStringAsMs("2m"))
+      (listenerConsumerThreadSize: Int = 5, listenerThreadMaxFreeTime: Long = 
Duration.ofMinutes(2).toMillis)
   extends ListenerBus[L, E] with Logging {
 
-//  protected val listenerConsumerThreadSize: Int = 5
-//  protected val listenerThreadMaxFreeTime: Long = 
ByteTimeUtils.timeStringAsMs("2m")
-
   private lazy val eventQueue = new ArrayBlockingQueue[E](eventQueueCapacity)
   protected val executorService = 
Utils.newCachedThreadPool(listenerConsumerThreadSize + 2, name + 
"-Consumer-ThreadPool", true)
   private val eventDealThreads = 
Array.tabulate(listenerConsumerThreadSize)(new ListenerEventThread(_))
@@ -104,8 +101,8 @@ abstract class ListenerEventBus[L <: EventListener, E <: 
Event]
       listenerThread = executorService.submit(new Runnable {
         override def run(): Unit =
           while (!stopped.get) {
-            val event = Utils.tryCatch(eventQueue.take()){
-              case t: InterruptedException => logger.info(s"stopped $name 
thread.")
+            val event = Utils.tryCatch(eventQueue.take()) {
+              case t: InterruptedException => logger.info(s"stopped $name 
thread.", t)
                 return
             }
             while(!eventDealThreads.exists(_.putEvent(event)) && !stopped.get) 
Utils.tryAndError(Thread.sleep(1))
@@ -121,7 +118,7 @@ abstract class ListenerEventBus[L <: EventListener, E <: 
Event]
   def post(event: E): Unit = {
     if (stopped.get || executorService.isTerminated || (listenerThread.isDone 
&& started.get())) {
       dropEvent.onBusStopped(event)
-    } else if(!eventQueue.offer(event)) {
+    } else if (!eventQueue.offer(event)) {
       dropEvent.onDropEvent(event)
     }
   }
@@ -194,7 +191,7 @@ abstract class ListenerEventBus[L <: EventListener, E <: 
Event]
     private val logDroppedEvent = new AtomicBoolean(false)
     private val logStoppedEvent = new AtomicBoolean(false)
     executorService.submit(new Runnable {
-      override def run(): Unit = while(true) {
+      override def run(): Unit = while (true) {
         val droppedEvents = droppedEventsCounter.get
         if (droppedEvents > 0) {
           // Don't log too frequently
@@ -241,24 +238,26 @@ abstract class ListenerEventBus[L <: EventListener, E <: 
Event]
     private var future: Option[Future[_]] = None
     private var continue = true
     private var event: Option[E] = None
-    private var lastEventDealData = 0l
+    private var lastEventDealTime = 0L
 
-    def releaseFreeThread(): Unit = if(listenerThreadMaxFreeTime > 0 && 
future.isDefined && event.isEmpty && lastEventDealData > 0 &&
-      System.currentTimeMillis() - lastEventDealData >= 
listenerThreadMaxFreeTime) synchronized {
-      if(lastEventDealData == 0 && future.isEmpty) return
-      lastEventDealData = 0l
-      continue = false
-      future.foreach(_.cancel(true))
-      future = None
+    def releaseFreeThread(): Unit = if (listenerThreadMaxFreeTime > 0 && 
future.isDefined && event.isEmpty && lastEventDealTime > 0 &&
+      System.currentTimeMillis() - lastEventDealTime >= 
listenerThreadMaxFreeTime) {
+      synchronized {
+        if (lastEventDealTime == 0 && future.isEmpty) return
+        lastEventDealTime = 0L
+        continue = false
+        future.foreach(_.cancel(true))
+        future = None
+      }
     }
     def isRunning: Boolean = event.isDefined
 
-    def putEvent(event: E): Boolean = if(this.event.isDefined) false else 
synchronized {
-      if(this.event.isDefined) false
+    def putEvent(event: E): Boolean = if (this.event.isDefined) false else 
synchronized {
+      if (this.event.isDefined) false
       else {
-        lastEventDealData = System.currentTimeMillis()
+        lastEventDealTime = System.currentTimeMillis()
         this.event = Some(event)
-        if(future.isEmpty) future = Some(executorService.submit(this))
+        if (future.isEmpty) future = Some(executorService.submit(this))
         else notify()
         true
       }
@@ -274,12 +273,13 @@ abstract class ListenerEventBus[L <: EventListener, E <: 
Event]
       }
       while(continue) {
         synchronized {
-          while(event.isEmpty) Utils.tryQuietly(wait(), _ => {
+          while (event.isEmpty) Utils.tryQuietly(wait(), _ => {
             threadRelease()
-            return})
+            return
+          })
         }
         Utils.tryFinally(event.foreach(postToAll)) (synchronized {
-          lastEventDealData = System.currentTimeMillis()
+          lastEventDealTime = System.currentTimeMillis()
           event = None
         })
       }
diff --git 
a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
 
b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
index 92c44d616..847315a5b 100644
--- 
a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
+++ 
b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.linkis.server
 
 import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
index 1cabbe9ca..282566f29 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.scheduler
 import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.scheduler.exception.SchedulerErrorException
 import org.apache.linkis.scheduler.queue.SchedulerEvent
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 
 abstract class AbstractScheduler extends Scheduler {
@@ -28,20 +28,24 @@ abstract class AbstractScheduler extends Scheduler {
 
   override def start(): Unit = {}
 
-  private def getEventId(index: Int, groupName: String): String = groupName + 
"_" + index
+  private val EVENT_ID_SPLIT = "_"
+
+  private def getEventId(index: Int, groupName: String): String = groupName + 
EVENT_ID_SPLIT + index
+
   private def getIndexAndGroupName(eventId: String): (Int, String) = {
-    if(StringUtils.isBlank(eventId) || !eventId.contains("_"))
+    if (StringUtils.isBlank(eventId) || !eventId.contains(EVENT_ID_SPLIT) || 
eventId.startsWith(EVENT_ID_SPLIT)) {
       throw new SchedulerErrorException(12011, s"Unrecognized execId 
$eventId.(不能识别的execId $eventId.)")
-    val index = eventId.lastIndexOf("_")
-    if(index < 1) throw new SchedulerErrorException(12011, s"Unrecognized 
execId $eventId.(不能识别的execId $eventId.)")
+    }
+    val index = eventId.lastIndexOf(EVENT_ID_SPLIT)
     (eventId.substring(index + 1).toInt, eventId.substring(0, index))
   }
+
   override def submit(event: SchedulerEvent): Unit = {
     val group = 
getSchedulerContext.getOrCreateGroupFactory.getOrCreateGroup(event)
     val consumer = 
getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(group.getGroupName)
     val index = consumer.getConsumeQueue.offer(event)
     index.map(getEventId(_, group.getGroupName)).foreach(event.setId)
-    if(index.isEmpty) throw  new SchedulerErrorException(12001,"The submission 
job failed and the queue is full!(提交作业失败,队列已满!)")
+    if (index.isEmpty) throw new SchedulerErrorException(12001, "The 
submission job failed and the queue is full!(提交作业失败,队列已满!)")
   }
 
   override def get(event: SchedulerEvent): Option[SchedulerEvent] = 
get(event.getId)
@@ -52,12 +56,15 @@ abstract class AbstractScheduler extends Scheduler {
     consumer.getRunningEvents.find(_.getId == 
eventId).orElse(consumer.getConsumeQueue.get(index))
   }
 
-  override def shutdown(): Unit = if(getSchedulerContext != null) {
-    if(getSchedulerContext.getOrCreateConsumerManager != null)
+  override def shutdown(): Unit = if (getSchedulerContext != null) {
+    if (getSchedulerContext.getOrCreateConsumerManager != null) {
       
Utils.tryQuietly(getSchedulerContext.getOrCreateConsumerManager.shutdown())
-    if(getSchedulerContext.getOrCreateExecutorManager != null)
+    }
+    if (getSchedulerContext.getOrCreateExecutorManager != null) {
       
Utils.tryQuietly(getSchedulerContext.getOrCreateExecutorManager.shutdown())
-    if(getSchedulerContext.getOrCreateSchedulerListenerBus != null)
+    }
+    if (getSchedulerContext.getOrCreateSchedulerListenerBus != null) {
       
Utils.tryQuietly(getSchedulerContext.getOrCreateSchedulerListenerBus.stop())
+    }
   }
 }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
index 28fbb180c..f3af1ca07 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
@@ -35,14 +35,13 @@ abstract class Scheduler {
 }
 
 object Scheduler extends Logging{
-  def createScheduler(scheduleType: String, schedulerContext: 
SchedulerContext): Option[Scheduler]={
+  def createScheduler(scheduleType: String, schedulerContext: 
SchedulerContext): Option[Scheduler] = {
     scheduleType match {
       case "FIFO" => Some(new FIFOScheduler(schedulerContext))
       case "PARA" => Some(new ParallelScheduler(schedulerContext))
-      case _ => {
+      case _ =>
         logger.error("Please enter the correct scheduling type!(请输入正确的调度类型!)")
         None
-      }
     }
   }
 }
\ No newline at end of file
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
index 2fef52f27..868dfe427 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
@@ -37,6 +37,6 @@ trait SchedulerContext {
 }
 
 object SchedulerContext {
-  val schedulerContext:SchedulerContext = new FIFOSchedulerContextImpl( 100)
-  def getSchedulerContext:SchedulerContext = schedulerContext
+  val schedulerContext: SchedulerContext = new FIFOSchedulerContextImpl(100)
+  def getSchedulerContext: SchedulerContext = schedulerContext
 }
\ No newline at end of file
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
index efe153ec7..4f3b44d6c 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
@@ -21,15 +21,13 @@ import org.apache.linkis.common.listener.Event
 import org.apache.linkis.scheduler.queue.Job
 
 
-class LogEvent(source:Job,
-               t:Int) extends Event{
-  def getT:Int = t
+class LogEvent(source: Job, t: Int) extends Event {
+  def getT: Int = t
 }
 
 object LogEvent{
-  val read:Int = 1
-  val write:Int = 2
+  val read: Int = 1
+  val write: Int = 2
 
-  def apply(source: Job,
-            t: Int): LogEvent = new LogEvent(source, t)
+  def apply(source: Job, t: Int): LogEvent = new LogEvent(source, t)
 }
\ No newline at end of file
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
index a479d2a70..0af12ec18 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
@@ -20,6 +20,6 @@ package org.apache.linkis.scheduler.exception
 import org.apache.linkis.common.exception.ErrorException
 
 
-class SchedulerErrorException(errCode: Int, desc: String) extends 
ErrorException(errCode,desc){
+class SchedulerErrorException(errCode: Int, desc: String) extends 
ErrorException(errCode, desc) {
 
 }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
index 4d03864b7..78e00ef5c 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
@@ -25,11 +25,11 @@ trait JobExecuteRequest {
   val jobId: String
 }
 trait RunTypeExecuteRequest{
-  val runType:String
+  val runType: String
 }
 trait PythonExecuteRequest{
-  val sparkPythonVersion:String
-  val sparkPythonExtraPackage:String
-  val pythonVersion:String
-  val pythonExtraPackage:String
+  val sparkPythonVersion: String
+  val sparkPythonExtraPackage: String
+  val pythonVersion: String
+  val pythonExtraPackage: String
 }
\ No newline at end of file
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
index b45cbcf04..4b39aa999 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
@@ -17,10 +17,10 @@
  
 package org.apache.linkis.scheduler.executer
 
-import java.io.Closeable
-
 import org.apache.linkis.protocol.engine.EngineState
 
+import java.io.Closeable
+
 
 trait Executor extends Closeable {
 
@@ -43,7 +43,7 @@ object ExecutorState {
 
   def apply(x: Int): ExecutorState = EngineState.values()(x)
 
-  def isCompleted(state: ExecutorState) = 
EngineState.isCompleted(state.asInstanceOf[EngineState])
+  def isCompleted(state: ExecutorState): Boolean = 
EngineState.isCompleted(state)
 
-  def isAvailable(state: ExecutorState) = 
EngineState.isAvailable(state.asInstanceOf[EngineState])
+  def isAvailable(state: ExecutorState): Boolean = 
EngineState.isAvailable(state)
 }
\ No newline at end of file
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
index 33cf384af..9d4cc125e 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
@@ -19,5 +19,5 @@ package org.apache.linkis.scheduler.future
 
 
 trait BDPFuture {
-    def cancel():Unit
+    def cancel(): Unit
 }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
index cb36aa46a..59fe3cf21 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
@@ -17,8 +17,9 @@
  
 package org.apache.linkis.scheduler.future
 
-import java.util.concurrent.{Future, FutureTask}
+import org.apache.commons.lang3.reflect.FieldUtils
 
+import java.util.concurrent.{Future, FutureTask}
 import org.apache.linkis.common.utils.{Logging, Utils}
 
 
@@ -27,10 +28,9 @@ class BDPFutureTask(future: Future[_]) extends BDPFuture 
with Logging {
     future match {
       case futureTask: FutureTask[_] =>
         logger.info("Start to interrupt BDPFutureTask")
-        val futureType = futureTask.getClass
-        val field = futureType.getDeclaredField("runner")
-        field.setAccessible(true)
-        val runner = field.get(futureTask).asInstanceOf[Thread]
+        val runner = FieldUtils
+          .readDeclaredField(futureTask, "runner", true)
+          .asInstanceOf[Thread]
         runner.interrupt()
         logger.info(s"Finished to interrupt BDPFutureTask of 
${runner.getName}")
     }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index f325ae572..3a08ee34f 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -27,29 +27,29 @@ trait SchedulerEvent extends Logging {
   private[queue] var id: String = _
   private var state: SchedulerEventState = Inited
   val createTime = System.currentTimeMillis
-  protected var scheduledTime: Long = 0l
-  protected var startTime: Long = 0l
-  protected var endTime: Long = 0l
+  protected var scheduledTime: Long = 0L
+  protected var startTime: Long = 0L
+  protected var endTime: Long = 0L
 
-  def getEndTime = endTime
-  def getStartTime = startTime
+  def getEndTime: Long = endTime
+  def getStartTime: Long = startTime
 
   /*
    * To be compatible with old versions.
    * It's not recommended to use scheduledTime, which is only a few 
milliseconds most of the time.
    */
   @Deprecated
-  def getScheduledTime = scheduledTime
+  def getScheduledTime: Long = scheduledTime
 
-  def getId = id
+  def getId: String = id
 
-  def setId(id: String)={
+  def setId(id: String): Unit = {
     this.id = id
     this synchronized notify()
   }
 
-  def turnToScheduled(): Boolean = if(!isWaiting) false else this synchronized 
{
-    if(!isWaiting) false else {
+  def turnToScheduled(): Boolean = if (!isWaiting) false else this 
synchronized {
+    if (!isWaiting) false else {
       scheduledTime = System.currentTimeMillis
       while(id == null) wait(100)
       transition(Scheduled)
@@ -60,21 +60,21 @@ trait SchedulerEvent extends Logging {
   def pause(): Unit
   def resume(): Unit
 
-  def cancel() = transition(Cancelled)
+  def cancel(): Unit = transition(Cancelled)
 
-  def isWaiting = state == Inited
+  def isWaiting: Boolean = state == Inited
 
-  def isScheduled = state == Scheduled
+  def isScheduled: Boolean = state == Scheduled
 
-  def isRunning = state == Running
+  def isRunning: Boolean = state == Running
 
-  def isCompleted = SchedulerEventState.isCompleted(state)
+  def isCompleted: Boolean = SchedulerEventState.isCompleted(state)
 
   def isSucceed: Boolean = SchedulerEventState.isSucceed(state)
 
   def isWaitForRetry: Boolean = state == WaitForRetry
 
-  def getState = state
+  def getState: SchedulerEventState = state
 
   def afterStateChanged(fromState: SchedulerEventState, toState: 
SchedulerEventState): Unit
 
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
index a75c44899..66bec2f60 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
@@ -19,38 +19,21 @@ package org.apache.linkis.scheduler.queue
 
 
 object SchedulerEventState extends Enumeration {
-  type SchedulerEventState = Value
 
-  val Inited = Value("Inited")
-  val WaitForRetry = Value("WaitForRetry")
-  val Scheduled = Value("Scheduled")
-  val Running = Value("Running")
-  val Succeed = Value("Succeed")
-  val Failed = Value("Failed")
-  val Cancelled = Value("Cancelled")
-  val Timeout = Value("Timeout")
+  type SchedulerEventState = Value
 
+  val Inited, WaitForRetry, Scheduled, Running, Succeed, Failed, Cancelled, 
Timeout = Value
 
-  def isRunning(jobState: SchedulerEventState) = jobState == Running
+  def isRunning(jobState: SchedulerEventState): Boolean = jobState == Running
 
-  def isScheduled(jobState: SchedulerEventState) = jobState != Inited
+  def isScheduled(jobState: SchedulerEventState): Boolean = jobState != Inited
 
-  def isCompleted(jobState: SchedulerEventState) = jobState match {
+  def isCompleted(jobState: SchedulerEventState): Boolean = jobState match {
     case Inited | Scheduled | Running | WaitForRetry => false
     case _ => true
   }
 
-  def isSucceed(jobState: SchedulerEventState) = jobState == Succeed
-
-  def isCompletedByStr(jobState: String): Boolean = jobState match {
-      case "Inited" => false
-      case "WaitForRetry" => false
-      case "Scheduled" => false
-      case "Running" => false
-      case "Succeed" => true
-      case "Failed" => true
-      case "Cancelled" => true
-      case "Timeout" => true
-      case _ => true
-  }
+  def isSucceed(jobState: SchedulerEventState): Boolean = jobState == Succeed
+
+  def isCompletedByStr(jobState: String): Boolean = 
isCompleted(SchedulerEventState.withName(jobState))
 }
\ No newline at end of file
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
index 6c403f6a5..2c88c72ce 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
@@ -70,7 +70,7 @@ class FIFOConsumerManager(groupName: String) extends 
ConsumerManager {
   }
 
   override def shutdown(): Unit = {
-    if(consumerListener != null) consumerListener.onConsumerDestroyed(consumer)
+    if (consumerListener != null) 
consumerListener.onConsumerDestroyed(consumer)
     consumer.shutdown()
     executorService.shutdownNow()
   }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
index 6e0761239..4d9d0cba4 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
@@ -29,7 +29,7 @@ class FIFOGroupFactory extends GroupFactory {
 
   //Obtained from the database(从数据库获取)
   private var defaultMaxRunningJobs: Int = 1
-  private var defaultMaxAskExecutorTimes: Long = 30000l
+  private var defaultMaxAskExecutorTimes: Long = 30000L
   private var defaultInitCapacity: Int = 1000
   private var defaultMaxCapacity: Int = 5000
 
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
index 1af930630..14d334db7 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
@@ -27,12 +27,12 @@ class FIFOScheduler(val schedulerContext: SchedulerContext) 
extends AbstractSche
 
   private var groupFactory: GroupFactory = _
 
-  override def init() = {
+  override def init(): Unit = {
     consumerManager = schedulerContext.getOrCreateConsumerManager
     groupFactory = schedulerContext.getOrCreateGroupFactory
   }
 
-  override def getName = "FIFOScheduler"
+  override def getName: String = "FIFOScheduler"
 
-  override def getSchedulerContext = schedulerContext
+  override def getSchedulerContext: SchedulerContext = schedulerContext
 }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
index d378d8292..1f557c370 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
@@ -33,7 +33,7 @@ class FIFOSchedulerContextImpl(val maxParallelismUsers: Int) 
extends SchedulerCo
   private val lock = new Object()
 
   override def getOrCreateGroupFactory: GroupFactory = {
-    if(groupFactory != null) return groupFactory
+    if (groupFactory != null) return groupFactory
     lock.synchronized {
       if (groupFactory == null) {
         groupFactory = createGroupFactory()
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
index bf618beb7..92304483e 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
+++ 
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
@@ -26,12 +26,12 @@ class ParallelScheduler(val schedulerContext: 
SchedulerContext) extends Abstract
   private var consumerManager: ConsumerManager = _
   private var groupFactory: GroupFactory = _
 
-  override def init() = {
+  override def init(): Unit = {
     consumerManager = schedulerContext.getOrCreateConsumerManager
     groupFactory = schedulerContext.getOrCreateGroupFactory
   }
 
-  override def getName = "ParallelScheduler"
+  override def getName: String = "ParallelScheduler"
 
-  override def getSchedulerContext = schedulerContext
+  override def getSchedulerContext: SchedulerContext = schedulerContext
 }
diff --git 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
 
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/SchedulerEventStateTest.scala
similarity index 52%
copy from 
linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
copy to 
linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/SchedulerEventStateTest.scala
index 1af930630..7ecc5151f 100644
--- 
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
+++ 
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/SchedulerEventStateTest.scala
@@ -5,34 +5,36 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
-package org.apache.linkis.scheduler.queue.fifoqueue
 
+package org.apache.linkis.scheduler.queue
 
-import org.apache.linkis.scheduler.queue._
-import org.apache.linkis.scheduler.{AbstractScheduler, SchedulerContext}
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.assertj.core.api.ThrowableAssert
+import org.junit.jupiter.api.Test
 
 
-class FIFOScheduler(val schedulerContext: SchedulerContext) extends 
AbstractScheduler {
-  private var consumerManager: ConsumerManager = _
+class SchedulerEventStateTest {
 
-  private var groupFactory: GroupFactory = _
-
-  override def init() = {
-    consumerManager = schedulerContext.getOrCreateConsumerManager
-    groupFactory = schedulerContext.getOrCreateGroupFactory
+  @Test def isCompletedByStr(): Unit = {
+    val initedStateStr = "Inited"
+    val succeedStr = "Succeed"
+    val errStr = "errStr"
+    assertThat(SchedulerEventState.isCompletedByStr(initedStateStr)).isFalse
+    assertThat(SchedulerEventState.isCompletedByStr(succeedStr)).isTrue
+    assertThatThrownBy(new ThrowableAssert.ThrowingCallable {
+      override def call(): Unit = {
+        SchedulerEventState.isCompletedByStr(errStr)
+      }
+    }).isInstanceOf(classOf[NoSuchElementException])
   }
 
-  override def getName = "FIFOScheduler"
-
-  override def getSchedulerContext = schedulerContext
-}
+}
\ No newline at end of file
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
index 236969b3f..f1660edd2 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
@@ -17,7 +17,7 @@
  
 package org.apache.linkis.manager.engineplugin.common.launch.process
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 
 object Environment extends Enumeration {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to