Repository: incubator-slider Updated Branches: refs/heads/feature/packaging_improvements c7b08d869 -> ec2f013eb
Adding support for Docker container Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/ec2f013e Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/ec2f013e Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/ec2f013e Branch: refs/heads/feature/packaging_improvements Commit: ec2f013eb1b6dddc31902093644c06cf49a4891e Parents: c7b08d8 Author: Sumit Mohanty <[email protected]> Authored: Sun Mar 1 22:39:02 2015 -0800 Committer: Sumit Mohanty <[email protected]> Committed: Sun Mar 1 22:39:02 2015 -0800 ---------------------------------------------------------------------- .../src/main/python/agent/ActionQueue.py | 24 +- .../src/main/python/agent/DockerManager.py | 178 ++++++++++++ slider-agent/src/main/python/agent/main.py | 6 + slider-core/pom.xml | 6 +- .../providers/agent/AgentProviderService.java | 277 ++++++++++++++++++- .../application/metadata/DockerContainer.java | 37 ++- .../metadata/DockerContainerInputFile.java | 32 +++ .../web/rest/agent/ExecutionCommand.java | 12 + 8 files changed, 559 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-agent/src/main/python/agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py index 4cb5de7..e82339e 100644 --- a/slider-agent/src/main/python/agent/ActionQueue.py +++ b/slider-agent/src/main/python/agent/ActionQueue.py @@ -31,7 +31,9 @@ from AgentToggleLogger import AgentToggleLogger from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator import Constants - +import subprocess +import getpass +from DockerManager import DockerManager logger = logging.getLogger() installScriptHash = -1 @@ -51,6 +53,8 @@ class ActionQueue(threading.Thread): STORE_APPLIED_CONFIG = 'record_config' AUTO_RESTART = 'auto_restart' + + docker_mode = False def __init__(self, config, controller, agentToggleLogger): super(ActionQueue, self).__init__() @@ -66,7 +70,8 @@ class ActionQueue(threading.Thread): self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.queueOutAgentToggleLogger) - + self.dockerManager = DockerManager(self.tmpdir, config.getWorkRootPath(), self.customServiceOrchestrator) + def stop(self): self._stop.set() @@ -157,7 +162,14 @@ class ActionQueue(threading.Thread): logger.info("Component has indicated auto-restart. Saving details from START command.") # running command - commandresult = self.customServiceOrchestrator.runCommand(command, + commandresult = None + logger.info("command fromhost: " + str(command)) + + if 'configurations' in command and 'docker' in command['configurations']: + self.docker_mode = True + commandresult = self.dockerManager.execute_command(command, store_config or store_command) + else: + commandresult = self.customServiceOrchestrator.runCommand(command, in_progress_status[ 'tmpout'], in_progress_status[ @@ -214,7 +226,11 @@ class ActionQueue(threading.Thread): service = command['serviceName'] component = command['componentName'] reportResult = CommandStatusDict.shouldReportResult(command) - component_status = self.customServiceOrchestrator.requestComponentStatus(command) + component_status = None + if self.docker_mode: + component_status = self.dockerManager.query_status(command) + else: + component_status = self.customServiceOrchestrator.requestComponentStatus(command) result = {"componentName": component, "msg": "", http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-agent/src/main/python/agent/DockerManager.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/DockerManager.py b/slider-agent/src/main/python/agent/DockerManager.py new file mode 100644 index 0000000..23c8c49 --- /dev/null +++ b/slider-agent/src/main/python/agent/DockerManager.py @@ -0,0 +1,178 @@ +import logging +import os +import subprocess +from AgentConfig import AgentConfig +import Constants + +logger = logging.getLogger() + +class DockerManager(): + stored_status_command = '' + stored_command = '' + + def __init__(self, tmpdir, workroot, customServiceOrchestrator): + self.tmpdir = tmpdir + self.workroot = workroot + self.customServiceOrchestrator = customServiceOrchestrator + + def execute_command(self, command, store_command=False): + returncode = '' + out = '' + err = '' + + if store_command: + logger.info("Storing applied config: " + str(command['configurations'])) + self.stored_command = command + + if command['roleCommand'] == 'INSTALL': + returncode, out, err = self.pull_image(command) + if command['roleCommand'] == 'START': + returncode, out, err = self.start_container(command) + # need check + return {Constants.EXIT_CODE:returncode, 'stdout':out, 'stderr':err} + + def pull_image(self, command): + logger.info(str( command['configurations'])) + command_path = self.extract_config_from_command(command, 'docker.command_path') + imageName = self.extract_config_from_command(command, 'docker.image_name') + + docker_command = [command_path, 'pull', imageName] + logger.info("docker pull command: " + str(docker_command)) + return self.execute_command_on_linux(docker_command) + + + def extract_config_from_command(self, command, field): + value = '' + if 'configurations' in command: + if 'docker' in command['configurations']: + if field in command['configurations']['docker']: + logger.info(field + ': ' + str( command['configurations']['docker'][field])) + value = command['configurations']['docker'][field] + return value + + + # will evolve into a class hierarch, linux and windows + def execute_command_on_linux(self, docker_command): + proc = subprocess.Popen(docker_command, stdout = subprocess.PIPE) + out, err = proc.communicate() + logger.info("docker command output: " + str(out) + " err: " + str(err)) + return proc.returncode, out, err + + + def start_container(self, command): + #extracting param needed by docker run from the command passed from AM + command_path = self.extract_config_from_command(command, 'docker.command_path') + imageName = self.extract_config_from_command(command, 'docker.image_name') + options = self.extract_config_from_command(command, 'docker.options') + containerPort = self.extract_config_from_command(command, 'docker.container_port') + mounting_directory = self.extract_config_from_command(command, 'docker.mounting_directory') + memory_usage = self.extract_config_from_command(command, "docker.memory_usage") + additional_param = self.extract_config_from_command(command, 'docker.additional_param') + input_file_local_path = self.extract_config_from_command(command, 'docker.input_file.local_path') + input_file_mount_path = self.extract_config_from_command(command, 'docker.input_file.mount_path') + + docker_command = [command_path, "run"] + if options: + docker_command = self.add_docker_run_options_to_command(docker_command, options) + if containerPort: + self.add_port_binding_to_command(docker_command, command, containerPort) + if mounting_directory: + self.add_mnted_dir_to_command(docker_command, "/docker_use", mounting_directory) + if input_file_local_path: + self.add_mnted_dir_to_command(docker_command, "/inputDir", input_file_mount_path) + if memory_usage: + self.add_resource_restriction(docker_command, memory_usage) + self.add_container_name_to_command(docker_command, command) + docker_command.append(imageName) + if additional_param: + docker_command = self.add_additional_param_to_command(docker_command, additional_param) + logger.info("docker run command: " + str(docker_command)) + return self.execute_command_on_linux(docker_command) + + def add_docker_run_options_to_command(self, docker_command, options): + return docker_command + options.split(" ") + + def add_port_binding_to_command(self, docker_command, command, containerPort): + docker_command.append("-p") + hostPort = self.extract_config_from_command(command, 'docker.host_port') + if not hostPort: + #this is the list of allowed port range specified in appConfig + allowedPorts = self.customServiceOrchestrator.get_allowed_ports(command) + #if the user specify hostPort in appConfig, then we use it, otherwise allocate it + allocated_for_this_component_format = "${{{0}.ALLOCATED_PORT}}" + component = command['componentName'] + port_allocation_req = allocated_for_this_component_format.format(component) + hostPort = self.customServiceOrchestrator.allocate_ports(port_allocation_req, port_allocation_req, allowedPorts) + docker_command.append(hostPort+":"+containerPort) + + def add_mnted_dir_to_command(self, docker_command, host_dir, container_dir): + docker_command.append("-v") + tmp_mount_dir = self.workroot + host_dir + docker_command.append(tmp_mount_dir+":"+container_dir) + + def add_container_name_to_command(self, docker_command, command): + docker_command.append("-name") + docker_command.append(self.get_container_id(command)) + + def add_additional_param_to_command(self, docker_command, additional_param): + return docker_command + additional_param.split(" ") + + def get_container_id(self, command): + # will make this more resilient to changes + return self.tmpdir[-30:-2] + + def add_resource_restriction(self, docker_command, memory_usage): + docker_command.append("-m") + docker_command.append(memory_usage) + + def query_status(self, command): + if command['roleCommand'] == "GET_CONFIG": + return self.getConfig(command) + else: + returncode = '' + out = '' + err = '' + status_command_str = self.extract_config_from_command(command, 'docker.status_command') + if status_command_str: + self.stored_status_command = status_command_str.split(" ") + logger.info("status command" + str(self.stored_status_command)) + if self.stored_status_command: + returncode, out, err = self.execute_command_on_linux(self.stored_status_command) + logger.info("status of the app in docker container: " + str(returncode) + str(out) + str(err)) + return {Constants.EXIT_CODE:returncode, 'stdout':out, 'stderr':err} + + def getConfig(self, command): + logger.info("get config command: " + str(command)) + if 'configurations' in self.stored_command: + if 'commandParams' in command and 'config_type' in command['commandParams']: + config_type = command['commandParams']['config_type'] + logger.info("Requesting applied config for type {0}".format(config_type)) + if config_type in self.stored_command['configurations']: + logger.info("get config result: " + self.stored_command['configurations'][config_type]) + return { + 'configurations': {config_type: self.stored_command['configurations'][config_type]} + } + else: + return { + 'configurations': {} + } + pass + else: + logger.info("Requesting all applied config." + str(self.stored_command['configurations'])) + return { + 'configurations': self.stored_command['configurations'] + } + pass + else: + return { + 'configurations': {} + } + pass + + def stop_container(self): + docker_command = ["/usr/bin/docker", "stop"] + docker_command.append(self.get_container_id(docker_command)) + logger.info("docker stop: " + str(docker_command)) + code, out, err = self.execute_command_on_linux(docker_command) + logger.info("output: " + str(out)) + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-agent/src/main/python/agent/main.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/main.py b/slider-agent/src/main/python/agent/main.py index 3a75cb1..3aeaea2 100644 --- a/slider-agent/src/main/python/agent/main.py +++ b/slider-agent/src/main/python/agent/main.py @@ -41,6 +41,7 @@ logger = logging.getLogger() IS_WINDOWS = platform.system() == "Windows" formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s" agentPid = os.getpid() +con = None configFileRelPath = "infra/conf/agent.ini" logFileName = "slider-agent.log" @@ -54,6 +55,9 @@ def signal_handler(signum, frame): if os.getpid() != agentPid: os._exit(0) logger.info('signal received, exiting.') + + tmpdir = con.actionQueue.dockerManager.stop_container() + ProcessHelper.stopAgent() @@ -287,6 +291,8 @@ def main(): # Launch Controller communication controller = Controller(agentConfig) controller.start() + global con + con = controller try: while controller.is_alive(): controller.join(timeout=1.0) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-core/pom.xml ---------------------------------------------------------------------- diff --git a/slider-core/pom.xml b/slider-core/pom.xml index d5b3093..6cba0d7 100644 --- a/slider-core/pom.xml +++ b/slider-core/pom.xml @@ -205,7 +205,11 @@ <groupId>com.beust</groupId> <artifactId>jcommander</artifactId> </dependency> - + <dependency> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + <version>3.3.1</version> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index b624221..f4d3275 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ProtocolTypes; import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.security.alias.CredentialProviderFactory; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.Container; @@ -38,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.slider.api.ClusterDescription; -import org.apache.slider.api.ClusterDescriptionKeys; import org.apache.slider.api.ClusterNode; import org.apache.slider.api.InternalKeys; import org.apache.slider.api.OptionKeys; @@ -73,6 +71,7 @@ import org.apache.slider.providers.agent.application.metadata.ComponentCommand; import org.apache.slider.providers.agent.application.metadata.ComponentExport; import org.apache.slider.providers.agent.application.metadata.ConfigFile; import org.apache.slider.providers.agent.application.metadata.DefaultConfig; +import org.apache.slider.providers.agent.application.metadata.DockerContainer; import org.apache.slider.providers.agent.application.metadata.Export; import org.apache.slider.providers.agent.application.metadata.ExportGroup; import org.apache.slider.providers.agent.application.metadata.Metainfo; @@ -718,11 +717,13 @@ public class AgentProviderService extends AbstractProviderService implements CommandScript cmdScript = getScriptPathFromMetainfo(roleName); List<ComponentCommand> commands = getMetaInfo().getApplicationComponent(roleName).getCommands(); + /* if ((cmdScript == null || cmdScript.getScript() == null) && commands.size() == 0) { log.error("role.script is unavailable for {}. Commands will not be sent.", roleName); return response; } + */ String scriptPath = null; long timeout = 600L; @@ -794,7 +795,11 @@ public class AgentProviderService extends AbstractProviderService implements installCmd = compCmd; } } - addInstallCommand2(roleName, containerId, response, installCmd, timeout); + if(isDockerContainer(roleName)){ + addInstallDockerCommand2(roleName, containerId, response, installCmd, timeout); + } else { + addInstallCommand2(roleName, containerId, response, installCmd, timeout); + } componentStatus.commandIssued(command); } } else if (command == Command.START) { @@ -819,7 +824,11 @@ public class AgentProviderService extends AbstractProviderService implements startCmd = compCmd; } } - addStartCommand2(roleName, containerId, response, startCmd, timeout, false); + if(isDockerContainer(roleName)){ + addStartDockerCommand2(roleName, containerId, response, startCmd, timeout, false); + } else { + addStartCommand2(roleName, containerId, response, startCmd, timeout, false); + } componentStatus.commandIssued(command); } else { log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId); @@ -833,7 +842,11 @@ public class AgentProviderService extends AbstractProviderService implements && command == Command.NOP) { if (!componentStatus.getConfigReported()) { log.info("Requesting applied config for {} on {}.", roleName, containerId); - addGetConfigCommand(roleName, containerId, response); + if(isDockerContainer(roleName)){ + addGetConfigDockerCommand(roleName, containerId, response); + } else { + addGetConfigCommand(roleName, containerId, response); + } } } @@ -860,6 +873,14 @@ public class AgentProviderService extends AbstractProviderService implements return response; } + private boolean isDockerContainer(String roleName){ + String type = getMetaInfo().getApplicationComponent(roleName).getType(); + if(SliderUtils.isSet(type)) { + return type.toLowerCase().equals("docker"); + } + return false; + } + protected void processAllocatedPorts(String fqdn, String roleName, String containerId, @@ -1740,6 +1761,60 @@ public class AgentProviderService extends AbstractProviderService implements response.addExecutionCommand(cmd); } + @VisibleForTesting + protected void addInstallDockerCommand2(String componentName, + String containerId, + HeartBeatResponse response, + ComponentCommand compCmd, + long timeout) + throws SliderException { + assert getAmState().isApplicationLive(); + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + + ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND); + prepareExecutionCommand(cmd); + String clusterName = getClusterName(); + cmd.setClusterName(clusterName); + cmd.setRoleCommand(Command.INSTALL.toString()); + cmd.setServiceName(clusterName); + cmd.setComponentName(componentName); + cmd.setRole(componentName); + Map<String, String> hostLevelParams = new TreeMap<String, String>(); + hostLevelParams.put(PACKAGE_LIST, getPackageList()); + hostLevelParams.put(CONTAINER_ID, containerId); + cmd.setHostLevelParams(hostLevelParams); + + Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, componentName); + cmd.setConfigurations(configurations); + + ComponentCommand effectiveCommand = compCmd; + if(compCmd == null) { + effectiveCommand = new ComponentCommand(); + effectiveCommand.setName("INSTALL"); + effectiveCommand.setExec("DEFAULT"); + } + cmd.setCommandParams(setCommandParameters(effectiveCommand, timeout, false)); + configurations.get("global").put("exec_cmd", effectiveCommand.getExec()); + + cmd.setHostname(getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME)); + cmd.addContainerDetails(componentName, getMetaInfo()); + + log.info("bbb: " + getMetaInfo().toString()); + log.info("bbb: " + getMetaInfo().getApplicationComponent(componentName).toString()); + log.info("bbb: " + getMetaInfo().getApplicationComponent(componentName).getDockerContainers().get(0).getImage()); + + Map<String, String> dockerConfig = new HashMap<String, String>(); + dockerConfig.put("docker.command_path", + appConf.getGlobalOptions().get("site.docker.docker.command_path")); + dockerConfig.put("docker.image_name", + getConfigFromMetaInfo(componentName, "image")); + configurations.put("docker", dockerConfig); + + log.info("bbb configuration" + cmd.toString()); + + response.addExecutionCommand(cmd); + } + protected static String getPackageListFromApplication(Application application) { String pkgFormatString = "{\"type\":\"%s\",\"name\":\"%s\"}"; String pkgListFormatString = "[%s]"; @@ -1811,6 +1886,12 @@ public class AgentProviderService extends AbstractProviderService implements String scriptPath, long timeout) throws SliderException { + + if(isDockerContainer(componentName)){ + addStatusDockerCommand(componentName, containerId, response, scriptPath, timeout); + return; + } + assert getAmState().isApplicationLive(); ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); @@ -1838,6 +1919,43 @@ public class AgentProviderService extends AbstractProviderService implements } @VisibleForTesting + protected void addStatusDockerCommand(String componentName, + String containerId, + HeartBeatResponse response, + String scriptPath, + long timeout) + throws SliderException { + assert getAmState().isApplicationLive(); + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + + StatusCommand cmd = new StatusCommand(); + String clusterName = getClusterName(); + + cmd.setCommandType(AgentCommandType.STATUS_COMMAND); + cmd.setComponentName(componentName); + cmd.setServiceName(clusterName); + cmd.setClusterName(clusterName); + cmd.setRoleCommand(StatusCommand.STATUS_COMMAND); + + Map<String, String> hostLevelParams = new TreeMap<String, String>(); + hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getMandatoryOption(JAVA_HOME)); + hostLevelParams.put(CONTAINER_ID, containerId); + cmd.setHostLevelParams(hostLevelParams); + + cmd.setCommandParams(setCommandParameters(scriptPath, timeout, false)); + + Map<String, Map<String, String>> configurations = buildCommandConfigurations( + appConf, containerId, componentName); + Map<String, String> dockerConfig = new HashMap<String, String>(); + dockerConfig.put("docker.status_command", + getConfigFromMetaInfo(componentName, "status_command")); + configurations.put("docker", dockerConfig); + cmd.setConfigurations(configurations); + log.info("bbb status" + cmd); + response.addStatusCommand(cmd); + } + + @VisibleForTesting protected void addGetConfigCommand(String componentName, String containerId, HeartBeatResponse response) throws SliderException { assert getAmState().isApplicationLive(); @@ -1860,6 +1978,84 @@ public class AgentProviderService extends AbstractProviderService implements } @VisibleForTesting + protected void addGetConfigDockerCommand(String componentName, String containerId, HeartBeatResponse response) + throws SliderException { + assert getAmState().isApplicationLive(); + + StatusCommand cmd = new StatusCommand(); + String clusterName = getClusterName(); + + cmd.setCommandType(AgentCommandType.STATUS_COMMAND); + cmd.setComponentName(componentName); + cmd.setServiceName(clusterName); + cmd.setClusterName(clusterName); + cmd.setRoleCommand(StatusCommand.GET_CONFIG_COMMAND); + Map<String, String> hostLevelParams = new TreeMap<String, String>(); + hostLevelParams.put(CONTAINER_ID, containerId); + cmd.setHostLevelParams(hostLevelParams); + + hostLevelParams.put(CONTAINER_ID, containerId); + + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + Map<String, Map<String, String>> configurations = buildCommandConfigurations( + appConf, containerId, componentName); + Map<String, String> dockerConfig = new HashMap<String, String>(); + dockerConfig.put("docker.status_command", + getConfigFromMetaInfo(componentName, "status_command")); + configurations.put("docker", dockerConfig); + + cmd.setConfigurations(configurations); + log.info("bbb getconfig command " + cmd); + + response.addStatusCommand(cmd); + } + + private String getConfigFromMetaInfo(String componentName, + String configName) { + String result = null; + DockerContainer container = getMetaInfo() + .getApplicationComponent(componentName).getDockerContainers().get(0);//to support multi container per component later + switch (configName){ + case "image": + result = container.getImage(); + break; + case "status_command": + result = container.getStatusCommand(); + break; + case "docker_command_path": + result = container.getCommandPath(); + break; + case "docker_run_option": + result = container.getOptions(); + break; + case "container_port": + result = container.getPorts().get(0).getContainerPort();//to support multi port later + break; + case "host_port": + result = container.getPorts().get(0).getHostPort();//to support multi port later + break; + case "containerMount": + result = container.getMounts().get(0).getContainerMount();//to support multi port later + break; + case "hostMount": + result = container.getMounts().get(0).getHostMount();//to support multi port later + break; + case "additional_param": + result = container.getAdditionalParam();//to support multi port later + break; + case "input_file_container_mount": + result = container.getInputFiles().get(0).getContainerMount();//to support multi port later + break; + case "input_file_local_path": + result = container.getInputFiles().get(0).getFileLocalPath();//to support multi port later + break; + default: + break; + } + return result; + } + + @VisibleForTesting protected void addStartCommand(String componentName, String containerId, HeartBeatResponse response, String scriptPath, long timeout, boolean isMarkedAutoRestart) throws @@ -1963,6 +2159,77 @@ public class AgentProviderService extends AbstractProviderService implements response.addExecutionCommand(cmd); } + @VisibleForTesting + protected void addStartDockerCommand2(String componentName, String containerId, HeartBeatResponse response, + ComponentCommand startCommand, long timeout, boolean isMarkedAutoRestart) + throws + SliderException { + assert getAmState().isApplicationLive(); + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); + + ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND); + prepareExecutionCommand(cmd); + String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME); + cmd.setHostname(hostName); + cmd.setClusterName(clusterName); + cmd.setRoleCommand(Command.START.toString()); + cmd.setServiceName(clusterName); + cmd.setComponentName(componentName); + cmd.setRole(componentName); + Map<String, String> hostLevelParams = new TreeMap<>(); + hostLevelParams.put(CONTAINER_ID, containerId); + cmd.setHostLevelParams(hostLevelParams); + + Map<String, String> roleParams = new TreeMap<>(); + cmd.setRoleParams(roleParams); + cmd.getRoleParams().put("auto_restart", Boolean.toString(isMarkedAutoRestart)); + + cmd.setCommandParams(setCommandParameters(startCommand, timeout, true)); + + Map<String, Map<String, String>> configurations = buildCommandConfigurations( + appConf, containerId, componentName); + + log.info("bbb: " + getMetaInfo().toString()); + log.info("bbb: " + getMetaInfo().getApplicationComponent(componentName).toString()); + + Map<String, String> dockerConfig = new HashMap<String, String>(); + String docker_command_path = getConfigFromMetaInfo(componentName, "docker_command_path"); + if(docker_command_path == null){ + docker_command_path = appConf.getGlobalOptions().get("site.docker.docker.command_path"); + } + dockerConfig.put("docker.command_path",docker_command_path); + dockerConfig.put("docker.image_name", + getConfigFromMetaInfo(componentName, "image")); + String docker_run_options = getConfigFromMetaInfo(componentName, "docker_run_option"); + if(docker_run_options == null){ + docker_run_options = appConf.getGlobalOptions().get("site.docker.options"); + } + dockerConfig.put("docker.options",docker_run_options); + dockerConfig.put("docker.container_port", + getConfigFromMetaInfo(componentName, "docker.container_port")); + dockerConfig.put("docker.host_port", + getConfigFromMetaInfo(componentName, "docker.host_port")); + + // dockerConfig + // .put("docker.mounting_directory", getConfigFromMetaInfo(componentName, "containerMount")); + // dockerConfig + // .put("docker.host_mounting_directory", getConfigFromMetaInfo(componentName, "hostMount")); + + dockerConfig.put("docker.additional_param", + getConfigFromMetaInfo(componentName, "additional_param")); + + // dockerConfig.put("docker.input_file.mount_path", getConfigFromMetaInfo( + // componentName, "intpu_file_container_mount")); + configurations.put("docker", dockerConfig); + + cmd.setConfigurations(configurations); + configurations.get("global").put("exec_cmd", startCommand.getExec()); + cmd.addContainerDetails(componentName, getMetaInfo()); + response.addExecutionCommand(cmd); + } + protected Map<String, String> getAllocatedPorts() { return getAllocatedPorts(SHARED_PORT_TAG); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainer.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainer.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainer.java index 3117f3b..f111656 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainer.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainer.java @@ -39,21 +39,28 @@ public class DockerContainer implements Validate { private String options; private List<DockerContainerMount> mounts = new ArrayList<>(); private List<DockerContainerPort> ports = new ArrayList<>(); + private String statusCommand; + private String commandPath; + private String additionalParam; + private List<DockerContainerInputFile> inputFiles = new ArrayList<>(); public DockerContainer() { } @JsonProperty("mounts") - public List<DockerContainerMount> getMounts() { - return this.mounts; - } + public List<DockerContainerMount> getMounts() { return this.mounts; } @JsonProperty("ports") public List<DockerContainerPort> getPorts() { return this.ports; } + @JsonProperty("inputFiles") + public List<DockerContainerInputFile> getInputFiles() { + return this.inputFiles; + } + public String getName() { return name; } @@ -88,4 +95,28 @@ public class DockerContainer implements Validate { dcp.validate(version); } } + + public String getStatusCommand() { + return statusCommand; + } + + public void setStatusCommand(String statusCommand) { + this.statusCommand = statusCommand; + } + + public String getCommandPath() { + return commandPath; + } + + public void setCommandPath(String commandPath) { + this.commandPath = commandPath; + } + + public String getAdditionalParam() { + return additionalParam; + } + + public void setAdditionalParam(String additionalParam) { + this.additionalParam = additionalParam; + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainerInputFile.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainerInputFile.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainerInputFile.java new file mode 100644 index 0000000..1466678 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/DockerContainerInputFile.java @@ -0,0 +1,32 @@ +package org.apache.slider.providers.agent.application.metadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DockerContainerInputFile { + protected static final Logger log = LoggerFactory + .getLogger(DockerContainerInputFile.class); + + private String containerMount; + private String fileLocalPath; + + public DockerContainerInputFile() { + } + + public String getContainerMount() { + return containerMount; + } + + public void setContainerMount(String containerMount) { + this.containerMount = containerMount; + } + + public String getFileLocalPath() { + return fileLocalPath; + } + + public void setFileLocalPath(String fileLocalPath) { + this.fileLocalPath = fileLocalPath; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ec2f013e/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/ExecutionCommand.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/ExecutionCommand.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/ExecutionCommand.java index 9208707..addeee0 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/ExecutionCommand.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/ExecutionCommand.java @@ -20,6 +20,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.slider.providers.agent.application.metadata.Component; import org.apache.slider.providers.agent.application.metadata.DockerContainer; +import org.apache.slider.providers.agent.application.metadata.DockerContainerInputFile; import org.apache.slider.providers.agent.application.metadata.DockerContainerMount; import org.apache.slider.providers.agent.application.metadata.DockerContainerPort; import org.apache.slider.providers.agent.application.metadata.Metainfo; @@ -232,6 +233,9 @@ public class ExecutionCommand { container.setImage(metaContainer.getImage()); container.setName(metaContainer.getName()); container.setOptions(metaContainer.getOptions()); + container.setAdditionalParam(metaContainer.getAdditionalParam()); + container.setCommandPath(metaContainer.getAdditionalParam()); + container.setStatusCommand(metaContainer.getStatusCommand()); if(metaContainer.getMounts().size() > 0) { for(DockerContainerMount metaContMount : metaContainer.getMounts()) { DockerContainerMount contMnt = new DockerContainerMount(); @@ -248,6 +252,14 @@ public class ExecutionCommand { container.getPorts().add(cntPort); } } + if(metaContainer.getInputFiles().size() > 0) { + for(DockerContainerInputFile metaInpFile : metaContainer.getInputFiles()) { + DockerContainerInputFile inpFile = new DockerContainerInputFile(); + inpFile.setContainerMount(metaInpFile.getContainerMount()); + inpFile.setFileLocalPath(metaInpFile.getFileLocalPath()); + container.getInputFiles().add(inpFile); + } + } this.getContainers().add(container); } }
