This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new de7ad07dd [Bugfix] Task state changes should happen before engine state changes (#4775)
de7ad07dd is described below
commit de7ad07ddfe15bca10712abbd81e37d29f5f30dd
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Sat Jul 15 23:52:03 2023 +0800
[Bugfix] Task state changes should happen before engine state changes (#4775)
this close #4774
---
.../executor/execute/ComputationExecutor.scala | 19 ++++++--
.../service/TaskExecutionServiceImpl.scala | 10 +---
.../executor/lock/EngineConnTimedLock.scala | 55 ++++++++++------------
.../service/EngineConnTimedLockService.scala | 4 +-
4 files changed, 41 insertions(+), 47 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
index 98a6c2b21..940973be6 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala
@@ -21,7 +21,10 @@ import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
-import
org.apache.linkis.engineconn.acessible.executor.listener.event.TaskStatusChangedEvent
+import org.apache.linkis.engineconn.acessible.executor.listener.event.{
+ TaskResponseErrorEvent,
+ TaskStatusChangedEvent
+}
import org.apache.linkis.engineconn.common.conf.{EngineConnConf,
EngineConnConstant}
import
org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
@@ -237,11 +240,9 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
response = response match {
case _: OutputExecuteResponse =>
succeedTasks.increase()
- transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
SuccessExecuteResponse()
case s: SuccessExecuteResponse =>
succeedTasks.increase()
- transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
s
case _ => response
}
@@ -261,7 +262,17 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
taskCache.put(engineConnTask.getTaskId, engineConnTask)
lastTask = engineConnTask
val response = ensureOp {
- toExecuteTask(engineConnTask)
+ val executeResponse = toExecuteTask(engineConnTask)
+ executeResponse match {
+ case successExecuteResponse: SuccessExecuteResponse =>
+ transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
+ case errorExecuteResponse: ErrorExecuteResponse =>
+ listenerBusContext.getEngineConnSyncListenerBus.postToAll(
+ TaskResponseErrorEvent(engineConnTask.getTaskId,
errorExecuteResponse.message)
+ )
+ transformTaskStatus(engineConnTask, ExecutionNodeStatus.Failed)
+ }
+ executeResponse
}
Utils.tryAndWarn(afterExecute(engineConnTask, response))
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 93d607c25..bc738d549 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -400,15 +400,7 @@ class TaskExecutionServiceImpl
Utils.tryFinally {
val jobId = JobUtils.getJobIdFromMap(task.getProperties)
LoggerUtils.setJobIdMDC(jobId)
- val response = executor.execute(task)
- response match {
- case ErrorExecuteResponse(message, throwable) =>
- sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
- logger.error(message, throwable)
- LogHelper.pushAllRemainLogs()
- executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
- case _ => logger.warn(s"task get response is $response")
- }
+ executor.execute(task)
clearCache(task.getTaskId)
} {
LoggerUtils.removeJobIdMDC()
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
index 84ab6fb7c..af4d1eb01 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
@@ -43,12 +43,10 @@ class EngineConnTimedLock(private var timeout: Long)
val releaseScheduler = new ScheduledThreadPoolExecutor(1)
var releaseTask: ScheduledFuture[_] = null
var lastLockTime: Long = 0
- var lockedBy: AccessibleExecutor = null
override def acquire(executor: AccessibleExecutor): Unit = {
lock.acquire()
lastLockTime = System.currentTimeMillis()
- lockedBy = executor
scheduleTimeout
}
@@ -58,8 +56,6 @@ class EngineConnTimedLock(private var timeout: Long)
logger.debug("try to lock for succeed is " + succeed.toString)
if (succeed) {
lastLockTime = System.currentTimeMillis()
- lockedBy = executor
- logger.debug("try to lock for add time out task ! Locked by thread : " +
lockedBy.getId)
scheduleTimeout
}
succeed
@@ -68,18 +64,13 @@ class EngineConnTimedLock(private var timeout: Long)
// Unlock callback is not called in release method, because release method
is called actively
override def release(): Unit = {
logger.debug(
- "try to release for lock," + lockedBy + ",current thread " +
Thread.currentThread().getName
+ s"try to release for lock: ${lock.toString}, current thread " +
Thread.currentThread().getName
)
- if (lockedBy != null) {
- // && lockedBy == Thread.currentThread() Inconsistent thread(线程不一致)
- logger.debug("try to release for lockedBy and thread ")
- if (releaseTask != null) {
- releaseTask.cancel(true)
- releaseTask = null
- }
- logger.debug("try to release for lock release success")
- lockedBy = null
+ if (releaseTask != null) {
+ releaseTask.cancel(true)
+ releaseTask = null
}
+ logger.debug("try to release for lock release success")
unlockCallback(lock.toString)
resetLock()
}
@@ -97,7 +88,6 @@ class EngineConnTimedLock(private var timeout: Long)
releaseScheduler.purge()
}
lock.release()
- lockedBy = null
}
resetLock()
}
@@ -109,13 +99,18 @@ class EngineConnTimedLock(private var timeout: Long)
new Runnable {
override def run(): Unit = {
synchronized {
- if (isAcquired() && NodeStatus.Idle == lockedBy.getStatus &&
isExpired()) {
- // unlockCallback depends on lockedBy, so lockedBy cannot be
set null before unlockCallback
- logger.info(s"Lock : [${lock.toString} was released due to
timeout.")
- release()
- } else if (isAcquired() && NodeStatus.Busy ==
lockedBy.getStatus) {
- lastLockTime = System.currentTimeMillis()
- logger.info("Update lastLockTime because executor is busy.")
+ ExecutorManager.getInstance.getReportExecutor match {
+ case reportExecutor: AccessibleExecutor =>
+ if (
+ isAcquired() && NodeStatus.Idle ==
reportExecutor.getStatus && isExpired()
+ ) {
+ // unlockCallback depends on lockedBy, so lockedBy
cannot be set null before unlockCallback
+ logger.info(s"Lock : [${lock.toString} was released due
to timeout.")
+ release()
+ } else if (isAcquired() && NodeStatus.Busy ==
reportExecutor.getStatus) {
+ lastLockTime = System.currentTimeMillis()
+ logger.info("Update lastLockTime because executor is
busy.")
+ }
}
}
}
@@ -144,14 +139,12 @@ class EngineConnTimedLock(private var timeout: Long)
}
override def renew(): Boolean = {
- if (lockedBy != null) {
- if (isAcquired && releaseTask != null) {
- if (releaseTask.cancel(false)) {
- releaseScheduler.purge()
- scheduleTimeout
- lastLockTime = System.currentTimeMillis()
- return true
- }
+ if (isAcquired && releaseTask != null) {
+ if (releaseTask.cancel(false)) {
+ releaseScheduler.purge()
+ scheduleTimeout
+ lastLockTime = System.currentTimeMillis()
+ return true
}
}
false
@@ -195,7 +188,7 @@ class EngineConnTimedLock(private var timeout: Long)
ExecutorListenerBusContext
.getExecutorListenerBusContext()
.getEngineConnAsyncListenerBus
- .post(ExecutorUnLockEvent(null, lockStr.toString))
+ .post(ExecutorUnLockEvent(null, lockStr))
}
override def onExecutorCreated(executorCreateEvent: ExecutorCreateEvent):
Unit = {}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
index 957a03b6f..21325f42b 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
@@ -161,9 +161,7 @@ class EngineConnTimedLockService extends LockService with Logging {
.toString
)
if (isLockExist(lock)) {
- logger.info(
- s"try to unlock lockEntity :
lockString=$lockString,lockedBy=${engineConnLock.lockedBy.getId}"
- )
+ logger.info(s"try to unlock lockEntity : lockString=$lockString")
engineConnLock.release()
this.lockString = null
true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]