This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git


The following commit(s) were added to refs/heads/master by this push:
     new cba05d4  support to set agent port by user (#29)
cba05d4 is described below

commit cba05d43c91d0286fa79b8d21c3cea9cbe245c8f
Author: LiRui <[email protected]>
AuthorDate: Fri Apr 1 15:00:37 2022 +0800

    support to set agent port by user (#29)
    
    support to set agent port by user
---
 .../config/AgentInstallEventConfigInfo.java        |   2 +
 .../stack/control/manager/DorisClusterManager.java |   8 +-
 .../control/manager/ResourceClusterManager.java    |  20 +++-
 .../manager/ResourceNodeAndAgentManager.java       | 101 ++++++++++-----------
 .../DorisClusterCreationRequestHandler.java        |   9 +-
 .../DorisClusterTakeOverRequestHandler.java        |   6 +-
 .../request/control/DorisClusterCreationReq.java   |   2 +
 .../request/control/DorisClusterTakeOverReq.java   |   2 +
 .../org/apache/doris/stack/shell/BaseCommand.java  |   8 +-
 manager/manager-bin/agent/bin/agent_start.sh       |  13 ++-
 10 files changed, 102 insertions(+), 69 deletions(-)

diff --git 
a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
 
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
index 9ffda22..a7687f7 100644
--- 
a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
+++ 
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentInstallEventConfigInfo.java
@@ -39,4 +39,6 @@ public class AgentInstallEventConfigInfo {
 
     private long agentNodeId;
 
+    private int agentPort;
+
 }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
index de5c816..e655206 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
@@ -109,15 +109,17 @@ public class DorisClusterManager {
         }
     }
 
-    public void configClusterResourceOperation(ClusterInfoEntity 
clusterInfoEntity, String packageInfo, String installInfo) {
+    public void configClusterResourceOperation(ClusterInfoEntity 
clusterInfoEntity, String packageInfo,
+                                               String installInfo, int 
agentPort) {
         log.info("Config cluster {} resource info operation.", 
clusterInfoEntity.getId());
         clusterInfoEntity.setInstallInfo(installInfo);
         clusterRepository.save(clusterInfoEntity);
 
-        
resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(),
 packageInfo, installInfo);
+        
resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(),
 packageInfo,
+                installInfo, agentPort);
     }
 
-    public void startClusterResourceOperation(ClusterInfoEntity 
clusterInfoEntity, long requestId) {
+    public void startClusterResourceOperation(ClusterInfoEntity 
clusterInfoEntity, long requestId) throws Exception {
         log.info("Start cluster {} resource cluster operation.", 
clusterInfoEntity.getId());
         
resourceClusterManager.startOperation(clusterInfoEntity.getResourceClusterId(), 
requestId);
     }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
index 9faf551..2451aef 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
@@ -84,20 +84,23 @@ public class ResourceClusterManager {
         }
     }
 
