This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 7eea895ed refactor: refact entrance (#4008)
7eea895ed is described below
commit 7eea895ed76f4704e091cd5e69724aa684b4ad4a
Author: Jack Xu <[email protected]>
AuthorDate: Thu Dec 15 20:33:52 2022 +0800
refactor: refact entrance (#4008)
---
.../apache/linkis/scheduler/SchedulerContext.scala | 6 ----
.../queue/fifoqueue/FIFOUserConsumer.scala | 2 +-
.../linkis/scheduler/SchedulerContextTest.scala | 4 ++-
.../entrance/restful/EntranceRestfulApi.java | 14 ++++----
.../entrance/server/DefaultEntranceServer.java | 6 ++--
.../apache/linkis/entrance/EntranceServer.scala | 20 +++++------
.../linkis/entrance/EntranceWebSocketService.scala | 31 ++++++++---------
.../linkis/entrance/cs/CSEntranceHelper.scala | 39 +++++++++++-----------
.../entrance/execute/EntranceExecutorManager.scala | 19 +++--------
.../entrance/parser/CommonEntranceParser.scala | 6 ++--
.../impl/value/object/CSFlowInfosSerializer.java | 5 +--
11 files changed, 68 insertions(+), 84 deletions(-)
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
index 833750f57..35c6ff259 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/SchedulerContext.scala
@@ -21,7 +21,6 @@ import org.apache.linkis.common.listener.ListenerEventBus
import org.apache.linkis.scheduler.event.{ScheduleEvent,
SchedulerEventListener}
import org.apache.linkis.scheduler.executer.ExecutorManager
import org.apache.linkis.scheduler.queue.{ConsumerManager, GroupFactory}
-import org.apache.linkis.scheduler.queue.fifoqueue.FIFOSchedulerContextImpl
trait SchedulerContext {
@@ -35,8 +34,3 @@ trait SchedulerContext {
: ListenerEventBus[_ <: SchedulerEventListener, _ <: ScheduleEvent]
}
-
-object SchedulerContext {
- val schedulerContext: SchedulerContext = new FIFOSchedulerContextImpl(100)
- def getSchedulerContext: SchedulerContext = schedulerContext
-}
diff --git
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index 926d7ba17..dcfbbfee6 100644
---
a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++
b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -108,7 +108,7 @@ class FIFOUserConsumer(
}
var event: Option[SchedulerEvent] = getWaitForRetryEvent
if (event.isEmpty) {
- val completedNums = runningJobs.filter(e => e == null || e.isCompleted)
+ val completedNums = runningJobs.filter(job => job == null ||
job.isCompleted)
if (completedNums.length < 1) {
Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化
return
diff --git
a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
index 1a6462cd4..42d62b241 100644
---
a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
+++
b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/SchedulerContextTest.scala
@@ -17,6 +17,8 @@
package org.apache.linkis.scheduler
+import org.apache.linkis.scheduler.queue.fifoqueue.FIFOSchedulerContextImpl
+
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Test
@@ -24,7 +26,7 @@ class SchedulerContextTest {
@Test
def testSchedulerContext: Unit = {
- val schedulerContext = SchedulerContext
+ val schedulerContext = new FIFOSchedulerContextImpl(100);
assertNotNull(schedulerContext)
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 33ab1e97a..324187fc2 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -98,15 +98,14 @@ public class EntranceRestfulApi implements
EntranceRestfulRemote {
logger.info("Begin to get an execID");
json.put(TaskConstant.EXECUTE_USER, ModuleUserUtils.getOperationUser(req));
json.put(TaskConstant.SUBMIT_USER, SecurityFilter.getLoginUsername(req));
- HashMap<String, String> map = (HashMap) json.get(TaskConstant.SOURCE);
+ HashMap<String, String> map = (HashMap<String, String>)
json.get(TaskConstant.SOURCE);
if (map == null) {
map = new HashMap<>();
json.put(TaskConstant.SOURCE, map);
}
String ip = JobHistoryHelper.getRequestIpAddr(req);
map.put(TaskConstant.REQUEST_IP, ip);
- String jobId = entranceServer.execute(json);
- Job job = entranceServer.getJob(jobId).get();
+ Job job = entranceServer.execute(json);
JobRequest jobReq = ((EntranceJob) job).getJobRequest();
Long jobReqId = jobReq.getId();
ModuleUserUtils.getOperationUser(req, "execute task,id: " + jobReqId);
@@ -121,7 +120,7 @@ public class EntranceRestfulApi implements
EntranceRestfulRemote {
"************************************SCRIPT
CODE************************************", job);
String execID =
ZuulEntranceUtils.generateExecID(
- jobId,
+ job.getId(),
Sender.getThisServiceInstance().getApplicationName(),
new String[] {Sender.getThisInstance()});
pushLog(
@@ -158,8 +157,7 @@ public class EntranceRestfulApi implements
EntranceRestfulRemote {
}
String ip = JobHistoryHelper.getRequestIpAddr(req);
map.put(TaskConstant.REQUEST_IP, ip);
- String jobId = entranceServer.execute(json);
- Job job = entranceServer.getJob(jobId).get();
+ Job job = entranceServer.execute(json);
JobRequest jobRequest = ((EntranceJob) job).getJobRequest();
Long jobReqId = jobRequest.getId();
ModuleUserUtils.getOperationUser(req, "submit jobReqId: " + jobReqId);
@@ -175,7 +173,7 @@ public class EntranceRestfulApi implements
EntranceRestfulRemote {
pushLog(
LogUtils.generateInfo(
"Your job is accepted, jobID is "
- + jobId
+ + job.getId()
+ " and jobReqId is "
+ jobReqId
+ " in "
@@ -184,7 +182,7 @@ public class EntranceRestfulApi implements
EntranceRestfulRemote {
job);
String execID =
ZuulEntranceUtils.generateExecID(
- jobId,
+ job.getId(),
Sender.getThisServiceInstance().getApplicationName(),
new String[] {Sender.getThisInstance()});
message = Message.ok();
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index 97ee71c0e..a050056fe 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -80,9 +80,9 @@ public class DefaultEntranceServer extends EntranceServer {
logger.warn("event has been handled");
} else {
logger.warn("Entrance exit to stop all job");
- EntranceJob[] allUndoneTask = getAllUndoneTask(null);
- if (null != allUndoneTask) {
- for (EntranceJob job : allUndoneTask) {
+ EntranceJob[] allUndoneJobs = getAllUndoneTask(null);
+ if (null != allUndoneJobs) {
+ for (EntranceJob job : allUndoneJobs) {
job.onFailure(
"Entrance exits the automatic cleanup task and can be
rerun(服务退出自动清理任务,可以重跑)", null);
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index cc470d542..2b3a75377 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -52,14 +52,14 @@ abstract class EntranceServer extends Logging {
def getEntranceContext: EntranceContext
/**
- * Execute a task and return an execId(执行一个task,返回一个execId)
+ * Execute a task and return an job(执行一个task,返回一个job)
* @param params
* @return
*/
- def execute(params: java.util.Map[String, Any]): String = {
- if (!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG))
+ def execute(params: java.util.Map[String, Any]): Job = {
+ if (!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)) {
logger.debug("received a request: " + params)
- else params.remove(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)
+ } else params.remove(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)
var jobRequest =
getEntranceContext.getOrCreateEntranceParser().parseToTask(params)
// todo: multi entrance instances
jobRequest.setInstances(Sender.getThisInstance)
@@ -131,14 +131,14 @@ abstract class EntranceServer extends Logging {
job.setProgressListener(getEntranceContext.getOrCreatePersistenceManager())
job.setJobListener(getEntranceContext.getOrCreatePersistenceManager())
job match {
- case entranceJob: EntranceJob => {
+ case entranceJob: EntranceJob =>
entranceJob.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus)
- }
case _ =>
}
Utils.tryCatch {
- if (logAppender.length() > 0)
+ if (logAppender.length() > 0) {
job.getLogListener.foreach(_.onLogUpdate(job,
logAppender.toString.trim))
+ }
} { t =>
logger.error("Failed to write init log, reason: ", t)
}
@@ -158,13 +158,13 @@ abstract class EntranceServer extends Logging {
job match {
case entranceJob: EntranceJob =>
entranceJob.getJobRequest.setReqId(job.getId())
- if (jobTimeoutManager.timeoutCheck &&
JobTimeoutManager.hasTimeoutLabel(entranceJob))
+ if (jobTimeoutManager.timeoutCheck &&
JobTimeoutManager.hasTimeoutLabel(entranceJob)) {
jobTimeoutManager.add(job.getId(), entranceJob)
+ }
entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg))
case _ =>
}
-
- job.getId()
+ job
} { t =>
job.onFailure("Submitting the query failed!(提交查询失败!)", t)
val _jobRequest: JobRequest =
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
index 06b6520d1..59dc42475 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
@@ -49,6 +49,8 @@ import java.util
import java.util.Date
import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+
class EntranceWebSocketService
extends ServerEventService
with EntranceEventListener
@@ -115,10 +117,9 @@ class EntranceWebSocketService
case job: Job =>
if (jobIdToEventId.containsKey(job.getId)) {
val entranceJob = job.asInstanceOf[EntranceJob]
- val engineTypeLabel = entranceJob.getJobRequest.getLabels
- .filter(l =>
l.getLabelKey.equalsIgnoreCase(LabelKeyConstant.ENGINE_TYPE_KEY))
- .headOption
- .getOrElse(null)
+ val engineTypeLabel =
entranceJob.getJobRequest.getLabels.asScala
+ .find(l =>
l.getLabelKey.equalsIgnoreCase(LabelKeyConstant.ENGINE_TYPE_KEY))
+ .orNull
if (null == engineTypeLabel) {
logger.error("Invalid engineTpyeLabel")
return
@@ -132,7 +133,8 @@ class EntranceWebSocketService
Utils.tryQuietly(
sendMsg(
job,
- "Get waiting size succeed."
+ Message
+ .ok("Get waiting size succeed.")
.data("execID", realID)
.data("taskID", taskID)
.data("waitingSize", index)
@@ -178,17 +180,16 @@ class EntranceWebSocketService
} // TODO Convert to a suitable Map(转换成合适的Map)
val websocketTag = event.getWebsocketTag
params.put(TaskConstant.EXECUTE_USER, event.getUser)
- val jobId = entranceServer.execute(params)
- jobIdToEventId synchronized jobIdToEventId.put(jobId, event.getId)
- websocketTagJobID synchronized websocketTagJobID.put(jobId, websocketTag)
- val jobRequest =
entranceServer.getJob(jobId).get.asInstanceOf[EntranceJob].getJobRequest
+ val job = entranceServer.execute(params)
+ jobIdToEventId synchronized jobIdToEventId.put(job.getId(), event.getId)
+ websocketTagJobID synchronized websocketTagJobID.put(job.getId(),
websocketTag)
+ val jobRequest = job.asInstanceOf[EntranceJob].getJobRequest
val taskID = jobRequest.getId
- val job = entranceServer.getJob(jobId).get
val engineTypeLabel = LabelUtil.getEngineTypeLabel(jobRequest.getLabels)
val executeApplicationName: String = engineTypeLabel.getEngineType
val creator: String =
LabelUtil.getUserCreatorLabel(jobRequest.getLabels).getCreator
val execID = ZuulEntranceUtils.generateExecID(
- jobId,
+ job.getId(),
executeApplicationName,
Sender.getThisInstance,
creator
@@ -197,7 +198,7 @@ class EntranceWebSocketService
executeResponseMsg
.data("execID", execID)
.data("taskID", taskID)
- .data("websocketTag", websocketTagJobID.get(jobId))
+ .data("websocketTag", websocketTagJobID.get(job.getId()))
executeResponseMsg.setMethod(restfulURI + "entrance/execute")
executeResponseMsg.setStatus(0)
sendMsg(job, executeResponseMsg)
@@ -238,7 +239,7 @@ class EntranceWebSocketService
"The request was executed successfully!"
.data("execID", execID)
.data("taskID", taskID)
- .data("websocketTag", websocketTagJobID.get(jobId))
+ .data("websocketTag", websocketTagJobID.get(job.getId))
// executeResponseMsg
}
@@ -301,8 +302,8 @@ class EntranceWebSocketService
.data("websocketTag", websocketTagJobID.get(realID))
.data("taskID", taskID)
logger.info(
- s" retMessage: execID is $longExecID, status is ${status.toString},
websocketTag is ${websocketTagJobID
- .get(realID)}"
+ "retMessage: execID is {}, status is {}, websocketTag is {}",
+ Array(longExecID, status.toString, websocketTagJobID.get(realID)): _*
)
retMessage.setStatus(0)
retMessage.setMethod(restfulURI + "entrance/" + longExecID + "/status")
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
index 813315e7d..d12a9b9d3 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala
@@ -43,7 +43,7 @@ import org.apache.commons.lang3.StringUtils
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
object CSEntranceHelper extends Logging {
@@ -80,12 +80,12 @@ object CSEntranceHelper extends Logging {
*/
def registerCSRSData(job: Job): Unit = {
job match {
- case entranceJob: EntranceJob => {
+ case entranceJob: EntranceJob =>
val (contextIDValueStr, nodeNameStr) =
getContextInfo(entranceJob.getParams)
- logger.info(s"registerCSRSData: nodeName:$nodeNameStr")
- if (StringUtils.isBlank(contextIDValueStr) ||
StringUtils.isBlank(nodeNameStr))
+ logger.info("registerCSRSData: nodeName: {}", nodeNameStr)
+ if (StringUtils.isBlank(contextIDValueStr) ||
StringUtils.isBlank(nodeNameStr)) {
return null
-
+ }
val contextKey = new CommonContextKey
contextKey.setContextScope(ContextScope.PUBLIC)
contextKey.setContextType(ContextType.DATA)
@@ -101,11 +101,10 @@ object CSEntranceHelper extends Logging {
SerializeHelper.serializeContextKey(contextKey),
data
)
- logger.info(s"(${contextKey.getKey} put ${jobRequest.getId} of
jobId to cs)")
+ logger.info("({} put {} of jobId to cs)", contextKey.getKey: Any,
jobRequest.getId: Any)
case _ =>
}
- logger.info(s"registerCSRSData end: nodeName:$nodeNameStr")
- }
+ logger.info("registerCSRSData end: nodeName: {}", nodeNameStr)
case _ =>
}
}
@@ -123,7 +122,7 @@ object CSEntranceHelper extends Logging {
)
if (StringUtils.isNotBlank(contextIDValueStr) &&
StringUtils.isNotBlank(nodeNameStr)) {
- logger.info(s"init node($nodeNameStr) cs info")
+ logger.info("init node({}) cs info", nodeNameStr)
CSNodeServiceImpl.getInstance().initNodeCSInfo(contextIDValueStr,
nodeNameStr)
}
}
@@ -144,21 +143,23 @@ object CSEntranceHelper extends Logging {
if (StringUtils.isNotBlank(contextIDValueStr) &&
StringUtils.isNotBlank(nodeNameStr)) {
val userCreatorLabel =
LabelUtil.getUserCreatorLabel(requestPersistTask.getLabels)
val newLabels = new util.ArrayList[Label[_]]
- requestPersistTask.getLabels
+ requestPersistTask.getLabels.asScala
.filterNot(_.isInstanceOf[UserCreatorLabel])
.foreach(newLabels.add)
SerializeHelper.deserializeContextID(contextIDValueStr) match {
case contextID: LinkisWorkflowContextID =>
if
(CSCommonUtils.CONTEXT_ENV_PROD.equalsIgnoreCase(contextID.getEnv)) {
logger.info(
- s"reset creator from ${userCreatorLabel.getCreator} to " +
EntranceConfiguration.SCHEDULER_CREATOR
- .getHotValue()
+ "reset creator from {} to {}",
+ userCreatorLabel.getCreator: Any,
+ EntranceConfiguration.SCHEDULER_CREATOR.getHotValue(): Any
)
userCreatorLabel.setCreator(EntranceConfiguration.SCHEDULER_CREATOR.getHotValue())
} else {
logger.info(
- s"reset creator from ${userCreatorLabel.getCreator} to " +
EntranceConfiguration.FLOW_EXECUTION_CREATOR
- .getHotValue()
+ "reset creator from {} to {}",
+ userCreatorLabel.getCreator: Any,
+ EntranceConfiguration.FLOW_EXECUTION_CREATOR.getHotValue: Any
)
userCreatorLabel.setCreator(EntranceConfiguration.FLOW_EXECUTION_CREATOR.getHotValue())
}
@@ -182,19 +183,19 @@ object CSEntranceHelper extends Logging {
)
if (StringUtils.isNotBlank(contextIDValueStr)) {
- logger.info(s"parse variable nodeName:$nodeNameStr")
+ logger.info("parse variable nodeName: {}", nodeNameStr)
val linkisVariableList: util.List[LinkisVariable] =
CSVariableService.getInstance().getUpstreamVariables(contextIDValueStr,
nodeNameStr);
if (null != linkisVariableList) {
- linkisVariableList.foreach { linkisVariable =>
+ linkisVariableList.asScala.foreach { linkisVariable =>
variableMap.put(linkisVariable.getKey, linkisVariable.getValue)
}
}
- if (variableMap.nonEmpty) {
+ if (!variableMap.isEmpty) {
// 1.cs priority is low, the same ones are not added
val varMap =
TaskUtils.getVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String,
Any]])
- variableMap.foreach { keyAndValue =>
+ variableMap.asScala.foreach { keyAndValue =>
if (!varMap.containsKey(keyAndValue._1)) {
varMap.put(keyAndValue._1, keyAndValue._2)
}
@@ -205,7 +206,7 @@ object CSEntranceHelper extends Logging {
)
}
- logger.info(s"parse variable end nodeName:$nodeNameStr")
+ logger.info("parse variable end nodeName: {}", nodeNameStr)
}
}
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
index 516b900b6..0d5d60598 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala
@@ -22,7 +22,6 @@ import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
import org.apache.linkis.entrance.exception.EntranceErrorException
-import org.apache.linkis.entrance.job.EntranceExecutionJob
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.scheduler.executer.{Executor, ExecutorManager}
import org.apache.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent}
@@ -49,17 +48,7 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
override def askExecutor(schedulerEvent: SchedulerEvent): Option[Executor] =
schedulerEvent match {
case job: Job =>
- val executor = createExecutor(job)
- if (executor != null) {
- job match {
- case entranceExecutionJob: EntranceExecutionJob =>
- val jobReq = entranceExecutionJob.getJobRequest
- jobReq.setUpdatedTime(new Date(System.currentTimeMillis()))
- case _ =>
- }
- Some(executor)
- } else None
-
+ Option(createExecutor(job))
}
// Update the overall job running status after no subtask runs
@@ -70,7 +59,7 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
val startTime = System.currentTimeMillis()
var warnException: WarnException = null
var executor: Option[Executor] = None
- while (System.currentTimeMillis - startTime < wait.toMillis &&
executor.isEmpty)
+ while (System.currentTimeMillis - startTime < wait.toMillis &&
executor.isEmpty) {
Utils.tryCatch(executor = askExecutor(job)) {
case warn: WarnException =>
logger.warn("request engine failed!", warn)
@@ -78,6 +67,7 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
None
case t: Throwable => throw t
}
+ }
// todo check
if (warnException != null && executor.isEmpty) throw warnException
executor
@@ -98,7 +88,7 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
schedulerEvent match {
case job: EntranceJob =>
job.getJobRequest match {
- case jobRequest: JobRequest =>
+ case jobReq: JobRequest =>
val entranceEntranceExecutor =
new DefaultEntranceExecutor(idGenerator.incrementAndGet())
// getEngineConn Executor
@@ -108,6 +98,7 @@ abstract class EntranceExecutorManager(groupFactory:
GroupFactory)
LogUtils.generateInfo("Your job is being scheduled by
orchestrator.")
)
)
+ jobReq.setUpdatedTime(new Date(System.currentTimeMillis()))
/**
* // val engineConnExecutor =
engineConnManager.getAvailableEngineConnExecutor(mark)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 146eb2c2c..e230bbc78 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -86,7 +86,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
if (labelMap.isEmpty) {
throw new EntranceIllegalParamException(
EntranceErrorCode.PARAM_CANNOT_EMPTY.getErrCode,
- EntranceErrorCode.PARAM_CANNOT_EMPTY.getDesc + s", labels is null"
+ s"${EntranceErrorCode.PARAM_CANNOT_EMPTY.getDesc}, labels is null"
)
}
// 3. set Code
@@ -154,7 +154,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
): Unit = {
val engineRunTypeLabel =
labels.getOrDefault(LabelKeyConstant.CODE_TYPE_KEY, null)
if (StringUtils.isBlank(runType) && null == engineRunTypeLabel) {
- val msg = s"You need to specify runType in execution content, such as
sql"
+ val msg = "You need to specify runType in execution content, such as sql"
logger.warn(msg)
throw new EntranceIllegalParamException(
EntranceErrorCode.LABEL_PARAMS_INVALID.getErrCode,
@@ -260,7 +260,7 @@ class CommonEntranceParser(val persistenceManager:
PersistenceManager)
.asInstanceOf[util.Map[String, Object]]
if (null != labelMap && !labelMap.isEmpty) {
val list: util.List[Label[_]] =
- labelBuilderFactory.getLabels(labelMap.asInstanceOf[util.Map[String,
AnyRef]])
+ labelBuilderFactory.getLabels(labelMap)
labelList.addAll(list)
}
}
diff --git
a/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
b/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
index 3f196cac9..6e4247a5c 100644
---
a/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
+++
b/linkis-public-enhancements/linkis-context-service/linkis-cs-common/src/main/java/org/apache/linkis/cs/common/serialize/impl/value/object/CSFlowInfosSerializer.java
@@ -35,9 +35,6 @@ public class CSFlowInfosSerializer extends
AbstractSerializer<CSFlowInfos> {
@Override
public boolean accepts(Object obj) {
- if (null != obj &&
obj.getClass().getName().equals(CSFlowInfos.class.getName())) {
- return true;
- }
- return false;
+ return null != obj && CSFlowInfos.class.isAssignableFrom(obj.getClass());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]