This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git
The following commit(s) were added to refs/heads/master by this push:
new cba05d4 support to set agent port by user (#29)
cba05d4 is described below
commit cba05d43c91d0286fa79b8d21c3cea9cbe245c8f
Author: LiRui <[email protected]>
AuthorDate: Fri Apr 1 15:00:37 2022 +0800
support to set agent port by user (#29)
support to set agent port by user
---
.../config/AgentInstallEventConfigInfo.java | 2 +
.../stack/control/manager/DorisClusterManager.java | 8 +-
.../control/manager/ResourceClusterManager.java | 20 +++-
.../manager/ResourceNodeAndAgentManager.java | 101 ++++++++++-----------
.../DorisClusterCreationRequestHandler.java | 9 +-
.../DorisClusterTakeOverRequestHandler.java | 6 +-
.../request/control/DorisClusterCreationReq.java | 2 +
.../request/control/DorisClusterTakeOverReq.java | 2 +
.../org/apache/doris/stack/shell/BaseCommand.java | 8 +-
manager/manager-bin/agent/bin/agent_start.sh | 13 ++-
10 files changed, 102 insertions(+), 69 deletions(-)
diff --git
a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
index 9ffda22..a7687f7 100644
---
a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
+++
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
@@ -39,4 +39,6 @@ public class AgentInstallEventConfigInfo {
private long agentNodeId;
+ private int agentPort;
+
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
index de5c816..e655206 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
@@ -109,15 +109,17 @@ public class DorisClusterManager {
}
}
- public void configClusterResourceOperation(ClusterInfoEntity
clusterInfoEntity, String packageInfo, String installInfo) {
+ public void configClusterResourceOperation(ClusterInfoEntity
clusterInfoEntity, String packageInfo,
+ String installInfo, int
agentPort) {
log.info("Config cluster {} resource info operation.",
clusterInfoEntity.getId());
clusterInfoEntity.setInstallInfo(installInfo);
clusterRepository.save(clusterInfoEntity);
-
resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(),
packageInfo, installInfo);
+
resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(),
packageInfo,
+ installInfo, agentPort);
}
- public void startClusterResourceOperation(ClusterInfoEntity
clusterInfoEntity, long requestId) {
+ public void startClusterResourceOperation(ClusterInfoEntity
clusterInfoEntity, long requestId) throws Exception {
log.info("Start cluster {} resource cluster operation.",
clusterInfoEntity.getId());
resourceClusterManager.startOperation(clusterInfoEntity.getResourceClusterId(),
requestId);
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
index 9faf551..2451aef 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
@@ -84,20 +84,23 @@ public class ResourceClusterManager {
}
}
- public void configOperation(long resourceClusterId, String packageInfo,
String installInfo) {
+ public void configOperation(long resourceClusterId, String packageInfo,
String installInfo, int agentPort) {
// TODO:The path can be set separately for each machine later
log.info("config resource cluster {}", resourceClusterId);
ResourceClusterEntity resourceClusterEntity =
resourceClusterRepository.findById(resourceClusterId).get();
resourceClusterEntity.setRegistryInfo(packageInfo);
+ resourceClusterRepository.save(resourceClusterEntity);
List<ResourceNodeEntity> nodeEntities =
nodeRepository.getByResourceClusterId(resourceClusterId);
for (ResourceNodeEntity nodeEntity : nodeEntities) {
nodeEntity.setAgentInstallDir(installInfo);
+ nodeEntity.setAgentPort(agentPort);
nodeRepository.save(nodeEntity);
}
}
- public void startOperation(long resourceClusterId, long requestId) {
+ public void startOperation(long resourceClusterId, long requestId) throws
Exception {
+
log.info("start resource cluster {} all nodes agent",
resourceClusterId);
ResourceClusterEntity clusterEntity =
resourceClusterRepository.findById(resourceClusterId).get();
PMResourceClusterAccessInfo accessInfo =
JSON.parseObject(clusterEntity.getAccessInfo(),
@@ -109,7 +112,18 @@ public class ResourceClusterManager {
configInfo.setSshPort(accessInfo.getSshPort());
configInfo.setSshKey(accessInfo.getSshKey());
- log.debug("install agent for resource cluster {} all nodes");
+ log.debug("check agent port for resource cluster {} all nodes",
resourceClusterId);
+
+ // Before installing and starting the agent, check whether its port is available.
+ // This cannot guarantee the port will still be free when the agent actually starts,
+ // but it surfaces the problem early if the port is already in use.
+ for (ResourceNodeEntity nodeEntity : nodeEntities) {
+ if (!nodeAndAgentManager.isAvailableAgentPort(nodeEntity,
configInfo)) {
+ throw new Exception(nodeEntity.getHost() + ":" +
nodeEntity.getAgentPort() + " is already in use");
+ }
+ }
+
+ log.debug("install agent for resource cluster {} all nodes",
resourceClusterId);
for (ResourceNodeEntity nodeEntity : nodeEntities) {
nodeAndAgentManager.installAgentOperation(nodeEntity, configInfo,
requestId);
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index 69cf5b6..e84a5af 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -67,26 +67,55 @@ public class ResourceNodeAndAgentManager {
nodeRepository.deleteByResourceClusterIdAndHost(resourceClusterId,
host);
}
+ public boolean isAvailableAgentPort(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo)
+ throws Exception {
+ // agent port check, eg: Spring Boot Param server.port=8008
+ log.info("check {} node port {}:{}", node.getId(), node.getHost(),
node.getAgentPort());
+ String sshkey = String.format("sshkey-%d-%d", node.getId(),
node.getAgentPort());
+ File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+ SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+ // only check listen port
+ String checkPortCmd = String.format("netstat -tunlp | grep -w %d",
node.getAgentPort());
+ SSH checkPortSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
+ if (checkPortSSH.run()) {
+ String netInfo = checkPortSSH.getStdoutResponse();
+ log.info("agent node {} port check return output\n: {}",
node.getId(), netInfo);
+
+ if (netInfo != null && !netInfo.trim().isEmpty()) {
+ log.error("agent node {} port {} already in use:\n {}",
node.getId(), node.getAgentPort(), netInfo);
+ return false;
+ }
+ } else if (checkPortSSH.getExitCode() != 1) { //exit 1 when grep
failed, other exit code is exception
+ log.warn("run check port cmd failed");
+ throw new Exception("check agent port scrpit execution exception");
+ }
+ return true;
+ }
+
public void installAgentOperation(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo, long requestId) {
log.info("install node {} agent for request {}", node.getId(),
requestId);
configInfo.setAgentNodeId(node.getId());
configInfo.setInstallDir(node.getAgentInstallDir());
configInfo.setHost(node.getHost());
+ configInfo.setAgentPort(node.getAgentPort());
long eventId = node.getCurrentEventId();
-
+ log.info("event {}: to install and start {} node agent {}:{} in {}",
eventId, node.getId(),
+ node.getHost(), node.getAgentPort(),
node.getAgentInstallDir());
// Check whether the current node is already installing agent or agent
installation has failed
HeartBeatEventEntity agentInstallAgentEntity;
if (eventId < 1L) {
log.debug("first install agent for node {}", node.getId());
- // fisrt time install agent
+ // first time install agent
// create HeartBeatEvent
HeartBeatEventEntity eventEntity = new
HeartBeatEventEntity(HeartBeatEventType.AGENT_INSTALL.name(),
HeartBeatEventResultType.INIT.name(),
JSON.toJSONString(configInfo), requestId);
agentInstallAgentEntity =
heartBeatEventRepository.save(eventEntity);
eventId = agentInstallAgentEntity.getId();
-
+ log.info("first time to install agent, create heart beat event
{}", eventId);
node.setCurrentEventId(eventId);
nodeRepository.save(node);
} else {
@@ -189,61 +218,25 @@ public class ResourceNodeAndAgentManager {
// agent start
// AGENT_START stage
- String agentInstallHome = configInfo.getInstallDir() + File.separator
+ "agent";
-
- // 1 port check, eg: server.port=8008
- // grep = application.properties | grep -w server.port | awk -F '='
'{print $2}'
- String confFile = agentInstallHome + File.separator +
AGENT_CONFIG_PATH;
- String portGetFormat = "grep = %s | grep -w server.port | awk -F '='
'{print $2}'";
- String portGetCmd = String.format(portGetFormat, confFile);
-
- SSH portGetSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
- sshKeyFile.getAbsolutePath(), configInfo.getHost(),
portGetCmd);
-
- int agentPort = -1;
- if (portGetSSH.run()) {
- String portStr = portGetSSH.getStdoutResponse();
- log.info("agent {} port get return output: {}",
configInfo.getAgentNodeId(), portStr);
-
- if (portStr == null || portStr.isEmpty()) {
- log.warn("agent {} server.port is not set",
configInfo.getAgentNodeId());
- } else {
- try {
- agentPort = Integer.parseInt(portStr.trim());
- } catch (NumberFormatException e) {
- log.warn("agent port format is not Integer");
- }
+ try {
+ if (!isAvailableAgentPort(node, configInfo)) {
+ log.error("port {}:{} already in use", configInfo.getHost(),
configInfo.getAgentPort());
+ updateFailResult(String.format("agent port %s:%d already in
use",
+ configInfo.getHost(),
configInfo.getAgentPort()),
+ AgentInstallEventStage.AGENT_DEPLOY.getStage(),
agentInstallAgentEntity);
+ return;
}
-
- } else {
- log.warn("run agent port get cmd failed:{}, skip the check and use
default port",
- portGetSSH.getErrorResponse());
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("check agent port exception, skip port check");
}
- if (agentPort > 0) {
- log.info("agent start port is {}", agentPort);
- // only check listen port
- String checkPortCmd = String.format("netstat -tunlp | grep -w
%s", agentPort);
- SSH checkPortSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
- sshKeyFile.getAbsolutePath(), configInfo.getHost(),
checkPortCmd);
- if (checkPortSSH.run()) {
- String netInfo = checkPortSSH.getStdoutResponse();
- log.info("agent {} port check return output: {}",
configInfo.getAgentNodeId(), netInfo);
-
- if (netInfo != null && !netInfo.trim().isEmpty()) {
- log.error("port {} already in use, {}", agentPort,
netInfo);
- updateFailResult("port already in use",
- AgentInstallEventStage.AGENT_START.getStage(),
agentInstallAgentEntity);
- return;
- }
- } else {
- log.warn("run check port cmd failed");
- }
- }
+ String agentInstallHome = configInfo.getInstallDir() + File.separator
+ "agent";
- // 2 run start shell
- String command = "cd %s && sh %s --server %s --agent %s";
- String cmd = String.format(command, agentInstallHome,
AGENT_START_SCRIPT, getServerAddr(), configInfo.getAgentNodeId());
+ log.info("to start agent with port {}", configInfo.getAgentPort());
+ String command = "cd %s && sh %s --server %s --agent %d --port %d";
+ String cmd = String.format(command, agentInstallHome,
AGENT_START_SCRIPT,
+ getServerAddr(), configInfo.getAgentNodeId(),
configInfo.getAgentPort());
SSH startSsh = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
if (!startSsh.run()) {
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
index 39ae4a7..b37e422 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
@@ -104,11 +104,12 @@ public class DorisClusterCreationRequestHandler extends
DorisClusterRequestHandl
// CONFIG_AND_START_RESOURCE_CLUSTER
private ModelControlResponse
handleConfigAndStartResourceClusterEvent(CoreUserEntity user,
-
DorisClusterCreationRequest request) {
+
DorisClusterCreationRequest request)
+ throws Exception {
long clusterId = request.getClusterId();
ClusterInfoEntity clusterInfoEntity =
clusterInfoRepository.findById(clusterId).get();
dorisClusterManager.configClusterResourceOperation(clusterInfoEntity,
request.getReqInfo().getPackageInfo(),
- request.getReqInfo().getInstallInfo());
+ request.getReqInfo().getInstallInfo(),
request.getReqInfo().getAgentPort());
dorisClusterManager.startClusterResourceOperation(clusterInfoEntity,
request.getRequestId());
return getResponse(request, false);
@@ -135,7 +136,7 @@ public class DorisClusterCreationRequestHandler extends
DorisClusterRequestHandl
// CONFIG_AND_DEPLOY_DORIS_CLUSTER
private ModelControlResponse
handleConfigAndDeployDorisClusterEvent(CoreUserEntity user,
-
DorisClusterCreationRequest request) {
+
DorisClusterCreationRequest request) {
long clusterId = request.getClusterId();
ClusterInfoEntity clusterInfoEntity =
clusterInfoRepository.findById(clusterId).get();
@@ -147,7 +148,7 @@ public class DorisClusterCreationRequestHandler extends
DorisClusterRequestHandl
// DORIS_CLUSTER_DEPLOYED
private ModelControlResponse
handleDorisClusterDeployedEvent(CoreUserEntity user,
-
DorisClusterCreationRequest request) throws Exception {
+
DorisClusterCreationRequest request) throws Exception {
long clusterId = request.getClusterId();
dorisClusterManager.checkClusterInstancesOperation(clusterId);
return getResponse(request, false);
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
index 29ac117..e5434eb 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
@@ -118,7 +118,7 @@ public class DorisClusterTakeOverRequestHandler extends
DorisClusterRequestHandl
// CREATE_AND_START_RESOURCE_CLUSTER
private ModelControlResponse
handleCreateAndStartResourceClusterEvent(CoreUserEntity user,
-
DorisClusterTakeOverRequest request) throws Exception {
+
DorisClusterTakeOverRequest request) throws Exception {
long clusterId = request.getClusterId();
log.info("handle take over cluster {}
CREATE_AND_START_RESOURCE_CLUSTER request {} event",
clusterId, request.getRequestId());
@@ -147,8 +147,10 @@ public class DorisClusterTakeOverRequestHandler extends
DorisClusterRequestHandl
log.debug("The node list IP of Doris cluster is {}", nodeIps);
dorisClusterManager.createClusterResourceOperation(user, clusterInfo,
request.getReqInfo().getAuthInfo(), nodeIps);
- dorisClusterManager.configClusterResourceOperation(clusterInfo, "",
request.getReqInfo().getInstallInfo());
+ dorisClusterManager.configClusterResourceOperation(clusterInfo, "",
+ request.getReqInfo().getInstallInfo(),
request.getReqInfo().getAgentPort());
+ // TODO: use the SSH info and the node IP list to check the agent port here as well
List<ResourceNodeEntity> nodeEntities =
nodeRepository.getByResourceClusterId(clusterInfo.getResourceClusterId());
Set<Long> feNodeIds = new HashSet<>();
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
index b6191c7..68a192f 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
@@ -37,6 +37,8 @@ public class DorisClusterCreationReq extends ModelControlReq {
private String installInfo;
+ private int agentPort;
+
// Step 4: Install agent
// Step 5:Planning resource node
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
index fa7979a..1149f3a 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
@@ -39,6 +39,8 @@ public class DorisClusterTakeOverReq extends ModelControlReq {
private String installInfo;
+ private int agentPort;
+
// Step 4: check Install agent
// Step 5: create cluster module and instance, check agent instance
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
index bb18f19..2164bed 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
@@ -34,6 +34,7 @@ public abstract class BaseCommand {
protected String[] resultCommand;
protected String stdoutResponse;
protected String errorResponse;
+ protected int exitCode;
protected abstract void buildCommand();
@@ -45,6 +46,10 @@ public abstract class BaseCommand {
return this.errorResponse;
}
+ public int getExitCode() {
+ return this.exitCode;
+ }
+
public boolean run() {
buildCommand();
log.info("run command: {}", StringUtils.join(resultCommand, " "));
@@ -59,7 +64,8 @@ public abstract class BaseCommand {
stdoutResponse =
stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
errorResponse =
errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
- final int exitCode = process.waitFor();
+
+ exitCode = process.waitFor();
if (exitCode == 0) {
return true;
} else {
diff --git a/manager/manager-bin/agent/bin/agent_start.sh
b/manager/manager-bin/agent/bin/agent_start.sh
index 6fee0f0..c83f307 100644
--- a/manager/manager-bin/agent/bin/agent_start.sh
+++ b/manager/manager-bin/agent/bin/agent_start.sh
@@ -27,6 +27,7 @@ OPTS=$(getopt \
-o '' \
-l 'server:' \
-l 'agent:' \
+ -l 'port:' \
-- "$@")
eval set -- "$OPTS"
@@ -34,10 +35,12 @@ eval set -- "$OPTS"
#host:port
SERVER=
AGENT=
+PORT=
while true; do
case "$1" in
--server) SERVER=$2 ; shift 2;;
--agent) AGENT=$2 ; shift 2;;
+ --port) PORT=$2 ; shift 2;;
--) shift ; break ;;
*) echo "Internal error" ; exit 1 ;;
esac
@@ -52,6 +55,12 @@ if [ x"$AGENT" == x"" ]; then
echo "--agent node id can not empty!"
exit 1
fi
+
+if [ x"$PORT" == x"" ]; then
+ echo "--port agent port can not empty!"
+ exit 1
+fi
+
export AGENT_HOME=`cd "$curdir/.."; pwd`
#
@@ -60,7 +69,7 @@ export AGENT_HOME=`cd "$curdir/.."; pwd`
# LOG_DIR
# PID_DIR
export JAVA_OPTS="-Xmx1024m"
-export SERVER_PARAMS="--manager.server.endpoint=$SERVER --agent.node.id=$AGENT"
+export SERVER_PARAMS="--manager.server.endpoint=$SERVER --agent.node.id=$AGENT
--server.port=$PORT"
export LOG_DIR="$AGENT_HOME/log"
export PID_DIR=`cd "$curdir"; pwd`
@@ -88,4 +97,4 @@ fi
nohup $JAVA $JAVA_OPTS -jar ${AGENT_HOME}/lib/dm-agent.jar $SERVER_PARAMS >>
$LOG_DIR/agent.out 2>&1 &
echo `date` >> $LOG_DIR/agent.out
-echo $! > $pidfile
+echo $! > $pidfile
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]