This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
new f94eca9 remove duplicate update actions (#1294)
f94eca9 is described below
commit f94eca97370621011adecfb95123e5f708e2e278
Author: Casion <[email protected]>
AuthorDate: Tue Jan 11 15:27:51 2022 +0800
remove duplicate update actions (#1294)
---
.../scala/org/apache/linkis/entrance/EntranceServer.scala | 12 ++++++------
.../jobhistory/service/impl/JobHistoryQueryServiceImpl.scala | 6 +++---
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index abd6c23..f0024cc 100644
---
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -49,7 +49,7 @@ abstract class EntranceServer extends Logging {
* @return
*/
def execute(params: java.util.Map[String, Any]): String = {
- if(!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG))
debug("received a request: " + params)
+ if (!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG))
debug("received a request: " + params)
else params.remove(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)
var jobRequest =
getEntranceContext.getOrCreateEntranceParser().parseToTask(params)
// tod multi entrance instances
@@ -97,7 +97,7 @@ abstract class EntranceServer extends Logging {
case _ =>
}
Utils.tryCatch{
- if(logAppender.length() > 0)
job.getLogListener.foreach(_.onLogUpdate(job, logAppender.toString.trim))
+ if (logAppender.length() > 0)
job.getLogListener.foreach(_.onLogUpdate(job, logAppender.toString.trim))
}{
t => logger.error("Failed to write init log, reason: ", t)
}
@@ -112,7 +112,7 @@ abstract class EntranceServer extends Logging {
entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg))
case _ =>
}
-
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobRequest)
+
job.getId()
}{t =>
job.onFailure("Submitting the query failed!(提交查询失败!)", t)
@@ -146,9 +146,9 @@ abstract class EntranceServer extends Logging {
def getJob(execId: String): Option[Job] =
getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job])
- private[entrance] def getEntranceWebSocketService:
Option[EntranceWebSocketService] =
if(ServerConfiguration.BDP_SERVER_SOCKET_MODE.getValue) {
- if(entranceWebSocketService.isEmpty) synchronized {
- if(entranceWebSocketService.isEmpty) {
+ private[entrance] def getEntranceWebSocketService:
Option[EntranceWebSocketService] = if
(ServerConfiguration.BDP_SERVER_SOCKET_MODE.getValue) {
+ if (entranceWebSocketService.isEmpty) synchronized {
+ if (entranceWebSocketService.isEmpty) {
entranceWebSocketService = Some(new EntranceWebSocketService)
entranceWebSocketService.foreach(_.setEntranceServer(this))
entranceWebSocketService.foreach(getEntranceContext.getOrCreateEventListenerBus.addListener)
diff --git
a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index 0194156..e673bf3 100644
---
a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++
b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -96,12 +96,12 @@ class JobHistoryQueryServiceImpl extends
JobHistoryQueryService with Logging {
if (jobReq.getStatus != null) {
val oldStatus: String =
jobHistoryMapper.selectJobHistoryStatusForUpdate(jobReq.getId)
if (oldStatus != null && !shouldUpdate(oldStatus, jobReq.getStatus)) {
- throw new
QueryException(120001,s"任务Id${jobReq.getId}在数据库中的task状态为:${oldStatus}更新的task状态为:${jobReq.getStatus}更新失败!")
+ throw new
QueryException(120001,s"jobId:${jobReq.getId},在数据库中的task状态为:${oldStatus},更新的task状态为:${jobReq.getStatus},更新失败!")
}
}
val jobUpdate = jobRequest2JobHistory(jobReq)
if(jobUpdate.getUpdated_time == null) {
- throw new
QueryException(120001,s"job${jobReq.getId}更新job相关信息失败,请指定该请求的更新时间!")
+ throw new
QueryException(120001,s"jobId:${jobReq.getId},更新job相关信息失败,请指定该请求的更新时间!")
}
jobHistoryMapper.updateJobHistory(jobUpdate)
val map = new util.HashMap[String, Object]
@@ -141,7 +141,7 @@ class JobHistoryQueryServiceImpl extends
JobHistoryQueryService with Logging {
if (jobReq.getStatus != null) {
val oldStatus: String =
jobHistoryMapper.selectJobHistoryStatusForUpdate(jobReq.getId)
if (oldStatus != null && !shouldUpdate(oldStatus,
jobReq.getStatus))
- throw new
QueryException(120001,s"${jobReq.getId}数据库中的task状态为:${oldStatus}更新的task状态为:${jobReq.getStatus}更新失败!")
+ throw new
QueryException(120001,s"jobId:${jobReq.getId},在数据库中的task状态为:${oldStatus},更新的task状态为:${jobReq.getStatus},更新失败!")
}
val jobUpdate = jobRequest2JobHistory(jobReq)
jobUpdate.setUpdated_time(new
Timestamp(System.currentTimeMillis()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]