This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba0737 fix module delete and config bug && update agent code (#16)
8ba0737 is described below
commit 8ba0737589967c8a3b2f5537adcfd3af73d75a5f
Author: LiRui <[email protected]>
AuthorDate: Thu Mar 24 15:37:29 2022 +0800
fix module delete and config bug && update agent code (#16)
* add dm-agent resource includes
* fix module delete/config bug
* export agent config and start module at module root dir
* check agent port before starting agent
* add cache for heartbeat event
Co-authored-by: lirui40 <[email protected]>
---
build.sh | 4 ++
.../service/heartbeat/DorisInstanceOperator.java | 10 ++--
.../component/ModelControlRequestComponent.java | 1 -
.../control/manager/DorisClusterModuleManager.java | 4 ++
.../manager/ResourceNodeAndAgentManager.java | 53 ++++++++++++++++++++++
.../org/apache/doris/stack/shell/BaseCommand.java | 22 +++++++--
.../org/apache/doris/stack/util/Constants.java | 1 +
.../src/main/resources/cache/ehcache.xml | 25 ++++++++++
.../doris/stack/dao/ClusterInstanceRepository.java | 14 ++++++
.../doris/stack/dao/HeartBeatEventRepository.java | 20 ++++++++
.../doris/stack/dao/ResourceNodeRepository.java | 21 +++++++++
11 files changed, 166 insertions(+), 9 deletions(-)
diff --git a/build.sh b/build.sh
index 75ad587..4dfb601 100644
--- a/build.sh
+++ b/build.sh
@@ -50,6 +50,10 @@ mv output/manager-bin/agent output/
mv output/manager-bin output/server/bin
mkdir -p output/agent/lib
mv manager/dm-agent/target/dm-agent-1.0.0.jar output/agent/lib/dm-agent.jar
+
+mkdir -p output/agent/config
+cp manager/dm-agent/src/main/resources/application.properties
output/agent/config
+
cp -r manager/manager-server/src/main/resources/web-resource output/server/
tar -zcvf doris-manager-1.0.0.tar.gz output/
echo "copy to output package end"
\ No newline at end of file
diff --git
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
index c9195b5..961f4a4 100644
---
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
+++
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
@@ -306,9 +306,13 @@ public class DorisInstanceOperator {
private void executePkgShellScriptWithBash(String scriptName, String
runningDir,
String moduleName, Map<String, String>
environment) throws Exception {
- String scripts = Paths.get(runningDir, moduleName, "bin",
scriptName).toFile().getAbsolutePath();
- final String shellCmd = "sh " + scripts;
- log.info("begin to execute: `" + shellCmd + "`");
+ String mouduleRootDir = runningDir + File.separator + moduleName;
+ String script = "bin" + File.separator + scriptName;
+
+ String cmdFormat = "cd %s && sh %s";
+ final String shellCmd = String.format(cmdFormat, mouduleRootDir,
script);
+
+ log.info("begin to execute with bash: `" + shellCmd + "`");
ShellUtil.cmdExecute(shellCmd);
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
index e49e71e..8c72032 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
@@ -51,7 +51,6 @@ public class ModelControlRequestComponent {
requestEntity.setCurrentEventType(eventType);
requestEntity.setModelId(modelId);
requestEntity.setRequestInfo(requestInfo);
- requestRepository.save(requestEntity);
return requestRepository.save(requestEntity);
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
index 3c3ae9f..30205de 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
@@ -149,6 +149,7 @@ public class DorisClusterModuleManager {
serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo);
}
+ clusterModuleRepository.save(moduleEntity);
}
private void serviceCreateOperation(ClusterModuleEntity module,
Map<String, Integer> serviceNamePorts,
@@ -252,6 +253,9 @@ public class DorisClusterModuleManager {
// delete service
serviceRepository.deleteByModuleId(module.getId());
+
+ // delete module
+ clusterModuleRepository.deleteById(module.getId());
}
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index 9f4c91e..2d76ce9 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -46,6 +46,7 @@ import java.net.UnknownHostException;
@Component
public class ResourceNodeAndAgentManager {
private static final String AGENT_START_SCRIPT =
Constants.KEY_DORIS_AGENT_START_SCRIPT;
+ private static final String AGENT_CONFIG_PATH =
Constants.KEY_DORIS_AGENT_CONFIG_PATH;
@Autowired
private ResourceNodeRepository nodeRepository;
@@ -184,6 +185,58 @@ public class ResourceNodeAndAgentManager {
// agent start
// AGENT_START stage
String agentInstallHome = configInfo.getInstallDir() + File.separator
+ "agent";
+
+ // 1 port check, eg: server.port=8008
+ // grep = application.properties | grep -w server.port | awk -F '='
'{print $2}'
+ String confFile = agentInstallHome + File.separator +
AGENT_CONFIG_PATH;
+ String portGetFormat = "grep = %s | grep -w server.port | awk -F '='
'{print $2}'";
+ String portGetCmd = String.format(portGetFormat, confFile);
+
+ SSH portGetSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), configInfo.getHost(),
portGetCmd);
+
+ int agentPort = -1;
+ if (portGetSSH.run()) {
+ String portStr = portGetSSH.getStdoutResponse();
+ log.info("agent {} port get return output: {}",
configInfo.getAgentNodeId(), portStr);
+
+ if (portStr == null || portStr.isEmpty()) {
+ log.warn("agent {} server.port is not set",
configInfo.getAgentNodeId());
+ } else {
+ try {
+ agentPort = Integer.parseInt(portStr.trim());
+ } catch (NumberFormatException e) {
+ log.warn("agent port format is not Integer");
+ }
+ }
+
+ } else {
+ log.warn("run agent port get cmd failed:{}, skip the check and use
default port",
+ portGetSSH.getErrorResponse());
+ }
+
+ if (agentPort > 0) {
+ log.info("agent start port is {}", agentPort);
+ // only check listen port
+ String checkPortCmd = String.format("netstat -tunlp | grep -w
%s", agentPort);
+ SSH checkPortSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), configInfo.getHost(),
checkPortCmd);
+ if (checkPortSSH.run()) {
+ String netInfo = checkPortSSH.getStdoutResponse();
+ log.info("agent {} port check return output: {}",
configInfo.getAgentNodeId(), netInfo);
+
+ if (netInfo != null && !netInfo.trim().isEmpty()) {
+ log.error("port {} already in use, {}", agentPort,
netInfo);
+ updateFailResult("port already in use",
+ AgentInstallEventStage.AGENT_START.getStage(),
agentInstallAgentEntity);
+ return;
+ }
+ } else {
+ log.warn("run check port cmd failed");
+ }
+ }
+
+ // 2 run start shell
String command = "cd %s && sh %s --server %s --agent %s";
String cmd = String.format(command, agentInstallHome,
AGENT_START_SCRIPT, getServerAddr(), configInfo.getAgentNodeId());
SSH startSsh = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
index a07ef9a..bb18f19 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
@@ -32,10 +32,15 @@ import java.util.stream.Collectors;
public abstract class BaseCommand {
protected String[] resultCommand;
+ protected String stdoutResponse;
protected String errorResponse;
protected abstract void buildCommand();
+ public String getStdoutResponse() {
+ return this.stdoutResponse;
+ }
+
public String getErrorResponse() {
return this.errorResponse;
}
@@ -45,11 +50,15 @@ public abstract class BaseCommand {
log.info("run command: {}", StringUtils.join(resultCommand, " "));
ProcessBuilder pb = new ProcessBuilder(resultCommand);
Process process = null;
- BufferedReader bufferedReader = null;
+ BufferedReader stdoutBufferedReader = null;
+ BufferedReader errorBufferedReader = null;
try {
process = pb.start();
- bufferedReader = new BufferedReader(new
InputStreamReader(process.getErrorStream()));
- errorResponse =
bufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
+ stdoutBufferedReader = new BufferedReader(new
InputStreamReader(process.getInputStream()));
+ errorBufferedReader = new BufferedReader(new
InputStreamReader(process.getErrorStream()));
+
+ stdoutResponse =
stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
+ errorResponse =
errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
final int exitCode = process.waitFor();
if (exitCode == 0) {
return true;
@@ -65,8 +74,11 @@ public abstract class BaseCommand {
process.destroy();
}
try {
- if (bufferedReader != null) {
- bufferedReader.close();
+ if (stdoutBufferedReader != null) {
+ stdoutBufferedReader.close();
+ }
+ if (errorBufferedReader != null) {
+ errorBufferedReader.close();
}
} catch (IOException e) {
log.error("close buffered reader fail");
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
index 27e1bb4..896e1d0 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
@@ -19,6 +19,7 @@ package org.apache.doris.stack.util;
public class Constants {
public static final String KEY_DORIS_AGENT_START_SCRIPT =
"bin/agent_start.sh";
+ public static final String KEY_DORIS_AGENT_CONFIG_PATH =
"config/application.properties";
public static final String KEY_FE_QUERY_PORT = "query_port";
public static final String KEY_FE_EDIT_LOG_PORT = "edit_log_port";
public static final String KEY_BE_HEARTBEAT_PORT =
"heartbeat_service_port";
diff --git a/manager/manager-server/src/main/resources/cache/ehcache.xml
b/manager/manager-server/src/main/resources/cache/ehcache.xml
index 9ea40e3..8463f31 100644
--- a/manager/manager-server/src/main/resources/cache/ehcache.xml
+++ b/manager/manager-server/src/main/resources/cache/ehcache.xml
@@ -6,4 +6,29 @@
timeToLiveSeconds="3600"
overflowToDisk="false"
/>
+
+ <!-- ResourceNodeRepository cache -->
+ <cache name="node_agent"
+ maxElementsInMemory="1000"
+ eternal="false"
+ timeToIdleSeconds="1800"
+ timeToLiveSeconds="3600"
+ overflowToDisk="false"
+ />
+ <!-- HeartBeatEventRepository -->
+ <cache name="heart_beat"
+ maxElementsInMemory="1000"
+ eternal="false"
+ timeToIdleSeconds="1800"
+ timeToLiveSeconds="3600"
+ overflowToDisk="false"
+ />
+ <!-- ClusterInstanceRepository cache -->
+ <cache name="cluster_instance"
+ maxElementsInMemory="1000"
+ eternal="false"
+ timeToIdleSeconds="1800"
+ timeToLiveSeconds="3600"
+ overflowToDisk="false"
+ />
</ehcache>
diff --git
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
index 2dcb1fa..b77486e 100644
---
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
+++
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
@@ -18,6 +18,8 @@
package org.apache.doris.stack.dao;
import org.apache.doris.stack.entity.ClusterInstanceEntity;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
@@ -26,6 +28,7 @@ import java.util.List;
public interface ClusterInstanceRepository extends
JpaRepository<ClusterInstanceEntity, Long> {
@Query("select c from ClusterInstanceEntity c where c.nodeId = :nodeId")
+ @Cacheable(value = "cluster_instance", key = "#p0")
List<ClusterInstanceEntity> getByNodeId(@Param("nodeId") long nodeId);
@Query("select c from ClusterInstanceEntity c where c.moduleId =
:moduleId")
@@ -34,4 +37,15 @@ public interface ClusterInstanceRepository extends
JpaRepository<ClusterInstance
@Query("select c.nodeId from ClusterInstanceEntity c where c.moduleId =
:moduleId")
List<Long> getNodeIdsByModuleId(@Param("moduleId") long moduleId);
+ @Override
+ @CacheEvict(value = "cluster_instance", key = "#result.nodeId")
+ ClusterInstanceEntity save(ClusterInstanceEntity entity);
+
+ @Override
+ @CacheEvict(value = "cluster_instance", allEntries = true)
+ void deleteById(Long id);
+
+ @Override
+ @CacheEvict(value = "cluster_instance", key = "#entity.nodeId")
+ void delete(ClusterInstanceEntity entity);
}
diff --git
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
index 6ae686e..d6f2c27 100644
---
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
+++
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
@@ -21,8 +21,12 @@ import org.apache.doris.stack.entity.HeartBeatEventEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.cache.annotation.CachePut;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
public interface HeartBeatEventRepository extends
JpaRepository<HeartBeatEventEntity, Long> {
@@ -33,4 +37,20 @@ public interface HeartBeatEventRepository extends
JpaRepository<HeartBeatEventEn
@Query("select c.status from HeartBeatEventEntity c where c.requestId =
:requestId")
Set<String> getStatusByRequestId(@Param("requestId") long requestId);
+
+ @Override
+ @CachePut(value = "heart_beat", key = "#result.id")
+ HeartBeatEventEntity save(HeartBeatEventEntity entity);
+
+ @Override
+ @Cacheable(value = "heart_beat", key = "#p0")
+ Optional<HeartBeatEventEntity> findById(Long id);
+
+ @Override
+ @CacheEvict(value = "heart_beat", key = "#p0")
+ void deleteById(Long id);
+
+ @Override
+ @CacheEvict(value = "heat_beat", key = "#entity.id")
+ void delete(HeartBeatEventEntity entity);
}
diff --git
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
index e26a800..025a28d 100644
---
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
+++
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
@@ -18,12 +18,16 @@
package org.apache.doris.stack.dao;
import org.apache.doris.stack.entity.ResourceNodeEntity;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.CachePut;
+import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
+import java.util.Optional;
public interface ResourceNodeRepository extends
JpaRepository<ResourceNodeEntity, Long> {
@Query("select c.host from ResourceNodeEntity c where c.resourceClusterId
= :resourceClusterId")
@@ -34,6 +38,23 @@ public interface ResourceNodeRepository extends
JpaRepository<ResourceNodeEntity
@Modifying
@Query("delete from ResourceNodeEntity c where c.resourceClusterId =
:resourceClusterId and c.host = :host")
+ @CacheEvict(value = "node_agent", allEntries = true)
void deleteByResourceClusterIdAndHost(@Param("resourceClusterId") long
resourceClusterId,
@Param("host") String host);
+
+ @Override
+ @CachePut(value = "node_agent", key = "#result.id")
+ ResourceNodeEntity save(ResourceNodeEntity entity);
+
+ @Override
+ @Cacheable(value = "node_agent", key = "#p0")
+ Optional<ResourceNodeEntity> findById(Long id);
+
+ @Override
+ @CacheEvict(value = "node_agent", key = "#p0")
+ void deleteById(Long id);
+
+ @Override
+ @CacheEvict(value = "node_agent", key = "#entity.id")
+ void delete(ResourceNodeEntity entity);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]