This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new 28fd5db84 refactor(linkis-scheduler): refactor some code (#2436)
28fd5db84 is described below
commit 28fd5db841002f10f861025093897f1df16a7c7b
Author: Jack Xu <[email protected]>
AuthorDate: Sat Jul 9 18:09:05 2022 +0800
refactor(linkis-scheduler): refactor some code (#2436)
---
.../linkis/common/listener/ListenerEventBus.scala | 52 +++++++++++-----------
.../org/apache/linkis/server/Knife4jConfig.scala | 1 +
.../linkis/scheduler/AbstractScheduler.scala | 27 ++++++-----
.../org/apache/linkis/scheduler/Scheduler.scala | 5 +--
.../apache/linkis/scheduler/SchedulerContext.scala | 4 +-
.../apache/linkis/scheduler/event/LogEvent.scala | 12 +++--
.../exception/SchedulerErrorException.scala | 2 +-
.../linkis/scheduler/executer/ExecuteRequest.scala | 10 ++---
.../linkis/scheduler/executer/Executor.scala | 8 ++--
.../apache/linkis/scheduler/future/BDPFuture.scala | 2 +-
.../linkis/scheduler/future/BDPFutureTask.scala | 10 ++---
.../linkis/scheduler/queue/SchedulerEvent.scala | 32 ++++++-------
.../scheduler/queue/SchedulerEventState.scala | 33 ++++----------
.../queue/fifoqueue/FIFOConsumerManager.scala | 2 +-
.../queue/fifoqueue/FIFOGroupFactory.scala | 2 +-
.../scheduler/queue/fifoqueue/FIFOScheduler.scala | 6 +--
.../queue/fifoqueue/FIFOSchedulerContextImpl.scala | 2 +-
.../queue/parallelqueue/ParallelScheduler.scala | 6 +--
.../scheduler/queue/SchedulerEventStateTest.scala} | 36 ++++++++-------
.../common/launch/process/Environment.scala | 2 +-
20 files changed, 122 insertions(+), 132 deletions(-)
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
index 19429c76f..742729c4b 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/listener/ListenerEventBus.scala
@@ -19,10 +19,10 @@ package org.apache.linkis.common.listener
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{ArrayBlockingQueue, CopyOnWriteArrayList, Future,
TimeoutException}
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.commons.lang3.time.DateFormatUtils
-import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
-import org.apache.commons.lang.time.DateFormatUtils
-
+import java.time.Duration
import scala.util.control.NonFatal
@@ -77,12 +77,9 @@ trait ListenerBus[L <: EventListener, E <: Event] extends
Logging {
}
abstract class ListenerEventBus[L <: EventListener, E <: Event]
(val eventQueueCapacity: Int, name: String)
- (listenerConsumerThreadSize: Int = 5, listenerThreadMaxFreeTime: Long =
ByteTimeUtils.timeStringAsMs("2m"))
+ (listenerConsumerThreadSize: Int = 5, listenerThreadMaxFreeTime: Long =
Duration.ofMinutes(2).toMillis)
extends ListenerBus[L, E] with Logging {
-// protected val listenerConsumerThreadSize: Int = 5
-// protected val listenerThreadMaxFreeTime: Long =
ByteTimeUtils.timeStringAsMs("2m")
-
private lazy val eventQueue = new ArrayBlockingQueue[E](eventQueueCapacity)
protected val executorService =
Utils.newCachedThreadPool(listenerConsumerThreadSize + 2, name +
"-Consumer-ThreadPool", true)
private val eventDealThreads =
Array.tabulate(listenerConsumerThreadSize)(new ListenerEventThread(_))
@@ -104,8 +101,8 @@ abstract class ListenerEventBus[L <: EventListener, E <:
Event]
listenerThread = executorService.submit(new Runnable {
override def run(): Unit =
while (!stopped.get) {
- val event = Utils.tryCatch(eventQueue.take()){
- case t: InterruptedException => logger.info(s"stopped $name
thread.")
+ val event = Utils.tryCatch(eventQueue.take()) {
+ case t: InterruptedException => logger.info(s"stopped $name
thread.", t)
return
}
while(!eventDealThreads.exists(_.putEvent(event)) && !stopped.get)
Utils.tryAndError(Thread.sleep(1))
@@ -121,7 +118,7 @@ abstract class ListenerEventBus[L <: EventListener, E <:
Event]
def post(event: E): Unit = {
if (stopped.get || executorService.isTerminated || (listenerThread.isDone
&& started.get())) {
dropEvent.onBusStopped(event)
- } else if(!eventQueue.offer(event)) {
+ } else if (!eventQueue.offer(event)) {
dropEvent.onDropEvent(event)
}
}
@@ -194,7 +191,7 @@ abstract class ListenerEventBus[L <: EventListener, E <:
Event]
private val logDroppedEvent = new AtomicBoolean(false)
private val logStoppedEvent = new AtomicBoolean(false)
executorService.submit(new Runnable {
- override def run(): Unit = while(true) {
+ override def run(): Unit = while (true) {
val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
@@ -241,24 +238,26 @@ abstract class ListenerEventBus[L <: EventListener, E <:
Event]
private var future: Option[Future[_]] = None
private var continue = true
private var event: Option[E] = None
- private var lastEventDealData = 0l
+ private var lastEventDealTime = 0L
- def releaseFreeThread(): Unit = if(listenerThreadMaxFreeTime > 0 &&
future.isDefined && event.isEmpty && lastEventDealData > 0 &&
- System.currentTimeMillis() - lastEventDealData >=
listenerThreadMaxFreeTime) synchronized {
- if(lastEventDealData == 0 && future.isEmpty) return
- lastEventDealData = 0l
- continue = false
- future.foreach(_.cancel(true))
- future = None
+ def releaseFreeThread(): Unit = if (listenerThreadMaxFreeTime > 0 &&
future.isDefined && event.isEmpty && lastEventDealTime > 0 &&
+ System.currentTimeMillis() - lastEventDealTime >=
listenerThreadMaxFreeTime) {
+ synchronized {
+ if (lastEventDealTime == 0 && future.isEmpty) return
+ lastEventDealTime = 0L
+ continue = false
+ future.foreach(_.cancel(true))
+ future = None
+ }
}
def isRunning: Boolean = event.isDefined
- def putEvent(event: E): Boolean = if(this.event.isDefined) false else
synchronized {
- if(this.event.isDefined) false
+ def putEvent(event: E): Boolean = if (this.event.isDefined) false else
synchronized {
+ if (this.event.isDefined) false
else {
- lastEventDealData = System.currentTimeMillis()
+ lastEventDealTime = System.currentTimeMillis()
this.event = Some(event)
- if(future.isEmpty) future = Some(executorService.submit(this))
+ if (future.isEmpty) future = Some(executorService.submit(this))
else notify()
true
}
@@ -274,12 +273,13 @@ abstract class ListenerEventBus[L <: EventListener, E <:
Event]
}
while(continue) {
synchronized {
- while(event.isEmpty) Utils.tryQuietly(wait(), _ => {
+ while (event.isEmpty) Utils.tryQuietly(wait(), _ => {
threadRelease()
- return})
+ return
+ })
}
Utils.tryFinally(event.foreach(postToAll)) (synchronized {
- lastEventDealData = System.currentTimeMillis()
+ lastEventDealTime = System.currentTimeMillis()
event = None
})
}
diff --git
a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
index 92c44d616..847315a5b 100644
---
a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
+++
b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.linkis.server
import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
index 1cabbe9ca..282566f29 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.scheduler
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.scheduler.exception.SchedulerErrorException
import org.apache.linkis.scheduler.queue.SchedulerEvent
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
abstract class AbstractScheduler extends Scheduler {
@@ -28,20 +28,24 @@ abstract class AbstractScheduler extends Scheduler {
override def start(): Unit = {}
- private def getEventId(index: Int, groupName: String): String = groupName +
"_" + index
+ private val EVENT_ID_SPLIT = "_"
+
+ private def getEventId(index: Int, groupName: String): String = groupName +
EVENT_ID_SPLIT + index
+
private def getIndexAndGroupName(eventId: String): (Int, String) = {
- if(StringUtils.isBlank(eventId) || !eventId.contains("_"))
+ if (StringUtils.isBlank(eventId) || !eventId.contains(EVENT_ID_SPLIT) ||
eventId.startsWith(EVENT_ID_SPLIT)) {
throw new SchedulerErrorException(12011, s"Unrecognized execId
$eventId.(不能识别的execId $eventId.)")
- val index = eventId.lastIndexOf("_")
- if(index < 1) throw new SchedulerErrorException(12011, s"Unrecognized
execId $eventId.(不能识别的execId $eventId.)")
+ }
+ val index = eventId.lastIndexOf(EVENT_ID_SPLIT)
(eventId.substring(index + 1).toInt, eventId.substring(0, index))
}
+
override def submit(event: SchedulerEvent): Unit = {
val group =
getSchedulerContext.getOrCreateGroupFactory.getOrCreateGroup(event)
val consumer =
getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(group.getGroupName)
val index = consumer.getConsumeQueue.offer(event)
index.map(getEventId(_, group.getGroupName)).foreach(event.setId)
- if(index.isEmpty) throw new SchedulerErrorException(12001,"The submission
job failed and the queue is full!(提交作业失败,队列已满!)")
+ if (index.isEmpty) throw new SchedulerErrorException(12001, "The
submission job failed and the queue is full!(提交作业失败,队列已满!)")
}
override def get(event: SchedulerEvent): Option[SchedulerEvent] =
get(event.getId)
@@ -52,12 +56,15 @@ abstract class AbstractScheduler extends Scheduler {
consumer.getRunningEvents.find(_.getId ==
eventId).orElse(consumer.getConsumeQueue.get(index))
}
- override def shutdown(): Unit = if(getSchedulerContext != null) {
- if(getSchedulerContext.getOrCreateConsumerManager != null)
+ override def shutdown(): Unit = if (getSchedulerContext != null) {
+ if (getSchedulerContext.getOrCreateConsumerManager != null) {
Utils.tryQuietly(getSchedulerContext.getOrCreateConsumerManager.shutdown())
- if(getSchedulerContext.getOrCreateExecutorManager != null)
+ }
+ if (getSchedulerContext.getOrCreateExecutorManager != null) {
Utils.tryQuietly(getSchedulerContext.getOrCreateExecutorManager.shutdown())
- if(getSchedulerContext.getOrCreateSchedulerListenerBus != null)
+ }
+ if (getSchedulerContext.getOrCreateSchedulerListenerBus != null) {
Utils.tryQuietly(getSchedulerContext.getOrCreateSchedulerListenerBus.stop())
+ }
}
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
index 28fbb180c..f3af1ca07 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/Scheduler.scala
@@ -35,14 +35,13 @@ abstract class Scheduler {
}
object Scheduler extends Logging{
- def createScheduler(scheduleType: String, schedulerContext:
SchedulerContext): Option[Scheduler]={
+ def createScheduler(scheduleType: String, schedulerContext:
SchedulerContext): Option[Scheduler] = {
scheduleType match {
case "FIFO" => Some(new FIFOScheduler(schedulerContext))
case "PARA" => Some(new ParallelScheduler(schedulerContext))
- case _ => {
+ case _ =>
logger.error("Please enter the correct scheduling type!(请输入正确的调度类型!)")
None
- }
}
}
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
index 2fef52f27..868dfe427 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
@@ -37,6 +37,6 @@ trait SchedulerContext {
}
object SchedulerContext {
- val schedulerContext:SchedulerContext = new FIFOSchedulerContextImpl( 100)
- def getSchedulerContext:SchedulerContext = schedulerContext
+ val schedulerContext: SchedulerContext = new FIFOSchedulerContextImpl(100)
+ def getSchedulerContext: SchedulerContext = schedulerContext
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
index efe153ec7..4f3b44d6c 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/event/LogEvent.scala
@@ -21,15 +21,13 @@ import org.apache.linkis.common.listener.Event
import org.apache.linkis.scheduler.queue.Job
-class LogEvent(source:Job,
- t:Int) extends Event{
- def getT:Int = t
+class LogEvent(source: Job, t: Int) extends Event {
+ def getT: Int = t
}
object LogEvent{
- val read:Int = 1
- val write:Int = 2
+ val read: Int = 1
+ val write: Int = 2
- def apply(source: Job,
- t: Int): LogEvent = new LogEvent(source, t)
+ def apply(source: Job, t: Int): LogEvent = new LogEvent(source, t)
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
index a479d2a70..0af12ec18 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/exception/SchedulerErrorException.scala
@@ -20,6 +20,6 @@ package org.apache.linkis.scheduler.exception
import org.apache.linkis.common.exception.ErrorException
-class SchedulerErrorException(errCode: Int, desc: String) extends
ErrorException(errCode,desc){
+class SchedulerErrorException(errCode: Int, desc: String) extends
ErrorException(errCode, desc) {
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
index 4d03864b7..78e00ef5c 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/ExecuteRequest.scala
@@ -25,11 +25,11 @@ trait JobExecuteRequest {
val jobId: String
}
trait RunTypeExecuteRequest{
- val runType:String
+ val runType: String
}
trait PythonExecuteRequest{
- val sparkPythonVersion:String
- val sparkPythonExtraPackage:String
- val pythonVersion:String
- val pythonExtraPackage:String
+ val sparkPythonVersion: String
+ val sparkPythonExtraPackage: String
+ val pythonVersion: String
+ val pythonExtraPackage: String
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
index b45cbcf04..4b39aa999 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/Executor.scala
@@ -17,10 +17,10 @@
package org.apache.linkis.scheduler.executer
-import java.io.Closeable
-
import org.apache.linkis.protocol.engine.EngineState
+import java.io.Closeable
+
trait Executor extends Closeable {
@@ -43,7 +43,7 @@ object ExecutorState {
def apply(x: Int): ExecutorState = EngineState.values()(x)
- def isCompleted(state: ExecutorState) =
EngineState.isCompleted(state.asInstanceOf[EngineState])
+ def isCompleted(state: ExecutorState): Boolean =
EngineState.isCompleted(state)
- def isAvailable(state: ExecutorState) =
EngineState.isAvailable(state.asInstanceOf[EngineState])
+ def isAvailable(state: ExecutorState): Boolean =
EngineState.isAvailable(state)
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
index 33cf384af..9d4cc125e 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFuture.scala
@@ -19,5 +19,5 @@ package org.apache.linkis.scheduler.future
trait BDPFuture {
- def cancel():Unit
+ def cancel(): Unit
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
index cb36aa46a..59fe3cf21 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/future/BDPFutureTask.scala
@@ -17,8 +17,9 @@
package org.apache.linkis.scheduler.future
-import java.util.concurrent.{Future, FutureTask}
+import org.apache.commons.lang3.reflect.FieldUtils
+import java.util.concurrent.{Future, FutureTask}
import org.apache.linkis.common.utils.{Logging, Utils}
@@ -27,10 +28,9 @@ class BDPFutureTask(future: Future[_]) extends BDPFuture
with Logging {
future match {
case futureTask: FutureTask[_] =>
logger.info("Start to interrupt BDPFutureTask")
- val futureType = futureTask.getClass
- val field = futureType.getDeclaredField("runner")
- field.setAccessible(true)
- val runner = field.get(futureTask).asInstanceOf[Thread]
+ val runner = FieldUtils
+ .readDeclaredField(futureTask, "runner", true)
+ .asInstanceOf[Thread]
runner.interrupt()
logger.info(s"Finished to interrupt BDPFutureTask of
${runner.getName}")
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index f325ae572..3a08ee34f 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -27,29 +27,29 @@ trait SchedulerEvent extends Logging {
private[queue] var id: String = _
private var state: SchedulerEventState = Inited
val createTime = System.currentTimeMillis
- protected var scheduledTime: Long = 0l
- protected var startTime: Long = 0l
- protected var endTime: Long = 0l
+ protected var scheduledTime: Long = 0L
+ protected var startTime: Long = 0L
+ protected var endTime: Long = 0L
- def getEndTime = endTime
- def getStartTime = startTime
+ def getEndTime: Long = endTime
+ def getStartTime: Long = startTime
/*
* To be compatible with old versions.
* It's not recommended to use scheduledTime, which was only several milliseconds
most of the time.
*/
@Deprecated
- def getScheduledTime = scheduledTime
+ def getScheduledTime: Long = scheduledTime
- def getId = id
+ def getId: String = id
- def setId(id: String)={
+ def setId(id: String): Unit = {
this.id = id
this synchronized notify()
}
- def turnToScheduled(): Boolean = if(!isWaiting) false else this synchronized
{
- if(!isWaiting) false else {
+ def turnToScheduled(): Boolean = if (!isWaiting) false else this
synchronized {
+ if (!isWaiting) false else {
scheduledTime = System.currentTimeMillis
while(id == null) wait(100)
transition(Scheduled)
@@ -60,21 +60,21 @@ trait SchedulerEvent extends Logging {
def pause(): Unit
def resume(): Unit
- def cancel() = transition(Cancelled)
+ def cancel(): Unit = transition(Cancelled)
- def isWaiting = state == Inited
+ def isWaiting: Boolean = state == Inited
- def isScheduled = state == Scheduled
+ def isScheduled: Boolean = state == Scheduled
- def isRunning = state == Running
+ def isRunning: Boolean = state == Running
- def isCompleted = SchedulerEventState.isCompleted(state)
+ def isCompleted: Boolean = SchedulerEventState.isCompleted(state)
def isSucceed: Boolean = SchedulerEventState.isSucceed(state)
def isWaitForRetry: Boolean = state == WaitForRetry
- def getState = state
+ def getState: SchedulerEventState = state
def afterStateChanged(fromState: SchedulerEventState, toState:
SchedulerEventState): Unit
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
index a75c44899..66bec2f60 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
@@ -19,38 +19,21 @@ package org.apache.linkis.scheduler.queue
object SchedulerEventState extends Enumeration {
- type SchedulerEventState = Value
- val Inited = Value("Inited")
- val WaitForRetry = Value("WaitForRetry")
- val Scheduled = Value("Scheduled")
- val Running = Value("Running")
- val Succeed = Value("Succeed")
- val Failed = Value("Failed")
- val Cancelled = Value("Cancelled")
- val Timeout = Value("Timeout")
+ type SchedulerEventState = Value
+ val Inited, WaitForRetry, Scheduled, Running, Succeed, Failed, Cancelled,
Timeout = Value
- def isRunning(jobState: SchedulerEventState) = jobState == Running
+ def isRunning(jobState: SchedulerEventState): Boolean = jobState == Running
- def isScheduled(jobState: SchedulerEventState) = jobState != Inited
+ def isScheduled(jobState: SchedulerEventState): Boolean = jobState != Inited
- def isCompleted(jobState: SchedulerEventState) = jobState match {
+ def isCompleted(jobState: SchedulerEventState): Boolean = jobState match {
case Inited | Scheduled | Running | WaitForRetry => false
case _ => true
}
- def isSucceed(jobState: SchedulerEventState) = jobState == Succeed
-
- def isCompletedByStr(jobState: String): Boolean = jobState match {
- case "Inited" => false
- case "WaitForRetry" => false
- case "Scheduled" => false
- case "Running" => false
- case "Succeed" => true
- case "Failed" => true
- case "Cancelled" => true
- case "Timeout" => true
- case _ => true
- }
+ def isSucceed(jobState: SchedulerEventState): Boolean = jobState == Succeed
+
+ def isCompletedByStr(jobState: String): Boolean =
isCompleted(SchedulerEventState.withName(jobState))
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
index 6c403f6a5..2c88c72ce 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManager.scala
@@ -70,7 +70,7 @@ class FIFOConsumerManager(groupName: String) extends
ConsumerManager {
}
override def shutdown(): Unit = {
- if(consumerListener != null) consumerListener.onConsumerDestroyed(consumer)
+ if (consumerListener != null)
consumerListener.onConsumerDestroyed(consumer)
consumer.shutdown()
executorService.shutdownNow()
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
index 6e0761239..4d9d0cba4 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactory.scala
@@ -29,7 +29,7 @@ class FIFOGroupFactory extends GroupFactory {
//Obtained from the database(从数据库获取)
private var defaultMaxRunningJobs: Int = 1
- private var defaultMaxAskExecutorTimes: Long = 30000l
+ private var defaultMaxAskExecutorTimes: Long = 30000L
private var defaultInitCapacity: Int = 1000
private var defaultMaxCapacity: Int = 5000
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
index 1af930630..14d334db7 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
@@ -27,12 +27,12 @@ class FIFOScheduler(val schedulerContext: SchedulerContext)
extends AbstractSche
private var groupFactory: GroupFactory = _
- override def init() = {
+ override def init(): Unit = {
consumerManager = schedulerContext.getOrCreateConsumerManager
groupFactory = schedulerContext.getOrCreateGroupFactory
}
- override def getName = "FIFOScheduler"
+ override def getName: String = "FIFOScheduler"
- override def getSchedulerContext = schedulerContext
+ override def getSchedulerContext: SchedulerContext = schedulerContext
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
index d378d8292..1f557c370 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImpl.scala
@@ -33,7 +33,7 @@ class FIFOSchedulerContextImpl(val maxParallelismUsers: Int)
extends SchedulerCo
private val lock = new Object()
override def getOrCreateGroupFactory: GroupFactory = {
- if(groupFactory != null) return groupFactory
+ if (groupFactory != null) return groupFactory
lock.synchronized {
if (groupFactory == null) {
groupFactory = createGroupFactory()
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
index bf618beb7..92304483e 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelScheduler.scala
@@ -26,12 +26,12 @@ class ParallelScheduler(val schedulerContext:
SchedulerContext) extends Abstract
private var consumerManager: ConsumerManager = _
private var groupFactory: GroupFactory = _
- override def init() = {
+ override def init(): Unit = {
consumerManager = schedulerContext.getOrCreateConsumerManager
groupFactory = schedulerContext.getOrCreateGroupFactory
}
- override def getName = "ParallelScheduler"
+ override def getName: String = "ParallelScheduler"
- override def getSchedulerContext = schedulerContext
+ override def getSchedulerContext: SchedulerContext = schedulerContext
}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/SchedulerEventStateTest.scala
similarity index 52%
copy from
linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
copy to
linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/SchedulerEventStateTest.scala
index 1af930630..7ecc5151f 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOScheduler.scala
+++
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/SchedulerEventStateTest.scala
@@ -5,34 +5,36 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.linkis.scheduler.queue.fifoqueue
+package org.apache.linkis.scheduler.queue
-import org.apache.linkis.scheduler.queue._
-import org.apache.linkis.scheduler.{AbstractScheduler, SchedulerContext}
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.assertj.core.api.ThrowableAssert
+import org.junit.jupiter.api.Test
-class FIFOScheduler(val schedulerContext: SchedulerContext) extends
AbstractScheduler {
- private var consumerManager: ConsumerManager = _
+class SchedulerEventStateTest {
- private var groupFactory: GroupFactory = _
-
- override def init() = {
- consumerManager = schedulerContext.getOrCreateConsumerManager
- groupFactory = schedulerContext.getOrCreateGroupFactory
+ @Test def isCompletedByStr(): Unit = {
+ val initedStateStr = "Inited"
+ val succeedStr = "Succeed"
+ val errStr = "errStr"
+ assertThat(SchedulerEventState.isCompletedByStr(initedStateStr)).isFalse
+ assertThat(SchedulerEventState.isCompletedByStr(succeedStr)).isTrue
+ assertThatThrownBy(new ThrowableAssert.ThrowingCallable {
+ override def call(): Unit = {
+ SchedulerEventState.isCompletedByStr(errStr)
+ }
+ }).isInstanceOf(classOf[NoSuchElementException])
}
- override def getName = "FIFOScheduler"
-
- override def getSchedulerContext = schedulerContext
-}
+}
\ No newline at end of file
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
index 236969b3f..f1660edd2 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.manager.engineplugin.common.launch.process
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
object Environment extends Enumeration {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]