-    public void configOperation(long resourceClusterId, String packageInfo, 
String installInfo) {
+    public void configOperation(long resourceClusterId, String packageInfo, 
String installInfo, int agentPort) {
         // TODO:The path can be set separately for each machine later
         log.info("config resource cluster {}", resourceClusterId);
         ResourceClusterEntity resourceClusterEntity = 
resourceClusterRepository.findById(resourceClusterId).get();
         resourceClusterEntity.setRegistryInfo(packageInfo);
+        resourceClusterRepository.save(resourceClusterEntity);
 
         List<ResourceNodeEntity> nodeEntities = 
nodeRepository.getByResourceClusterId(resourceClusterId);
         for (ResourceNodeEntity nodeEntity : nodeEntities) {
             nodeEntity.setAgentInstallDir(installInfo);
+            nodeEntity.setAgentPort(agentPort);
             nodeRepository.save(nodeEntity);
         }
     }
 
-    public void startOperation(long resourceClusterId, long requestId) {
+    public void startOperation(long resourceClusterId, long requestId) throws 
Exception {
+
         log.info("start resource cluster {} all nodes agent", 
resourceClusterId);
         ResourceClusterEntity clusterEntity = 
resourceClusterRepository.findById(resourceClusterId).get();
         PMResourceClusterAccessInfo accessInfo = 
JSON.parseObject(clusterEntity.getAccessInfo(),
@@ -109,7 +112,18 @@ public class ResourceClusterManager {
         configInfo.setSshPort(accessInfo.getSshPort());
         configInfo.setSshKey(accessInfo.getSshKey());
 
-        log.debug("install agent for resource cluster {} all nodes");
+        log.debug("check agent port for resource cluster {} all nodes", 
resourceClusterId);
+
+        // Check that the agent port is available before installing and starting the agent.
+        // This cannot guarantee the port is still free when the agent actually starts,
+        // but it exposes the problem early if the port is already in use.
+        for (ResourceNodeEntity nodeEntity : nodeEntities) {
+            if (!nodeAndAgentManager.isAvailableAgentPort(nodeEntity, 
configInfo)) {
+                throw new Exception(nodeEntity.getHost() + ":" + 
nodeEntity.getAgentPort() + " is already in use");
+            }
+        }
+
+        log.debug("install agent for resource cluster {} all nodes", 
resourceClusterId);
         for (ResourceNodeEntity nodeEntity : nodeEntities) {
             nodeAndAgentManager.installAgentOperation(nodeEntity, configInfo, 
requestId);
         }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index 69cf5b6..e84a5af 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -67,26 +67,55 @@ public class ResourceNodeAndAgentManager {
         nodeRepository.deleteByResourceClusterIdAndHost(resourceClusterId, 
host);
     }
 
+    public boolean isAvailableAgentPort(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo)
+            throws Exception {
+        // agent port check, eg: Spring Boot Param server.port=8008
+        log.info("check {} node port {}:{}", node.getId(), node.getHost(), 
node.getAgentPort());
+        String sshkey = String.format("sshkey-%d-%d", node.getId(), 
node.getAgentPort());
+        File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+        SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+        // only check listen port
+        String checkPortCmd = String.format("netstat -tunlp | grep  -w %d", 
node.getAgentPort());
+        SSH checkPortSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
+                sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
+        if (checkPortSSH.run()) {
+            String netInfo = checkPortSSH.getStdoutResponse();
+            log.info("agent node {} port check return output\n: {}", 
node.getId(), netInfo);
+
+            if (netInfo != null && !netInfo.trim().isEmpty()) {
+                log.error("agent node {} port {} already in use:\n {}", 
node.getId(), node.getAgentPort(), netInfo);
+                return false;
+            }
+        } else if (checkPortSSH.getExitCode() != 1) { // grep exits 1 when 
nothing matches (port is free); any other exit code is an error
+            log.warn("run check port cmd failed");
+            throw new Exception("check agent port scrpit execution exception");
+        }
+        return true;
+    }
+
     public void installAgentOperation(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo, long requestId) {
         log.info("install node {} agent for request {}", node.getId(), 
requestId);
         configInfo.setAgentNodeId(node.getId());
         configInfo.setInstallDir(node.getAgentInstallDir());
         configInfo.setHost(node.getHost());
+        configInfo.setAgentPort(node.getAgentPort());
 
         long eventId = node.getCurrentEventId();
-
+        log.info("event {}: to install and start {} node agent {}:{} in {}", 
eventId, node.getId(),
+                node.getHost(), node.getAgentPort(), 
node.getAgentInstallDir());
         // Check whether the current node is already installing agent or agent 
installation has failed
         HeartBeatEventEntity agentInstallAgentEntity;
         if (eventId < 1L) {
             log.debug("first install agent for node {}", node.getId());
-            // fisrt time install agent
+            // first time install agent
             // create HeartBeatEvent
             HeartBeatEventEntity eventEntity = new 
HeartBeatEventEntity(HeartBeatEventType.AGENT_INSTALL.name(),
                     HeartBeatEventResultType.INIT.name(), 
JSON.toJSONString(configInfo), requestId);
 
             agentInstallAgentEntity = 
heartBeatEventRepository.save(eventEntity);
             eventId = agentInstallAgentEntity.getId();
-
+            log.info("first time to install agent, create heart beat event 
{}", eventId);
             node.setCurrentEventId(eventId);
             nodeRepository.save(node);
         } else {
@@ -189,61 +218,25 @@ public class ResourceNodeAndAgentManager {
 
         // agent start
         // AGENT_START stage
-        String agentInstallHome = configInfo.getInstallDir() + File.separator 
+ "agent";
-
-        // 1 port check, eg: server.port=8008
-        // grep = application.properties | grep  -w server.port  | awk -F '=' 
'{print $2}'
-        String confFile = agentInstallHome + File.separator + 
AGENT_CONFIG_PATH;
-        String portGetFormat = "grep = %s | grep  -w server.port  | awk -F '=' 
'{print $2}'";
-        String portGetCmd = String.format(portGetFormat, confFile);
-
-        SSH portGetSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
-                sshKeyFile.getAbsolutePath(), configInfo.getHost(), 
portGetCmd);
-
-        int agentPort = -1;
-        if (portGetSSH.run()) {
-            String portStr = portGetSSH.getStdoutResponse();
-            log.info("agent {} port get return output: {}", 
configInfo.getAgentNodeId(), portStr);
-
-            if (portStr == null || portStr.isEmpty()) {
-                log.warn("agent {} server.port is not set", 
configInfo.getAgentNodeId());
-            } else {
-                try {
-                    agentPort = Integer.parseInt(portStr.trim());
-                } catch (NumberFormatException e) {
-                    log.warn("agent port format is not Integer");
-                }
+        try {
+            if (!isAvailableAgentPort(node, configInfo)) {
+                log.error("port {}:{} already in use", configInfo.getHost(), 
configInfo.getAgentPort());
+                updateFailResult(String.format("agent port %s:%d already in 
use",
+                                configInfo.getHost(), 
configInfo.getAgentPort()),
+                        AgentInstallEventStage.AGENT_DEPLOY.getStage(), 
agentInstallAgentEntity);
+                return;
             }
-
-        } else {
-            log.warn("run agent port get cmd failed:{}, skip the check and use 
default port",
-                    portGetSSH.getErrorResponse());
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("check agent port exception, skip port check");
         }
 
-        if (agentPort > 0) {
-            log.info("agent start port is {}", agentPort);
-            // only check listen port
-            String checkPortCmd = String.format("netstat -tunlp | grep  -w 
%s", agentPort);
-            SSH checkPortSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
-                    sshKeyFile.getAbsolutePath(), configInfo.getHost(), 
checkPortCmd);
-            if (checkPortSSH.run()) {
-                String netInfo = checkPortSSH.getStdoutResponse();
-                log.info("agent {} port check return output: {}", 
configInfo.getAgentNodeId(), netInfo);
-
-                if (netInfo != null && !netInfo.trim().isEmpty()) {
-                    log.error("port {} already in use, {}", agentPort, 
netInfo);
-                    updateFailResult("port already in use",
-                            AgentInstallEventStage.AGENT_START.getStage(), 
agentInstallAgentEntity);
-                    return;
-                }
-            } else {
-                log.warn("run check port cmd failed");
-            }
-        }
+        String agentInstallHome = configInfo.getInstallDir() + File.separator 
+ "agent";
 
-        // 2 run start shell
-        String command = "cd %s && sh %s  --server %s --agent %s";
-        String cmd = String.format(command, agentInstallHome, 
AGENT_START_SCRIPT, getServerAddr(), configInfo.getAgentNodeId());
+        log.info("to start agent with port {}", configInfo.getAgentPort());
+        String command = "cd %s && sh %s  --server %s --agent %d --port %d";
+        String cmd = String.format(command, agentInstallHome, 
AGENT_START_SCRIPT,
+                getServerAddr(), configInfo.getAgentNodeId(), 
configInfo.getAgentPort());
         SSH startSsh = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
                 sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
         if (!startSsh.run()) {
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
index 39ae4a7..b37e422 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterCreationRequestHandler.java
@@ -104,11 +104,12 @@ public class DorisClusterCreationRequestHandler extends 
DorisClusterRequestHandl
 
     // CONFIG_AND_START_RESOURCE_CLUSTER
     private ModelControlResponse 
handleConfigAndStartResourceClusterEvent(CoreUserEntity user,
-                                                                  
DorisClusterCreationRequest request) {
+                                                                          
DorisClusterCreationRequest request)
+            throws Exception {
         long clusterId = request.getClusterId();
         ClusterInfoEntity clusterInfoEntity = 
clusterInfoRepository.findById(clusterId).get();
         dorisClusterManager.configClusterResourceOperation(clusterInfoEntity, 
request.getReqInfo().getPackageInfo(),
-                request.getReqInfo().getInstallInfo());
+                request.getReqInfo().getInstallInfo(), 
request.getReqInfo().getAgentPort());
         dorisClusterManager.startClusterResourceOperation(clusterInfoEntity, 
request.getRequestId());
 
         return getResponse(request, false);
@@ -135,7 +136,7 @@ public class DorisClusterCreationRequestHandler extends 
DorisClusterRequestHandl
 
     // CONFIG_AND_DEPLOY_DORIS_CLUSTER
     private ModelControlResponse 
handleConfigAndDeployDorisClusterEvent(CoreUserEntity user,
-                                                                
DorisClusterCreationRequest request) {
+                                                                        
DorisClusterCreationRequest request) {
         long clusterId = request.getClusterId();
 
         ClusterInfoEntity clusterInfoEntity = 
clusterInfoRepository.findById(clusterId).get();
@@ -147,7 +148,7 @@ public class DorisClusterCreationRequestHandler extends 
DorisClusterRequestHandl
 
     // DORIS_CLUSTER_DEPLOYED
     private ModelControlResponse 
handleDorisClusterDeployedEvent(CoreUserEntity user,
-                                                               
DorisClusterCreationRequest request) throws Exception {
+                                                                 
DorisClusterCreationRequest request) throws Exception {
         long clusterId = request.getClusterId();
         dorisClusterManager.checkClusterInstancesOperation(clusterId);
         return getResponse(request, false);
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
index 29ac117..e5434eb 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/request/handler/DorisClusterTakeOverRequestHandler.java
@@ -118,7 +118,7 @@ public class DorisClusterTakeOverRequestHandler extends 
DorisClusterRequestHandl
 
     // CREATE_AND_START_RESOURCE_CLUSTER
     private ModelControlResponse 
handleCreateAndStartResourceClusterEvent(CoreUserEntity user,
-                                                               
DorisClusterTakeOverRequest request) throws Exception {
+                                                                          
DorisClusterTakeOverRequest request) throws Exception {
         long clusterId = request.getClusterId();
         log.info("handle take over cluster {} 
CREATE_AND_START_RESOURCE_CLUSTER request {} event",
                 clusterId, request.getRequestId());
@@ -147,8 +147,10 @@ public class DorisClusterTakeOverRequestHandler extends 
DorisClusterRequestHandl
         log.debug("The node list IP of Doris cluster is {}", nodeIps);
 
         dorisClusterManager.createClusterResourceOperation(user, clusterInfo, 
request.getReqInfo().getAuthInfo(), nodeIps);
-        dorisClusterManager.configClusterResourceOperation(clusterInfo, "", 
request.getReqInfo().getInstallInfo());
+        dorisClusterManager.configClusterResourceOperation(clusterInfo, "",
+                request.getReqInfo().getInstallInfo(), 
request.getReqInfo().getAgentPort());
 
+        // TODO: sshInfo and the IP list could be used here to check the agent port
         List<ResourceNodeEntity> nodeEntities =
                 
nodeRepository.getByResourceClusterId(clusterInfo.getResourceClusterId());
         Set<Long> feNodeIds = new HashSet<>();
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
index b6191c7..68a192f 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterCreationReq.java
@@ -37,6 +37,8 @@ public class DorisClusterCreationReq extends ModelControlReq {
 
     private String installInfo;
 
+    private int agentPort;
+
     // Step 4: Install agent
 
     // Step 5:Planning resource node
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
index fa7979a..1149f3a 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/control/DorisClusterTakeOverReq.java
@@ -39,6 +39,8 @@ public class DorisClusterTakeOverReq extends ModelControlReq {
 
     private String installInfo;
 
+    private int agentPort;
+
     // Step 4: check Install agent
     // Step 5: create cluster module and instance, check agent instance
 }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java 
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
index bb18f19..2164bed 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
@@ -34,6 +34,7 @@ public abstract class BaseCommand {
     protected String[] resultCommand;
     protected String stdoutResponse;
     protected String errorResponse;
+    protected int exitCode;
 
     protected abstract void buildCommand();
 
@@ -45,6 +46,10 @@ public abstract class BaseCommand {
         return this.errorResponse;
     }
 
+    public int getExitCode() {
+        return this.exitCode;
+    }
+
     public boolean run() {
         buildCommand();
         log.info("run command: {}", StringUtils.join(resultCommand, " "));
@@ -59,7 +64,8 @@ public abstract class BaseCommand {
 
             stdoutResponse = 
stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
             errorResponse = 
errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
-            final int exitCode = process.waitFor();
+
+            exitCode = process.waitFor();
             if (exitCode == 0) {
                 return true;
             } else {
diff --git a/manager/manager-bin/agent/bin/agent_start.sh 
b/manager/manager-bin/agent/bin/agent_start.sh
index 6fee0f0..c83f307 100644
--- a/manager/manager-bin/agent/bin/agent_start.sh
+++ b/manager/manager-bin/agent/bin/agent_start.sh
@@ -27,6 +27,7 @@ OPTS=$(getopt \
   -o '' \
   -l 'server:' \
   -l 'agent:' \
+  -l 'port:' \
   -- "$@")
 
 eval set -- "$OPTS"
@@ -34,10 +35,12 @@ eval set -- "$OPTS"
 #host:port
 SERVER=
 AGENT=
+PORT=
 while true; do
     case "$1" in
         --server) SERVER=$2 ; shift 2;;
         --agent) AGENT=$2 ; shift 2;;
+        --port) PORT=$2 ; shift  2;;
         --) shift ;  break ;;
         *) echo "Internal error" ; exit 1 ;;
     esac
@@ -52,6 +55,12 @@ if [ x"$AGENT" == x"" ]; then
     echo "--agent node id can not empty!"
     exit 1
 fi
+
+if [ x"$PORT" == x"" ]; then
+    echo "--port agent port can not empty!"
+    exit 1
+fi
+
 export AGENT_HOME=`cd "$curdir/.."; pwd`
 
 #
@@ -60,7 +69,7 @@ export AGENT_HOME=`cd "$curdir/.."; pwd`
 # LOG_DIR
 # PID_DIR
 export JAVA_OPTS="-Xmx1024m"
-export SERVER_PARAMS="--manager.server.endpoint=$SERVER --agent.node.id=$AGENT"
+export SERVER_PARAMS="--manager.server.endpoint=$SERVER --agent.node.id=$AGENT 
--server.port=$PORT"
 export LOG_DIR="$AGENT_HOME/log"
 export PID_DIR=`cd "$curdir"; pwd`
 
@@ -88,4 +97,4 @@ fi
 nohup $JAVA $JAVA_OPTS -jar ${AGENT_HOME}/lib/dm-agent.jar $SERVER_PARAMS >> 
$LOG_DIR/agent.out 2>&1  &
 echo `date` >> $LOG_DIR/agent.out
 
-echo $! > $pidfile
+echo $! > $pidfile
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to