This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new e89c3a512 [LinkisManager]Interface performance optimization (#3568)
e89c3a512 is described below
commit e89c3a5124b67f23d5065c2be0a84effc0c1c361
Author: huangKai-2323 <[email protected]>
AuthorDate: Mon Nov 14 11:07:21 2022 +0800
[LinkisManager]Interface performance optimization (#3568)
---
.../linkis/manager/am/restful/EMRestfulApi.java | 57 ++++---------
.../manager/am/restful/EngineRestfulApi.java | 78 ++++-------------
.../manager/label/service/NodeLabelService.scala | 4 +
.../service/impl/DefaultNodeLabelService.scala | 23 +++++
.../linkis/manager/rm/restful/RMMonitorRest.scala | 98 ++++++++++++++++------
.../linkis/manager/dao/NodeManagerMapper.java | 2 +
.../linkis/manager/dao/impl/NodeManagerMapper.xml | 6 ++
.../persistence/NodeManagerPersistence.java | 20 +++++
.../impl/DefaultNodeManagerPersistence.java | 70 +++++++++++++++-
9 files changed, 231 insertions(+), 127 deletions(-)
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
index a4d670483..7bff30656 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java
@@ -65,6 +65,7 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.*;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Api(tags = "ECM(engineconnmanager) operation")
@RequestMapping(
@@ -110,48 +111,22 @@ public class EMRestfulApi {
@RequestParam(value = "nodeHealthy", required = false) String
nodeHealthy,
@RequestParam(value = "owner", required = false) String owner)
throws AMErrorException {
- String userName = ModuleUserUtils.getOperationUser(req, "listAllEMs");
- checkAdmin(userName);
- EMNode[] allEM = emInfoService.getAllEM();
- ArrayList<EMNodeVo> allEMVo = AMUtils.copyToEMVo(allEM);
- ArrayList<EMNodeVo> allEMVoFilter1 = allEMVo;
- if (CollectionUtils.isNotEmpty(allEMVoFilter1) &&
!StringUtils.isEmpty(instance)) {
- allEMVoFilter1 =
- (ArrayList<EMNodeVo>)
- allEMVoFilter1.stream()
- .filter(
- em -> {
- return
em.getInstance().contains(instance);
- })
- .collect(Collectors.toList());
- }
- ArrayList<EMNodeVo> allEMVoFilter2 = allEMVoFilter1;
- if (CollectionUtils.isNotEmpty(allEMVoFilter2) &&
!StringUtils.isEmpty(nodeHealthy)) {
- allEMVoFilter2 =
- (ArrayList<EMNodeVo>)
- allEMVoFilter2.stream()
- .filter(
- em -> {
- return em.getNodeHealthy() ==
null
- || em.getNodeHealthy()
- .equals(
-
NodeHealthy.valueOf(
-
nodeHealthy));
- })
- .collect(Collectors.toList());
- }
- ArrayList<EMNodeVo> allEMVoFilter3 = allEMVoFilter2;
- if (CollectionUtils.isNotEmpty(allEMVoFilter3) &&
!StringUtils.isEmpty(owner)) {
- allEMVoFilter3 =
- (ArrayList<EMNodeVo>)
- allEMVoFilter3.stream()
- .filter(
- em -> {
- return
em.getOwner().equalsIgnoreCase(owner);
- })
- .collect(Collectors.toList());
+ checkAdmin(ModuleUserUtils.getOperationUser(req, "listAllEMs"));
+ List<EMNodeVo> emNodeVos =
AMUtils.copyToEMVo(emInfoService.getAllEM());
+ if (CollectionUtils.isNotEmpty(emNodeVos)){
+ Stream<EMNodeVo> stream = emNodeVos.stream();
+ if (StringUtils.isNotBlank(instance)){
+ stream = stream.filter(em ->
em.getInstance().contains(instance));
+ }
+ if (StringUtils.isNotBlank(nodeHealthy)){
+ stream = stream.filter(em -> em.getNodeHealthy() == null ||
em.getNodeHealthy().equals(NodeHealthy.valueOf(nodeHealthy)));
+ }
+ if (StringUtils.isNotBlank(owner)){
+ stream = stream.filter(em ->
em.getOwner().equalsIgnoreCase(owner));
+ }
+ emNodeVos = stream.collect(Collectors.toList());
}
- return Message.ok().data("EMs", allEMVoFilter3);
+ return Message.ok().data("EMs", emNodeVos);
}
@ApiOperation(value = "listAllECMHealthyStatus", notes = "get all ECM
healthy status", response = Message.class)
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
index 6e2325f84..eef6c495f 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Api(tags = "EC(engineconn) operation")
@RequestMapping(
@@ -270,67 +271,24 @@ public class EngineRestfulApi {
JsonNode engineType = jsonNode.get("engineType");
JsonNode owner = jsonNode.get("owner");
List<EngineNode> engineNodes =
engineInfoService.listEMEngines(amemNode);
- ArrayList<AMEngineNodeVo> allengineNodes =
AMUtils.copyToAMEngineNodeVo(engineNodes);
- ArrayList<AMEngineNodeVo> allEMVoFilter1 = allengineNodes;
- if (CollectionUtils.isNotEmpty(allEMVoFilter1) && emInstace != null) {
- allEMVoFilter1 =
- (ArrayList<AMEngineNodeVo>)
- allEMVoFilter1.stream()
- .filter(
- em ->
- em.getInstance() != null
- && em.getInstance()
-
.contains(emInstace.asText()))
- .collect(Collectors.toList());
- }
- ArrayList<AMEngineNodeVo> allEMVoFilter2 = allEMVoFilter1;
- if (CollectionUtils.isNotEmpty(allEMVoFilter2)
- && nodeStatus != null
- && !StringUtils.isEmpty(nodeStatus.asText())) {
- allEMVoFilter2 =
- (ArrayList<AMEngineNodeVo>)
- allEMVoFilter2.stream()
- .filter(
- em ->
- em.getNodeStatus() != null
- &&
em.getNodeStatus()
- .equals(
-
NodeStatus.valueOf(
-
nodeStatus
-
.asText())))
- .collect(Collectors.toList());
- }
- ArrayList<AMEngineNodeVo> allEMVoFilter3 = allEMVoFilter2;
- if (CollectionUtils.isNotEmpty(allEMVoFilter3)
- && owner != null
- && !StringUtils.isEmpty(owner.asText())) {
- allEMVoFilter3 =
- (ArrayList<AMEngineNodeVo>)
- allEMVoFilter3.stream()
- .filter(
- em ->
- em.getOwner() != null
- && em.getOwner()
-
.equalsIgnoreCase(
-
owner.asText()))
- .collect(Collectors.toList());
- }
- ArrayList<AMEngineNodeVo> allEMVoFilter4 = allEMVoFilter3;
- if (CollectionUtils.isNotEmpty(allEMVoFilter4)
- && engineType != null
- && !StringUtils.isEmpty(engineType.asText())) {
- allEMVoFilter4 =
- (ArrayList<AMEngineNodeVo>)
- allEMVoFilter4.stream()
- .filter(
- em ->
- em.getEngineType() != null
- &&
em.getEngineType()
-
.equalsIgnoreCase(
-
engineType.asText()))
- .collect(Collectors.toList());
+ List<AMEngineNodeVo> allengineNodes =
AMUtils.copyToAMEngineNodeVo(engineNodes);
+ if (CollectionUtils.isNotEmpty(allengineNodes)) {
+ Stream<AMEngineNodeVo> stream = allengineNodes.stream();
+ if (null != emInstace) {
+ stream = stream.filter(em ->
StringUtils.isNotBlank(em.getInstance()) &&
em.getInstance().contains(emInstace.asText()));
+ }
+ if (null != nodeStatus &&
StringUtils.isNotBlank(nodeStatus.asText())) {
+ stream = stream.filter(em -> null != em.getNodeStatus() &&
em.getNodeStatus().equals(NodeStatus.valueOf(nodeStatus.asText())));
+ }
+ if (null != owner && StringUtils.isNotBlank(owner.asText())) {
+ stream = stream.filter(em ->
StringUtils.isNotBlank(em.getOwner()) &&
em.getOwner().equalsIgnoreCase(owner.asText()));
+ }
+ if (null != engineType &&
StringUtils.isNotBlank(engineType.asText())) {
+ stream = stream.filter(em ->
StringUtils.isNotBlank(em.getEngineType()) &&
em.getEngineType().equalsIgnoreCase(engineType.asText()));
+ }
+ allengineNodes = stream.collect(Collectors.toList());
}
- return Message.ok().data("engines", allEMVoFilter4);
+ return Message.ok().data("engines", allengineNodes);
}
@ApiOperation(value = "modifyEngineInfo", notes = "modify engineconn
info", response = Message.class)
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala
index 4478ad9c7..e6fe947a4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala
@@ -86,4 +86,8 @@ trait NodeLabelService {
labels: util.List[Label[_]]
): util.Map[ScoreServiceInstance, util.List[Label[_]]]
+ def getNodeLabelsByInstanceList(
+ instanceList: util.List[ServiceInstance]
+ ): util.HashMap[String, util.List[Label[_]]]
+
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
index 80218a7a9..8982bb9a4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
@@ -383,4 +383,27 @@ class DefaultNodeLabelService extends NodeLabelService
with Logging {
persistenceLabel.getId
}
+ override def getNodeLabelsByInstanceList(
+ serviceInstanceList: util.List[ServiceInstance]
+ ): util.HashMap[String, util.List[Label[_]]] = {
+ val resultMap = new util.HashMap[String, util.List[Label[_]]]()
+ val map =
labelManagerPersistence.getLabelRelationsByServiceInstance(serviceInstanceList)
+ serviceInstanceList.asScala.foreach(serviceInstance => {
+ val LabelList = map
+ .get(serviceInstance)
+ .asScala
+ .map { label =>
+ val realyLabel: Label[_] = labelFactory.createLabel(
+ label.getLabelKey,
+ if (!CollectionUtils.isEmpty(label.getValue)) label.getValue else
label.getStringValue
+ )
+ realyLabel
+ }
+ .toList
+ .asJava
+ resultMap.put(serviceInstance.toString, LabelList)
+ })
+ resultMap
+ }
+
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
index b0430d022..db092daf6 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
@@ -49,10 +49,11 @@ import
org.apache.linkis.manager.rm.service.{LabelResourceService, ResourceManag
import org.apache.linkis.manager.rm.service.impl.UserResourceService
import org.apache.linkis.manager.rm.utils.{RMUtils, UserConfiguration}
import org.apache.linkis.manager.service.common.metrics.MetricsConverter
-import org.apache.linkis.server.{BDPJettyServerHelper, Message}
+import org.apache.linkis.server.{toScalaBuffer, BDPJettyServerHelper, Message}
import org.apache.linkis.server.security.SecurityFilter
import org.apache.linkis.server.utils.ModuleUserUtils
+import org.apache.commons.collections4.ListUtils
import org.apache.commons.lang3.StringUtils
import org.springframework.beans.factory.annotation.Autowired
@@ -62,7 +63,7 @@ import javax.servlet.http.HttpServletRequest
import java.text.{MessageFormat, SimpleDateFormat}
import java.util
-import java.util.{Comparator, TimeZone}
+import java.util.{Comparator, List, TimeZone}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -470,10 +471,10 @@ class RMMonitorRest extends Logging {
val record = new mutable.HashMap[String, Any]
record.put("applicationName", node.getServiceInstance.getApplicationName)
record.put("engineInstance", node.getServiceInstance.getInstance)
- if (null != node.getEMNode) {
- record.put("moduleName",
node.getEMNode.getServiceInstance.getApplicationName)
- record.put("engineManagerInstance",
node.getEMNode.getServiceInstance.getInstance)
- }
+// if (null != node.getEMNode) {
+// record.put("moduleName",
node.getEMNode.getServiceInstance.getApplicationName)
+// record.put("engineManagerInstance",
node.getEMNode.getServiceInstance.getInstance)
+// }
record.put("creator", userCreatorLabel.getCreator)
record.put("engineType", engineTypeLabel.getEngineType)
if (node.getNodeResource != null) {
@@ -539,26 +540,28 @@ class RMMonitorRest extends Logging {
val userResourceRecords = new ArrayBuffer[mutable.HashMap[String, Any]]()
val yarnAppsInfo =
externalResourceService.getAppInfo(ResourceType.Yarn, labelContainer,
yarnIdentifier)
+ val userList =
+
yarnAppsInfo.asScala.groupBy(_.asInstanceOf[YarnAppInfo].user).keys.toList.asJava
Utils.tryCatch {
+ val nodesList = getEngineNodesByUserList(userList, true)
yarnAppsInfo.asScala.groupBy(_.asInstanceOf[YarnAppInfo].user).foreach {
userAppInfo =>
- var nodes = getEngineNodes(userAppInfo._1, true)
var busyResource =
Resource.initResource(ResourceType.Yarn).asInstanceOf[YarnResource]
var idleResource =
Resource.initResource(ResourceType.Yarn).asInstanceOf[YarnResource]
val appIdToEngineNode = new mutable.HashMap[String, EngineNode]()
- if (nodes == null) {
- nodes = new Array[EngineNode](0)
- }
- nodes.foreach { node =>
- if (node.getNodeResource != null &&
node.getNodeResource.getUsedResource != null) {
- node.getNodeResource.getUsedResource match {
- case driverYarn: DriverAndYarnResource
- if
driverYarn.yarnResource.queueName.equals(yarnIdentifier.getQueueName) =>
- appIdToEngineNode.put(driverYarn.yarnResource.applicationId,
node)
- case yarn: YarnResource if
yarn.queueName.equals(yarnIdentifier.getQueueName) =>
- appIdToEngineNode.put(yarn.applicationId, node)
- case _ =>
+ val nodesplus = nodesList.get(userAppInfo._1)
+ if (nodesplus.isDefined) {
+ nodesplus.get.foreach(node => {
+ if (node.getNodeResource != null &&
node.getNodeResource.getUsedResource != null) {
+ node.getNodeResource.getUsedResource match {
+ case driverYarn: DriverAndYarnResource
+ if
driverYarn.yarnResource.queueName.equals(yarnIdentifier.getQueueName) =>
+ appIdToEngineNode.put(driverYarn.yarnResource.applicationId,
node)
+ case yarn: YarnResource if
yarn.queueName.equals(yarnIdentifier.getQueueName) =>
+ appIdToEngineNode.put(yarn.applicationId, node)
+ case _ =>
+ }
}
- }
+ })
}
userAppInfo._2.foreach { appInfo =>
appIdToEngineNode.get(appInfo.asInstanceOf[YarnAppInfo].id) match {
@@ -661,22 +664,23 @@ class RMMonitorRest extends Logging {
}
private def getEngineNodes(user: String, withResource: Boolean = false):
Array[EngineNode] = {
- val nodes = nodeManagerPersistence
+ val serviceInstancelist = nodeManagerPersistence
.getNodes(user)
- .asScala
.map(_.getServiceInstance)
- .map(nodeManagerPersistence.getEngineNode)
- .filter(_ != null)
.asJava
+ val nodes =
nodeManagerPersistence.getEngineNodeByServiceInstance(serviceInstancelist)
val metrics = nodeMetricManagerPersistence
.getNodeMetrics(nodes)
.asScala
.map(m => (m.getServiceInstance.toString, m))
.toMap
val configurationMap = new mutable.HashMap[String, Resource]
+ val labelsMap =
+
nodeLabelService.getNodeLabelsByInstanceList(nodes.map(_.getServiceInstance).asJava)
nodes.asScala
.map { node =>
- node.setLabels(nodeLabelService.getNodeLabels(node.getServiceInstance))
+//
node.setLabels(nodeLabelService.getNodeLabels(node.getServiceInstance))
+ node.setLabels(labelsMap.get(node.getServiceInstance.toString))
if (!node.getLabels.asScala.exists(_.isInstanceOf[UserCreatorLabel])) {
null
} else {
@@ -733,4 +737,48 @@ class RMMonitorRest extends Logging {
.toArray
}
+ private def getEngineNodesByUserList(
+ userList: List[String],
+ withResource: Boolean = false
+ ): Map[String, Array[EngineNode]] = {
+ val serviceInstance =
+
nodeManagerPersistence.getNodesByOwnerList(userList).map(_.getServiceInstance).asJava
+ val engineNodesList =
nodeManagerPersistence.getEngineNodeByServiceInstance(serviceInstance)
+ val metrics = nodeMetricManagerPersistence
+ .getNodeMetrics(engineNodesList)
+ .asScala
+ .map(m => (m.getServiceInstance.toString, m))
+ .toMap
+ val labelsMap =
+
nodeLabelService.getNodeLabelsByInstanceList(engineNodesList.map(_.getServiceInstance).asJava)
+ engineNodesList
+ .map(nodeInfo => {
+ nodeInfo.setLabels(labelsMap.get(nodeInfo.getServiceInstance.toString))
+ if (nodeInfo.getLabels.exists(_.isInstanceOf[UserCreatorLabel])) {
+ metrics
+ .get(nodeInfo.getServiceInstance.toString)
+ .foreach(metricsConverter.fillMetricsToNode(nodeInfo, _))
+ if (withResource) {
+ val engineInstanceOption =
nodeInfo.getLabels.find(_.isInstanceOf[EngineInstanceLabel])
+ if (engineInstanceOption.isDefined) {
+ val engineInstanceLabel =
engineInstanceOption.get.asInstanceOf[EngineInstanceLabel]
+
engineInstanceLabel.setServiceName(nodeInfo.getServiceInstance.getApplicationName)
+
engineInstanceLabel.setInstance(nodeInfo.getServiceInstance.getInstance)
+ val nodeResource =
labelResourceService.getLabelResource(engineInstanceLabel)
+ if (nodeResource != null) {
+ if (null == nodeResource.getUsedResource) {
+ nodeResource.setUsedResource(nodeResource.getLockedResource)
+ }
+ nodeInfo.setNodeResource(nodeResource)
+ }
+ }
+ }
+ }
+ nodeInfo
+ })
+ .filter(_ != null)
+ .toArray
+ .groupBy(_.getOwner)
+ }
+
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
index b5bf3a87a..8dbbdafda 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
@@ -72,4 +72,6 @@ public interface NodeManagerMapper {
void updateNodeLabelRelation(
@Param("tickedId") String tickedId, @Param("instance") String instance);
+
+ List<PersistenceNode> getNodeInstancesByOwnerList(@Param("owner")
List<String> owner);
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/NodeManagerMapper.xml
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/NodeManagerMapper.xml
index d017c6e05..f770c1513 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/NodeManagerMapper.xml
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/impl/NodeManagerMapper.xml
@@ -109,4 +109,10 @@
select id from linkis_cg_manager_service_instance where owner =
#{owner}
</select>
+ <select id="getNodeInstancesByOwnerList"
resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
+ select * from linkis_cg_manager_service_instance where owner in(
+ <foreach collection='owner' separator=',' item='owner'>
+ #{owner}
+ </foreach>)
+ </select>
</mapper>
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
index a4c62172b..80fc57a17 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java
@@ -114,4 +114,24 @@ public interface NodeManagerPersistence {
*/
List<EngineNode> getEngineNodeByEM(ServiceInstance serviceInstance)
throws PersistenceErrorException;
+
+ /**
+ * Get the Engine information and EM information for the given Engine ServiceInstances
+ * (batch query)
+ *
+ * @param serviceInstances
+ * @return
+ * @throws PersistenceErrorException
+ */
+ List<EngineNode> getEngineNodeByServiceInstance(List<ServiceInstance>
serviceInstances)
+ throws PersistenceErrorException;
+
+ /**
+ * Get the list of nodes whose owner is contained in the given owner list
+ *
+ * @param owner
+ * @return
+ * @throws PersistenceErrorException
+ */
+ List<Node> getNodesByOwnerList(List<String> owner);
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 45f840d06..86f202ace 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -32,17 +32,24 @@ import
org.apache.linkis.manager.exception.NodeInstanceNotFoundException;
import org.apache.linkis.manager.exception.PersistenceErrorException;
import org.apache.linkis.manager.persistence.NodeManagerPersistence;
+import org.apache.commons.collections.CollectionUtils;
+
import org.springframework.dao.DuplicateKeyException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static
org.apache.linkis.manager.errorcode.LinkisManagerPersistenceErrorCodeSummary.*;
public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
-
+ private Logger logger =
LoggerFactory.getLogger(DefaultNodeManagerPersistence.class);
private NodeManagerMapper nodeManagerMapper;
private NodeMetricManagerMapper metricManagerMapper;
@@ -300,4 +307,65 @@ public class DefaultNodeManagerPersistence implements
NodeManagerPersistence {
}
return amEngineNodeList;
}
+
+ @Override
+ public List<EngineNode> getEngineNodeByServiceInstance(List<ServiceInstance>
serviceInstanceList)
+ throws PersistenceErrorException {
+ List<EngineNode> amEngineNodeList = new ArrayList<>();
+ // Limit the number of instances per database query (batches of at most 100)
+ List<List<ServiceInstance>> partition =
Lists.partition(serviceInstanceList, 100);
+ // Batch query
+ partition.forEach(
+ instanceList -> {
+ // Collect the instance string of every ServiceInstance in this batch
+ List<String> collect =
+
instanceList.stream().map(ServiceInstance::getInstance).collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(collect)) {
+ // Get engineNodes in batches through ServiceInstance
+ List<PersistenceNode> engineNodeList =
nodeManagerMapper.getNodesByInstances(collect);
+ if (CollectionUtils.isNotEmpty(engineNodeList)) {
+ // Assemble the amEngineNodeList entries
+ instanceList.forEach(
+ serviceInstance -> {
+ PersistenceNode engineNode =
+ engineNodeList.stream()
+ .filter(
+ engineNodeInfo ->
+ engineNodeInfo
+ .getInstance()
+ .equals(serviceInstance.getInstance()))
+ .findFirst()
+ .orElse(new PersistenceNode());
+ AMEngineNode amEngineNode = new AMEngineNode();
+ amEngineNode.setServiceInstance(serviceInstance);
+ amEngineNode.setOwner(engineNode.getOwner());
+ amEngineNode.setMark(engineNode.getMark());
+ amEngineNode.setStartTime(engineNode.getCreateTime());
+ amEngineNodeList.add(amEngineNode);
+ });
+ }
+ }
+ });
+ return amEngineNodeList;
+ }
+
+ @Override
+ public List<Node> getNodesByOwnerList(List<String> ownerlist) {
+ List<PersistenceNode> nodeInstances =
nodeManagerMapper.getNodeInstancesByOwnerList(ownerlist);
+ List<Node> persistenceNodeEntitys = new ArrayList<>();
+ if (!nodeInstances.isEmpty()) {
+ for (PersistenceNode persistenceNode : nodeInstances) {
+ PersistenceNodeEntity persistenceNodeEntity = new
PersistenceNodeEntity();
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setApplicationName(persistenceNode.getName());
+ serviceInstance.setInstance(persistenceNode.getInstance());
+ persistenceNodeEntity.setServiceInstance(serviceInstance);
+ persistenceNodeEntity.setMark(persistenceNode.getMark());
+ persistenceNodeEntity.setOwner(persistenceNode.getOwner());
+ persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
+ persistenceNodeEntitys.add(persistenceNodeEntity);
+ }
+ }
+ return persistenceNodeEntitys;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]