Repository: incubator-slider Updated Branches: refs/heads/develop 2f08b2da8 -> bd62d359a
SLIDER-463. Have each container instance for a role be assigned with a unique integer id starting at 1 up to maximum requested instances Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/bd62d359 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/bd62d359 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/bd62d359 Branch: refs/heads/develop Commit: bd62d359a16aec32281f419d7ba36e3c48fca553 Parents: 2f08b2d Author: Sumit Mohanty <[email protected]> Authored: Thu Oct 16 22:31:54 2014 -0700 Committer: Sumit Mohanty <[email protected]> Committed: Thu Oct 16 22:31:54 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/Controller.py | 36 ++++-- slider-agent/src/main/python/agent/Register.py | 7 +- .../src/test/python/agent/TestRegistration.py | 7 +- .../providers/agent/AgentProviderService.java | 95 ++++++++------ .../providers/agent/ComponentTagProvider.java | 127 +++++++++++++++++++ .../appmaster/web/rest/agent/Register.java | 22 +++- .../web/rest/agent/RegistrationResponse.java | 14 +- .../providers/agent/AgentTestUtils.groovy | 2 +- .../agent/TestAgentProviderService.java | 12 +- .../agent/TestComponentTagProvider.java | 115 +++++++++++++++++ .../web/rest/agent/TestAMAgentWebServices.java | 2 +- 11 files changed, 366 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 93390eb..5afab83 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -93,6 +93,7 @@ class Controller(threading.Thread): self.stopCommand = None self.appGracefulStopQueued = False self.appGracefulStopTriggered = False + self.tags = "" def __del__(self): @@ -129,34 +130,41 @@ class Controller(threading.Thread): self.componentExpectedState, self.actionQueue.customServiceOrchestrator.allocated_ports, self.actionQueue.customServiceOrchestrator.log_folders, + self.tags, id)) logger.info("Registering with the server at " + self.registerUrl + " with data " + pprint.pformat(data)) response = self.sendRequest(self.registerUrl, data) - ret = json.loads(response) + regResp = json.loads(response) exitstatus = 0 - # exitstatus is a code of error which was rised on server side. + # exitstatus is a code of error which was raised on server side. # exitstatus = 0 (OK - Default) # exitstatus = 1 (Registration failed because # different version of agent and server) - if 'exitstatus' in ret.keys(): - exitstatus = int(ret['exitstatus']) - # log - message, which will be printed to agents log - if 'log' in ret.keys(): - log = ret['log'] + if 'exitstatus' in regResp.keys(): + exitstatus = int(regResp['exitstatus']) + + # log - message, which will be printed to agents log + if 'log' in regResp.keys(): + log = regResp['log'] + + # container may be associated with tags + if 'tags' in regResp.keys(): + self.tags = regResp['tags'] + if exitstatus == 1: logger.error(log) self.isRegistered = False self.repeatRegistration = False - return ret - logger.info("Registered with the server with " + pprint.pformat(ret)) + return regResp + logger.info("Registered with the server with " + pprint.pformat(regResp)) print("Registered with the server") - self.responseId = int(ret['responseId']) + self.responseId = int(regResp['responseId']) self.isRegistered = True - if 'statusCommands' in ret.keys(): + if 'statusCommands' in regResp.keys(): logger.info("Got status commands on registration " + pprint.pformat( - ret['statusCommands'])) - self.addToQueue(ret['statusCommands']) + regResp['statusCommands'])) + self.addToQueue(regResp['statusCommands']) pass else: self.hasMappedComponents = False @@ -173,7 +181,7 @@ class Controller(threading.Thread): time.sleep(delay) pass pass - return ret + return regResp def addToQueue(self, commands): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-agent/src/main/python/agent/Register.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Register.py b/slider-agent/src/main/python/agent/Register.py index c8246c7..c5197fd 100644 --- a/slider-agent/src/main/python/agent/Register.py +++ b/slider-agent/src/main/python/agent/Register.py @@ -29,20 +29,21 @@ class Register: def __init__(self, config): self.config = config - def build(self, actualState, expectedState, allocated_ports, log_folders, id='-1'): + def build(self, actualState, expectedState, allocated_ports, log_folders, tags="", id='-1'): timestamp = int(time.time() * 1000) version = self.read_agent_version() register = {'responseId': int(id), 'timestamp': timestamp, - 'hostname': self.config.getLabel(), + 'label': self.config.getLabel(), 'publicHostname': hostname.public_hostname(), 'agentVersion': version, 'actualState': actualState, 'expectedState': expectedState, 'allocatedPorts': allocated_ports, - 'logFolders': log_folders + 'logFolders': log_folders, + 'tags': tags } return register http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-agent/src/test/python/agent/TestRegistration.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestRegistration.py b/slider-agent/src/test/python/agent/TestRegistration.py index 2c98978..c82d784 100644 --- a/slider-agent/src/test/python/agent/TestRegistration.py +++ b/slider-agent/src/test/python/agent/TestRegistration.py @@ -39,9 +39,9 @@ class TestRegistration(TestCase): config.set('agent', 'current_ping_port', '33777') register = Register(config) - data = register.build(State.INIT, State.INIT, {}, {}, 1) + data = register.build(State.INIT, State.INIT, {}, {}, "tag", 1) #print ("Register: " + pprint.pformat(data)) - self.assertEquals(data['hostname'] != "", True, "hostname should not be empty") + self.assertEquals(data['label'] != "", True, "hostname should not be empty") self.assertEquals(data['publicHostname'] != "", True, "publicHostname should not be empty") self.assertEquals(data['responseId'], 1) self.assertEquals(data['timestamp'] > 1353678475465L, True, "timestamp should not be empty") @@ -50,7 +50,8 @@ class TestRegistration(TestCase): self.assertEquals(data['expectedState'], State.INIT, "expectedState should not be empty") self.assertEquals(data['allocatedPorts'], {}, "allocatedPorts should be empty") self.assertEquals(data['logFolders'], {}, "allocated log should be empty") - self.assertEquals(len(data), 9) + self.assertEquals(data['tags'], "tag", "tags should be tag") + self.assertEquals(len(data), 10) self.assertEquals(os.path.join(tmpdir, "app/definition"), config.getResolvedPath("app_pkg_dir")) self.assertEquals(os.path.join(tmpdir, "app/install"), config.getResolvedPath("app_install_dir")) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index fc7d935..fc97fd2 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -145,6 +145,7 @@ public class AgentProviderService extends AbstractProviderService implements private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000; private final Object syncLock = new Object(); + private final ComponentTagProvider tags = new ComponentTagProvider(); private int heartbeatMonitorInterval = 0; private AgentClientProvider clientProvider; private AtomicInteger taskId = new AtomicInteger(0); @@ -493,7 +494,7 @@ public class AgentProviderService extends AbstractProviderService implements public RegistrationResponse handleRegistration(Register registration) { log.info("Handling registration: " + registration); RegistrationResponse response = new RegistrationResponse(); - String label = registration.getHostname(); + String label = registration.getLabel(); State agentState = registration.getActualState(); if (getComponentStatuses().containsKey(label)) { response.setResponseStatus(RegistrationStatus.OK); @@ -503,6 +504,13 @@ public class AgentProviderService extends AbstractProviderService implements String roleName = getRoleName(label); String containerId = getContainerId(label); + + if (SliderUtils.isSet(registration.getTags())) { + tags.recordAssignedTag(roleName, containerId, registration.getTags()); + } else { + response.setTags(tags.getTag(roleName, containerId)); + } + String hostFqdn = registration.getPublicHostname(); Map<String, String> ports = registration.getAllocatedPorts(); if (ports != null && !ports.isEmpty()) { @@ -725,6 +733,8 @@ public class AgentProviderService extends AbstractProviderService implements @Override public void notifyContainerCompleted(ContainerId containerId) { + // containers get allocated and free'ed without being assigned to any + // component - so many of the data structures may not be initialized if (containerId != null) { String containerIdStr = containerId.toString(); if (getComponentInstanceData().containsKey(containerIdStr)) { @@ -737,14 +747,19 @@ public class AgentProviderService extends AbstractProviderService implements this.allocatedPorts.remove(containerIdStr); } + String componentName = null; synchronized (this.componentStatuses) { for (String label : getComponentStatuses().keySet()) { if (label.startsWith(containerIdStr)) { + componentName = getRoleName(label); + log.info("Removing component status for label {}", label); getComponentStatuses().remove(label); } } } + tags.releaseTag(componentName, containerIdStr); + synchronized (this.containerExportsMap) { Set<String> containerExportSets = containerExportsMap.get(containerIdStr); if (containerExportSets != null) { @@ -760,6 +775,7 @@ public class AgentProviderService extends AbstractProviderService implements } exports.removeAll(exportToRemove); } + log.info("Removing container exports for {}", containerIdStr); containerExportsMap.remove(containerIdStr); } } @@ -953,17 +969,17 @@ public class AgentProviderService extends AbstractProviderService implements * @param folders * @param containerId * @param hostFqdn - * @param roleName + * @param componentName */ protected void publishFolderPaths( - Map<String, String> folders, String containerId, String roleName, String hostFqdn) { + Map<String, String> folders, String containerId, String componentName, String hostFqdn) { Date now = new Date(); for (Map.Entry<String, String> entry : folders.entrySet()) { ExportEntry exportEntry = new ExportEntry(); exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue())); exportEntry.setContainerId(containerId); exportEntry.setLevel(COMPONENT_TAG); - exportEntry.setTag(roleName); + exportEntry.setTag(componentName); exportEntry.setUpdatedTime(now.toString()); if (entry.getKey().equals("AGENT_LOG_ROOT")) { synchronized (logFolderExports) { @@ -1018,7 +1034,7 @@ public class AgentProviderService extends AbstractProviderService implements * @param componentStatus */ protected void publishConfigAndExportGroups( - HeartBeat heartBeat, ComponentInstanceState componentStatus, String roleName) { + HeartBeat heartBeat, ComponentInstanceState componentStatus, String componentName) { List<ComponentStatus> statuses = heartBeat.getComponentStatus(); if (statuses != null && !statuses.isEmpty()) { log.info("Processing {} status reports.", statuses.size()); @@ -1028,7 +1044,7 @@ public class AgentProviderService extends AbstractProviderService implements if (status.getConfigs() != null) { Application application = getMetainfo().getApplication(); - if (canAnyMasterPublishConfig() == false || canPublishConfig(roleName)) { + if (canAnyMasterPublishConfig() == false || canPublishConfig(componentName)) { // If no Master can explicitly publish then publish if its a master // Otherwise, wait till the master that can publish is ready @@ -1056,7 +1072,7 @@ public class AgentProviderService extends AbstractProviderService implements boolean hasExportGroups = appExportGroups != null && !appExportGroups.isEmpty(); Set<String> appExports = new HashSet(); - String appExportsStr = getApplicationComponent(roleName).getAppExports(); + String appExportsStr = getApplicationComponent(componentName).getAppExports(); if (SliderUtils.isSet(appExportsStr)) { for (String appExport : appExportsStr.split(",")) { if (appExport.trim().length() > 0) { @@ -1165,14 +1181,14 @@ public class AgentProviderService extends AbstractProviderService implements protected void processAndPublishComponentSpecificData(Map<String, String> ports, String containerId, String hostFqdn, - String roleName) { + String componentName) { String portVarFormat = "${site.%s}"; String hostNamePattern = "${THIS_HOST}"; Map<String, String> toPublish = new HashMap<String, String>(); Application application = getMetainfo().getApplication(); for (Component component : application.getComponents()) { - if (component.getName().equals(roleName)) { + if (component.getName().equals(componentName)) { if (component.getComponentExports().size() > 0) { for (ComponentExport export : component.getComponentExports()) { @@ -1216,20 +1232,20 @@ public class AgentProviderService extends AbstractProviderService implements protected void processAndPublishComponentSpecificExports(Map<String, String> ports, String containerId, String hostFqdn, - String roleName) { + String compName) { String portVarFormat = "${site.%s}"; - String hostNamePattern = "${" + roleName + "_HOST}"; + String hostNamePattern = "${" + compName + "_HOST}"; List<ExportGroup> appExportGroups = getMetainfo().getApplication().getExportGroups(); - Component component = getMetainfo().getApplicationComponent(roleName); + Component component = getMetainfo().getApplicationComponent(compName); if (component != null && SliderUtils.isSet(component.getCompExports()) && appExportGroups != null && appExportGroups.size() > 0) { Set<String> compExports = new HashSet(); String compExportsStr = component.getCompExports(); - for (String appExport : compExportsStr.split(",")) { - if (appExport.trim().length() > 0) { - compExports.add(appExport.trim()); + for (String compExport : compExportsStr.split(",")) { + if (compExport.trim().length() > 0) { + compExports.add(compExport.trim()); } } @@ -1244,7 +1260,7 @@ public class AgentProviderService extends AbstractProviderService implements for (Export export : exports) { if (canBeExported(exportGroupName, export.getName(), compExports)) { log.info("Attempting to publish {} of group {} for component type {}", - export.getName(), exportGroupName, roleName); + export.getName(), exportGroupName, compName); String templateToExport = export.getValue(); for (String portName : ports.keySet()) { boolean publishData = false; @@ -1263,6 +1279,7 @@ public class AgentProviderService extends AbstractProviderService implements entryToAdd.setValue(templateToExport); entryToAdd.setUpdatedTime(now.toString()); entryToAdd.setContainerId(containerId); + entryToAdd.setTag(tags.getTag(compName, containerId)); List<ExportEntry> existingList = map.putIfAbsent(export.getName(), new CopyOnWriteArrayList(Arrays.asList(entryToAdd))); @@ -1423,7 +1440,7 @@ public class AgentProviderService extends AbstractProviderService implements /** * Add install command to the heartbeat response * - * @param roleName + * @param componentName * @param containerId * @param response * @param scriptPath @@ -1431,7 +1448,7 @@ public class AgentProviderService extends AbstractProviderService implements * @throws SliderException */ @VisibleForTesting - protected void addInstallCommand(String roleName, + protected void addInstallCommand(String componentName, String containerId, HeartBeatResponse response, String scriptPath, @@ -1446,15 +1463,15 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setClusterName(clusterName); cmd.setRoleCommand(Command.INSTALL.toString()); cmd.setServiceName(clusterName); - cmd.setComponentName(roleName); - cmd.setRole(roleName); + cmd.setComponentName(componentName); + cmd.setRole(componentName); Map<String, String> hostLevelParams = new TreeMap<String, String>(); hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getMandatoryOption(JAVA_HOME)); hostLevelParams.put(PACKAGE_LIST, getPackageList()); hostLevelParams.put(CONTAINER_ID, containerId); cmd.setHostLevelParams(hostLevelParams); - Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId); + Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, componentName); cmd.setConfigurations(configurations); cmd.setCommandParams(setCommandParameters(scriptPath, timeout, false)); @@ -1506,7 +1523,7 @@ public class AgentProviderService extends AbstractProviderService implements } @VisibleForTesting - protected void addStatusCommand(String roleName, + protected void addStatusCommand(String componentName, String containerId, HeartBeatResponse response, String scriptPath, @@ -1519,7 +1536,7 @@ public class AgentProviderService extends AbstractProviderService implements String clusterName = getClusterName(); cmd.setCommandType(AgentCommandType.STATUS_COMMAND); - cmd.setComponentName(roleName); + cmd.setComponentName(componentName); cmd.setServiceName(clusterName); cmd.setClusterName(clusterName); cmd.setRoleCommand(StatusCommand.STATUS_COMMAND); @@ -1531,7 +1548,7 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setCommandParams(setCommandParameters(scriptPath, timeout, false)); - Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId); + Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, componentName); cmd.setConfigurations(configurations); @@ -1539,7 +1556,7 @@ public class AgentProviderService extends AbstractProviderService implements } @VisibleForTesting - protected void addGetConfigCommand(String roleName, String containerId, HeartBeatResponse response) + protected void addGetConfigCommand(String componentName, String containerId, HeartBeatResponse response) throws SliderException { assert getAmState().isApplicationLive(); @@ -1547,7 +1564,7 @@ public class AgentProviderService extends AbstractProviderService implements String clusterName = getClusterName(); cmd.setCommandType(AgentCommandType.STATUS_COMMAND); - cmd.setComponentName(roleName); + cmd.setComponentName(componentName); cmd.setServiceName(clusterName); cmd.setClusterName(clusterName); cmd.setRoleCommand(StatusCommand.GET_CONFIG_COMMAND); @@ -1561,7 +1578,7 @@ public class AgentProviderService extends AbstractProviderService implements } @VisibleForTesting - protected void addStartCommand(String roleName, String containerId, HeartBeatResponse response, + protected void addStartCommand(String componentName, String containerId, HeartBeatResponse response, String scriptPath, long timeout, boolean isMarkedAutoRestart) throws SliderException { @@ -1577,8 +1594,8 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setClusterName(clusterName); cmd.setRoleCommand(Command.START.toString()); cmd.setServiceName(clusterName); - cmd.setComponentName(roleName); - cmd.setRole(roleName); + cmd.setComponentName(componentName); + cmd.setRole(componentName); Map<String, String> hostLevelParams = new TreeMap<String, String>(); hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getMandatoryOption(JAVA_HOME)); hostLevelParams.put(CONTAINER_ID, containerId); @@ -1590,7 +1607,7 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setCommandParams(setCommandParameters(scriptPath, timeout, true)); - Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId); + Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, componentName); cmd.setConfigurations(configurations); response.addExecutionCommand(cmd); @@ -1606,8 +1623,8 @@ public class AgentProviderService extends AbstractProviderService implements cmdStop.setClusterName(clusterName); cmdStop.setRoleCommand(Command.STOP.toString()); cmdStop.setServiceName(clusterName); - cmdStop.setComponentName(roleName); - cmdStop.setRole(roleName); + cmdStop.setComponentName(componentName); + cmdStop.setRole(componentName); Map<String, String> hostLevelParamsStop = new TreeMap<String, String>(); hostLevelParamsStop.put(JAVA_HOME, appConf.getGlobalOptions() .getMandatoryOption(JAVA_HOME)); @@ -1622,7 +1639,7 @@ public class AgentProviderService extends AbstractProviderService implements cmdStop.setCommandParams(setCommandParameters(scriptPath, timeout, true)); Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations( - appConf, containerId); + appConf, containerId, componentName); cmdStop.setConfigurations(configurationsStop); response.addExecutionCommand(cmdStop); } @@ -1648,7 +1665,7 @@ public class AgentProviderService extends AbstractProviderService implements } private Map<String, Map<String, String>> buildCommandConfigurations( - ConfTreeOperations appConf, String containerId) + ConfTreeOperations appConf, String containerId, String componentName) throws SliderException { Map<String, Map<String, String>> configurations = @@ -1661,7 +1678,7 @@ public class AgentProviderService extends AbstractProviderService implements for (String configType : configs) { addNamedConfiguration(configType, appConf.getGlobalOptions().options, - configurations, tokens, containerId); + configurations, tokens, containerId, componentName); } //do a final replacement of re-used configs @@ -1742,10 +1759,11 @@ public class AgentProviderService extends AbstractProviderService implements private void addNamedConfiguration(String configName, Map<String, String> sourceConfig, Map<String, Map<String, String>> configurations, - Map<String, String> tokens, String containerId) { + Map<String, String> tokens, String containerId, + String roleName) { Map<String, String> config = new HashMap<String, String>(); if (configName.equals(GLOBAL_CONFIG_TAG)) { - addDefaultGlobalConfig(config, containerId); + addDefaultGlobalConfig(config, containerId, roleName); } // add role hosts to tokens addRoleRelatedTokens(tokens); @@ -1805,12 +1823,13 @@ public class AgentProviderService extends AbstractProviderService implements return hosts; } - private void addDefaultGlobalConfig(Map<String, String> config, String containerId) { + private void addDefaultGlobalConfig(Map<String, String> config, String containerId, String roleName) { config.put("app_log_dir", "${AGENT_LOG_ROOT}"); config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run"); config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install"); config.put("app_input_conf_dir", "${AGENT_WORK_ROOT}/" + SliderKeys.PROPAGATED_CONF_DIR_NAME); config.put("app_container_id", containerId); + config.put("app_container_tag", tags.getTag(roleName, containerId)); } private void buildRoleHostDetails(Map<String, String> details) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentTagProvider.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentTagProvider.java b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentTagProvider.java new file mode 100644 index 0000000..68f63fa --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentTagProvider.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.providers.agent; + +import org.apache.slider.common.tools.SliderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; + +/** A simple tag provider that attempts to associate tags from 1-N to all container of a component */ +public class ComponentTagProvider { + private static final Logger log = LoggerFactory.getLogger(ComponentTagProvider.class); + private static String FREE = "free"; + private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> allTags; + + public ComponentTagProvider() { + allTags = new ConcurrentHashMap<String, ConcurrentHashMap<String, String>>(); + } + + /** + * Record an assigned tag to a container + * + * @param component + * @param containerId + * @param tag + */ + public void recordAssignedTag(String component, String containerId, String tag) { + if (SliderUtils.isSet(component) && SliderUtils.isSet(containerId)) { + Integer key = null; + try { + key = Integer.valueOf(tag); + } catch (NumberFormatException nfe) { + //ignore + } + if (key != null && key > 0) { + ConcurrentHashMap<String, String> compTags = getComponentSpecificTags(component); + synchronized (compTags) { + for (int index = 1; index <= key.intValue(); index++) { + String tempKey = new Integer(index).toString(); + if (!compTags.containsKey(tempKey)) { + compTags.put(tempKey, FREE); + } + } + compTags.put(key.toString(), containerId); + } + } + } + } + + /** + * Get a tag for container + * + * @param component + * @param containerId + * + * @return + */ + public String getTag(String component, String containerId) { + if (SliderUtils.isSet(component) && SliderUtils.isSet(containerId)) { + ConcurrentHashMap<String, String> compTags = getComponentSpecificTags(component); + synchronized (compTags) { + for (String key : compTags.keySet()) { + if (compTags.get(key).equals(containerId)) { + return key; + } + } + for (String key : compTags.keySet()) { + if (compTags.get(key).equals(FREE)) { + compTags.put(key, containerId); + return key; + } + } + String newKey = new Integer(compTags.size() + 1).toString(); + compTags.put(newKey, containerId); + return newKey; + } + } + return ""; + } + + /** + * Release a tag associated with a container + * + * @param component + * @param containerId + */ + public void releaseTag(String component, String containerId) { + if (SliderUtils.isSet(component) && SliderUtils.isSet(containerId)) { + ConcurrentHashMap<String, String> compTags = allTags.get(component); + if (compTags != null) { + synchronized (compTags) { + for (String key : compTags.keySet()) { + if (compTags.get(key).equals(containerId)) { + compTags.put(key, FREE); + } + } + } + } + } + } + + private ConcurrentHashMap<String, String> getComponentSpecificTags(String component) { + if (!allTags.containsKey(component)) { + synchronized (allTags) { + if (!allTags.containsKey(component)) { + allTags.put(component, new ConcurrentHashMap<String, String>()); + } + } + } + return allTags.get(component); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java index 70d639f..842e5a9 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java @@ -29,10 +29,11 @@ import java.util.Map; public class Register { private int responseId = -1; private long timestamp; - private String hostname; + private String label; private int currentPingPort; private HostInfo hardwareProfile; private String publicHostname; + private String tags; private AgentEnv agentEnv; private String agentVersion; private State actualState; @@ -58,12 +59,20 @@ public class Register { this.timestamp = timestamp; } - public String getHostname() { - return hostname; + public String getLabel() { + return label; } - public void setHostname(String hostname) { - this.hostname = hostname; + public void setLabel(String label) { + this.label = label; + } + + public String getTags() { + return tags; + } + + public void setTags(String tags) { + this.tags = tags; } public HostInfo getHardwareProfile() { @@ -150,7 +159,8 @@ public class Register { public String toString() { String ret = "responseId=" + responseId + "\n" + "timestamp=" + timestamp + "\n" + - "hostname=" + hostname + "\n" + + "label=" + label + "\n" + + "hostname=" + publicHostname + "\n" + "expectedState=" + expectedState + "\n" + "actualState=" + actualState + "\n"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/RegistrationResponse.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/RegistrationResponse.java index 734119d..fd852e2 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/RegistrationResponse.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/RegistrationResponse.java @@ -37,10 +37,14 @@ public class RegistrationResponse { @JsonProperty("exitstatus") private int exitstatus; - /** log - message, which will be printed to agents log */ + /** log - message, which will be printed to agents log */ @JsonProperty("log") private String log; + /** tags - tags associated with the container */ + @JsonProperty("tags") + private String tags; + //Response id to start with, usually zero. @JsonProperty("responseId") private long responseId; @@ -75,6 +79,14 @@ public class RegistrationResponse { this.responseId = responseId; } + public String getTags() { + return tags; + } + + public void setTags(String tags) { + this.tags = tags; + } + public void setExitstatus(int exitstatus) { this.exitstatus = exitstatus; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/test/groovy/org/apache/slider/providers/agent/AgentTestUtils.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/providers/agent/AgentTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/providers/agent/AgentTestUtils.groovy index 989919f..54c2fe7 100644 --- a/slider-core/src/test/groovy/org/apache/slider/providers/agent/AgentTestUtils.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/providers/agent/AgentTestUtils.groovy @@ -38,7 +38,7 @@ class AgentTestUtils { Register register = new Register(); register.setResponseId(-1); register.setTimestamp(System.currentTimeMillis()); - register.setHostname("dummyHost"); + register.setLabel("dummyHost"); return register; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index a20e7a9..154dc45 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -371,7 +371,7 @@ public class TestAgentProviderService { Register reg = new Register(); reg.setResponseId(0); - reg.setHostname("mockcontainer_1___HBASE_MASTER"); + reg.setLabel("mockcontainer_1___HBASE_MASTER"); Map<String,String> ports = new HashMap<String, String>(); ports.put("a","100"); reg.setAllocatedPorts(ports); @@ -504,7 +504,7 @@ public class TestAgentProviderService { AgentProviderService mockAps = prepareProviderServiceForAgentStateTests(); Register reg = new Register(); reg.setResponseId(0); - reg.setHostname("mockcontainer_1___HBASE_MASTER"); + reg.setLabel("mockcontainer_1___HBASE_MASTER"); Map<String,String> ports = new HashMap<String,String>(); ports.put("a","100"); reg.setAllocatedPorts(ports); @@ -570,7 +570,7 @@ public class TestAgentProviderService { Register reg = new Register(); reg.setResponseId(0); - reg.setHostname("mockcontainer_1___HBASE_MASTER"); + reg.setLabel("mockcontainer_1___HBASE_MASTER"); Map<String,String> ports = new HashMap<String,String>(); ports.put("a","100"); reg.setAllocatedPorts(ports); @@ -742,7 +742,7 @@ public class TestAgentProviderService { Assert.assertEquals(1, expEntries.size()); Assert.assertEquals("mockcontainer_1", expEntries.get(0).getContainerId()); Assert.assertEquals("component", expEntries.get(0).getLevel()); - Assert.assertEquals(null, expEntries.get(0).getTag()); + Assert.assertEquals("1", expEntries.get(0).getTag()); Assert.assertEquals("http://host1:10010", expEntries.get(0).getValue()); Assert.assertNotNull(expEntries.get(0).getUpdatedTime()); Assert.assertNull(expEntries.get(0).getValidUntil()); @@ -1145,14 +1145,14 @@ public class TestAgentProviderService { // Both containers register Register reg = new Register(); reg.setResponseId(0); - reg.setHostname("mockcontainer_1___HBASE_MASTER"); + reg.setLabel("mockcontainer_1___HBASE_MASTER"); RegistrationResponse resp = mockAps.handleRegistration(reg); Assert.assertEquals(0, resp.getResponseId()); Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus()); reg = new Register(); reg.setResponseId(0); - reg.setHostname("mockcontainer_1___HBASE_REGIONSERVER"); + reg.setLabel("mockcontainer_1___HBASE_REGIONSERVER"); resp = mockAps.handleRegistration(reg); Assert.assertEquals(0, resp.getResponseId()); Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus()); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentTagProvider.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentTagProvider.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentTagProvider.java new file mode 100644 index 0000000..7b38ee3 --- /dev/null +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentTagProvider.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.agent; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestComponentTagProvider { + protected static final Logger log = + LoggerFactory.getLogger(TestComponentTagProvider.class); + + @Test + public void testTagProvider() throws Exception { + ComponentTagProvider ctp = new ComponentTagProvider(); + Assert.assertEquals("", ctp.getTag(null, null)); + Assert.assertEquals("", ctp.getTag(null, "cid")); + Assert.assertEquals("", ctp.getTag("comp1", null)); + + Assert.assertEquals("1", ctp.getTag("comp1", "cid1")); + Assert.assertEquals("2", ctp.getTag("comp1", "cid2")); + Assert.assertEquals("3", ctp.getTag("comp1", "cid3")); + ctp.releaseTag("comp1", "cid2"); + Assert.assertEquals("2", ctp.getTag("comp1", "cid22")); + + ctp.releaseTag("comp1", "cid4"); + ctp.recordAssignedTag("comp1", "cid5", "5"); + Assert.assertEquals("4", ctp.getTag("comp1", "cid4")); + Assert.assertEquals("4", ctp.getTag("comp1", "cid4")); + Assert.assertEquals("6", ctp.getTag("comp1", "cid6")); + + ctp.recordAssignedTag("comp1", "cid55", "5"); + Assert.assertEquals("5", ctp.getTag("comp1", "cid55")); + + ctp.recordAssignedTag("comp2", "cidb3", "3"); + Assert.assertEquals("1", ctp.getTag("comp2", "cidb1")); + Assert.assertEquals("2", ctp.getTag("comp2", "cidb2")); + Assert.assertEquals("4", ctp.getTag("comp2", "cidb4")); + + ctp.recordAssignedTag("comp2", "cidb5", "six"); + ctp.recordAssignedTag("comp2", "cidb5", "-55"); + ctp.recordAssignedTag("comp2", "cidb5", "tags"); + ctp.recordAssignedTag("comp2", "cidb5", null); + ctp.recordAssignedTag("comp2", "cidb5", ""); + ctp.recordAssignedTag("comp2", "cidb5", "5"); + Assert.assertEquals("6", ctp.getTag("comp2", "cidb6")); + + ctp.recordAssignedTag("comp2", null, "5"); + ctp.recordAssignedTag(null, null, "5"); + ctp.releaseTag("comp1", null); + ctp.releaseTag(null, "cid4"); + ctp.releaseTag(null, null); + } + + @Test + public void testTagProviderWithThread() throws Exception { + ComponentTagProvider ctp = new ComponentTagProvider(); + Thread thread = new Thread(new Taggged(ctp)); + Thread thread2 = new Thread(new Taggged(ctp)); + Thread thread3 = new Thread(new Taggged(ctp)); + thread.start(); + thread2.start(); + thread3.start(); + ctp.getTag("comp1", "cid50"); + thread.join(); + thread2.join(); + thread3.join(); + Assert.assertEquals("101", ctp.getTag("comp1", "cid101")); + } + + public class Taggged implements Runnable { + private final ComponentTagProvider ctp; + + public Taggged(ComponentTagProvider ctp) { + this.ctp = ctp; + } + + public void run() { + for (int i = 0; i < 100; i++) { + String containerId = "cid" + (i + 1); + this.ctp.getTag("comp1", containerId); + } + for (int i = 0; i < 100; i++) { + String containerId = "cid" + (i + 1); + this.ctp.getTag("comp1", containerId); + } + for (int i = 0; i < 100; i += 2) { + String containerId = "cid" + (i + 1); + this.ctp.releaseTag("comp1", containerId); + } + for (int i = 0; i < 100; i += 2) { + String containerId = "cid" + (i + 1); + this.ctp.getTag("comp1", containerId); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd62d359/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/agent/TestAMAgentWebServices.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/agent/TestAMAgentWebServices.java b/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/agent/TestAMAgentWebServices.java index 452b03b..fa8ffc2 100644 --- a/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/agent/TestAMAgentWebServices.java +++ b/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/agent/TestAMAgentWebServices.java @@ -205,7 +205,7 @@ public class TestAMAgentWebServices { Register register = new Register(); register.setResponseId(-1); register.setTimestamp(System.currentTimeMillis()); - register.setHostname("dummyHost"); + register.setLabel("dummyHost"); return register; }
