This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new 290dc18 Fix issue where EC startup failure was not properly cleaned up
(#1806)
290dc18 is described below
commit 290dc1857491e7a76226825a50b6d63fc53254a2
Author: peacewong <[email protected]>
AuthorDate: Mon Mar 28 14:35:14 2022 +0800
Fix issue where EC startup failure was not properly cleaned up (#1806)
* When the ECM starts an EC and the EC's status is Failed, the ECM should kill the EC process #1801
* optimize code
* update removal logic
* add sync
---
.../impl/DefaultEngineConnListService.scala | 42 ++++++++++++++--------
.../am/selector/rule/AvailableNodeSelectRule.scala | 2 +-
2 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
index c871100..7940630 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
@@ -17,10 +17,8 @@
package org.apache.linkis.ecm.server.service.impl
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-
import com.google.common.collect.Interners
+import org.apache.commons.lang3.StringUtils
import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.ecm.core.engineconn.{EngineConn, YarnEngineConn}
@@ -33,9 +31,9 @@ import
org.apache.linkis.ecm.server.service.EngineConnListService
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.resource.{Resource,
ResourceType}
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.{Component, Service}
+import java.util
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
class DefaultEngineConnListService extends EngineConnListService with
ECMEventListener with Logging {
@@ -55,23 +53,33 @@ class DefaultEngineConnListService extends
EngineConnListService with ECMEventLi
override def getEngineConns: util.List[EngineConn] =
engineConnMap.values().toList
override def addEngineConn(engineConn: EngineConn): Unit = {
+ logger.info(s"add engineConn ${engineConn.getServiceInstance} to
engineConnMap")
if (LinkisECMApplication.isReady)
engineConnMap.put(engineConn.getTickedId, engineConn)
}
override def killEngineConn(engineConnId: String): Unit = {
- val conn = engineConnMap.remove(engineConnId)
- if (conn != null) {
- Utils.tryAndWarn{
- conn.close()
- info(s"engineconn ${conn.getPid} was closed.")
+ var conn = engineConnMap.get(engineConnId)
+ if (conn != null) engineConnId.intern().synchronized {
+ conn = engineConnMap.get(engineConnId)
+ if (conn != null) {
+ Utils.tryAndWarn{
+ if (NodeStatus.Failed == conn.getStatus &&
StringUtils.isNotBlank(conn.getPid)) {
+ killECByEngineConnKillService(conn)
+ }
+ conn.close()
+ }
+ engineConnMap.remove(engineConnId)
+ logger.info(s"engineconn ${conn.getServiceInstance} was closed.")
}
}
}
override def getUsedResources: Resource =
engineConnMap.values().map(_.getResource.getMinResource).fold(Resource.initResource(ResourceType.Default))(_
+ _)
- override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = ???
+ override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = {
+ None
+ }
def updateYarnAppId(event: YarnAppIdCallbackEvent): Unit = {
updateYarnEngineConn(x =>
x.setApplicationId(event.protocol.applicationId), event.protocol.nodeId)
@@ -139,12 +147,16 @@ class DefaultEngineConnListService extends
EngineConnListService with ECMEventLi
private def shutdownEngineConns(event: ECMClosedEvent): Unit = {
info("start to kill all engines belonging the ecm")
engineConnMap.values().foreach(engineconn => {
- info(s"start to kill engine, pid:${engineconn.getPid}")
- val engineStopRequest = new EngineStopRequest()
- engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
- getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+ killECByEngineConnKillService(engineconn)
})
info("Done! success to kill all engines belonging the ecm")
}
+ private def killECByEngineConnKillService(engineconn: EngineConn): Unit = {
+ info(s"start to kill ec by engineConnKillService
${engineconn.getServiceInstance}")
+ val engineStopRequest = new EngineStopRequest()
+ engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
+ getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+ }
+
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
index 97be701..e54daeb 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
@@ -35,7 +35,7 @@ class AvailableNodeSelectRule extends NodeSelectRule with
Logging{
nodes.filter {
case amNode: AMNode =>
if (! NodeStatus.isLocked(amNode.getNodeStatus) &&
NodeStatus.isAvailable(amNode.getNodeStatus)) {
- null == amNode.getNodeHealthyInfo || null ==
amNode.getNodeHealthyInfo.getNodeHealthy ||
NodeHealthy.isAvailable(amNode.getNodeHealthyInfo.getNodeHealthy)
+ null != amNode.getNodeHealthyInfo && null !=
amNode.getNodeHealthyInfo.getNodeHealthy &&
NodeHealthy.isAvailable(amNode.getNodeHealthyInfo.getNodeHealthy)
} else {
info(s"engineConn ${amNode.getServiceInstance} cannot be reuse
status: ${amNode.getNodeStatus}")
false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]