http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java deleted file mode 100644 index 7594d51..0000000 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ /dev/null @@ -1,3320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.providers.agent; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; -import org.apache.hadoop.registry.client.types.Endpoint; -import org.apache.hadoop.registry.client.types.ProtocolTypes; -import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.slider.api.ClusterDescription; -import org.apache.slider.api.ClusterNode; -import org.apache.slider.api.InternalKeys; -import org.apache.slider.api.OptionKeys; -import org.apache.slider.api.ResourceKeys; -import org.apache.slider.api.StatusKeys; -import org.apache.slider.common.SliderExitCodes; -import org.apache.slider.common.SliderKeys; -import org.apache.slider.common.SliderXmlConfKeys; -import org.apache.slider.common.tools.SliderFileSystem; -import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.core.conf.AggregateConf; -import org.apache.slider.core.conf.ConfTreeOperations; -import org.apache.slider.core.conf.MapOperations; -import org.apache.slider.core.exceptions.BadCommandArgumentsException; -import org.apache.slider.core.exceptions.BadConfigException; -import org.apache.slider.core.exceptions.NoSuchNodeException; -import org.apache.slider.core.exceptions.SliderException; -import org.apache.slider.core.launch.CommandLineBuilder; -import org.apache.slider.core.launch.ContainerLauncher; -import org.apache.slider.core.registry.docstore.ConfigFormat; -import org.apache.slider.core.registry.docstore.ConfigUtils; -import org.apache.slider.core.registry.docstore.ExportEntry; -import org.apache.slider.core.registry.docstore.PublishedConfiguration; -import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; -import org.apache.slider.core.registry.docstore.PublishedExports; -import org.apache.slider.core.registry.info.CustomRegistryConstants; -import org.apache.slider.providers.AbstractProviderService; -import org.apache.slider.providers.MonitorDetail; -import org.apache.slider.providers.ProviderCore; -import org.apache.slider.providers.ProviderRole; -import org.apache.slider.providers.ProviderUtils; -import org.apache.slider.providers.agent.application.metadata.AbstractComponent; -import org.apache.slider.providers.agent.application.metadata.Application; -import org.apache.slider.providers.agent.application.metadata.CommandOrder; -import org.apache.slider.providers.agent.application.metadata.CommandScript; -import org.apache.slider.providers.agent.application.metadata.Component; -import org.apache.slider.providers.agent.application.metadata.ComponentCommand; -import org.apache.slider.providers.agent.application.metadata.ComponentExport; -import org.apache.slider.providers.agent.application.metadata.ComponentsInAddonPackage; -import org.apache.slider.providers.agent.application.metadata.ConfigFile; -import org.apache.slider.providers.agent.application.metadata.DefaultConfig; -import org.apache.slider.providers.agent.application.metadata.DockerContainer; -import org.apache.slider.providers.agent.application.metadata.Export; -import org.apache.slider.providers.agent.application.metadata.ExportGroup; -import org.apache.slider.providers.agent.application.metadata.Metainfo; -import org.apache.slider.providers.agent.application.metadata.OSPackage; -import org.apache.slider.providers.agent.application.metadata.OSSpecific; -import org.apache.slider.providers.agent.application.metadata.Package; -import org.apache.slider.providers.agent.application.metadata.PropertyInfo; -import org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss; -import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; -import org.apache.slider.server.appmaster.state.ContainerPriority; -import org.apache.slider.server.appmaster.state.RoleInstance; -import org.apache.slider.server.appmaster.state.StateAccessForProviders; -import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType; -import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations; -import org.apache.slider.server.appmaster.web.rest.agent.CommandReport; -import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus; -import org.apache.slider.server.appmaster.web.rest.agent.ExecutionCommand; -import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat; -import org.apache.slider.server.appmaster.web.rest.agent.HeartBeatResponse; -import org.apache.slider.server.appmaster.web.rest.agent.Register; -import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse; -import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus; -import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand; -import org.apache.slider.server.services.security.CertificateManager; -import org.apache.slider.server.services.security.SecurityStore; -import org.apache.slider.server.services.security.StoresGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Scanner; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; - -import static org.apache.slider.api.RoleKeys.ROLE_PREFIX; -import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS; - -/** - * This class implements the server-side logic for application deployment through Slider application package - */ -public class AgentProviderService extends AbstractProviderService implements - ProviderCore, - AgentKeys, - SliderKeys, AgentRestOperations { - - - protected static final Logger log = - LoggerFactory.getLogger(AgentProviderService.class); - private static final ProviderUtils providerUtils = new ProviderUtils(log); - private static final String LABEL_MAKER = "___"; - private static final String CONTAINER_ID = "container_id"; - private static final String GLOBAL_CONFIG_TAG = "global"; - private static final String LOG_FOLDERS_TAG = "LogFolders"; - private static final String HOST_FOLDER_FORMAT = "%s:%s"; - private static final String CONTAINER_LOGS_TAG = "container_log_dirs"; - private static final String CONTAINER_PWDS_TAG = "container_work_dirs"; - private static final String COMPONENT_TAG = "component"; - private static final String APPLICATION_TAG = "application"; - private static final String COMPONENT_DATA_TAG = "ComponentInstanceData"; - private static final String SHARED_PORT_TAG = "SHARED"; - private static final String PER_CONTAINER_TAG = "{PER_CONTAINER}"; - private static final int MAX_LOG_ENTRIES = 40; - private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000; - - private final Object syncLock = new Object(); - private final ComponentTagProvider tags = new ComponentTagProvider(); - private int heartbeatMonitorInterval = 0; - private AgentClientProvider clientProvider; - private AtomicInteger taskId = new AtomicInteger(0); - private volatile Map<String, MetainfoHolder> metaInfoMap = new HashMap<>(); - private SliderFileSystem fileSystem = null; - private Map<String, DefaultConfig> defaultConfigs = null; - private ComponentCommandOrder commandOrder = new ComponentCommandOrder(); - private HeartbeatMonitor monitor; - private Boolean canAnyMasterPublish = null; - private AgentLaunchParameter agentLaunchParameter = null; - private String clusterName = null; - private boolean isInUpgradeMode; - private Set<String> upgradeContainers = new HashSet<String>(); - private boolean appStopInitiated; - - private final Map<String, ComponentInstanceState> componentStatuses = - new ConcurrentHashMap<String, ComponentInstanceState>(); - private final Map<String, Map<String, String>> componentInstanceData = - new ConcurrentHashMap<String, Map<String, String>>(); - private final Map<String, Map<String, List<ExportEntry>>> exportGroups = - new ConcurrentHashMap<String, Map<String, List<ExportEntry>>>(); - private final Map<String, Map<String, String>> allocatedPorts = - new ConcurrentHashMap<String, Map<String, String>>(); - private final Map<String, Metainfo> packageMetainfo = - new ConcurrentHashMap<String, Metainfo>(); - - private final Map<String, ExportEntry> logFolderExports = - Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) { - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > MAX_LOG_ENTRIES; - } - }); - private final Map<String, ExportEntry> workFolderExports = - Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) { - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > MAX_LOG_ENTRIES; - } - }); - private final Map<String, Set<String>> containerExportsMap = - new HashMap<String, Set<String>>(); - - private static class MetainfoHolder { - Metainfo metaInfo; - private Map<String, DefaultConfig> defaultConfigs = null; - - public MetainfoHolder(Metainfo metaInfo, - Map<String, DefaultConfig> defaultConfigs) { - this.metaInfo = metaInfo; - this.defaultConfigs = defaultConfigs; - } - } - - /** - * Create an instance of AgentProviderService - */ - public AgentProviderService() { - super("AgentProviderService"); - setAgentRestOperations(this); - setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL); - } - - @Override - public String getHumanName() { - return "Slider Agent"; - } - - @Override - public List<ProviderRole> getRoles() { - return AgentRoles.getRoles(); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - clientProvider = new AgentClientProvider(conf); - } - - @Override - public Configuration loadProviderConfigurationInformation(File confDir) throws - BadCommandArgumentsException, - IOException { - return new Configuration(false); - } - - @Override - public void validateInstanceDefinition(AggregateConf instanceDefinition) - throws - SliderException { - clientProvider.validateInstanceDefinition(instanceDefinition, null); - - ConfTreeOperations resources = - instanceDefinition.getResourceOperations(); - - Set<String> names = resources.getComponentNames(); - names.remove(SliderKeys.COMPONENT_AM); - for (String name : names) { - Component componentDef = getApplicationComponent(name); - if (componentDef == null) { - // component member is validated elsewhere, so we don't need to throw - // an exception here - continue; - } - - MapOperations componentConfig = resources.getMandatoryComponent(name); - int count = - componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES); - int definedMinCount = componentDef.getMinInstanceCountInt(); - int definedMaxCount = componentDef.getMaxInstanceCountInt(); - if (count < definedMinCount || count > definedMaxCount) { - throw new BadConfigException("Component %s, %s value %d out of range. " - + "Expected minimum is %d and maximum is %d", - name, - ResourceKeys.COMPONENT_INSTANCES, - count, - definedMinCount, - definedMaxCount); - } - } - } - - // Reads the metainfo.xml in the application package and loads it - private void buildMetainfo(AggregateConf instanceDefinition, - SliderFileSystem fileSystem, - String roleGroup) - throws IOException, SliderException { - String mapKey = instanceDefinition.getAppConfOperations() - .getComponentOpt(roleGroup, ROLE_PREFIX, DEFAULT_METAINFO_MAP_KEY); - String appDef = SliderUtils.getApplicationDefinitionPath( - instanceDefinition.getAppConfOperations(), roleGroup); - MapOperations component = null; - if (roleGroup != null) { - component = instanceDefinition.getAppConfOperations().getComponent(roleGroup); - } - - MetainfoHolder metaInfoHolder = metaInfoMap.get(mapKey); - if (metaInfoHolder == null) { - synchronized (syncLock) { - if (this.fileSystem == null) { - this.fileSystem = fileSystem; - } - metaInfoHolder = metaInfoMap.get(mapKey); - if (metaInfoHolder == null) { - readAndSetHeartbeatMonitoringInterval(instanceDefinition); - initializeAgentDebugCommands(instanceDefinition); - - Metainfo metaInfo = getApplicationMetainfo(fileSystem, appDef, false); - log.info("Master package metainfo: {}", metaInfo.toString()); - if (metaInfo == null || metaInfo.getApplication() == null) { - log.error("metainfo.xml is unavailable or malformed at {}.", appDef); - throw new SliderException( - "metainfo.xml is required in app package."); - } - List<CommandOrder> commandOrders = metaInfo.getApplication() - .getCommandOrders(); - if (!DEFAULT_METAINFO_MAP_KEY.equals(mapKey)) { - for (Component comp : metaInfo.getApplication().getComponents()) { - comp.setName(mapKey + comp.getName()); - log.info("Modifying external metainfo component name to {}", - comp.getName()); - } - for (CommandOrder co : commandOrders) { - log.info("Adding prefix {} to command order {}", - mapKey, co); - co.setCommand(mapKey + co.getCommand()); - co.setRequires(mapKey + co.getRequires()); - } - } - log.debug("Merging command orders {} for {}", commandOrders, - roleGroup); - commandOrder.mergeCommandOrders(commandOrders, - instanceDefinition.getResourceOperations()); - Map<String, DefaultConfig> defaultConfigs = - initializeDefaultConfigs(fileSystem, appDef, metaInfo); - metaInfoMap.put(mapKey, new MetainfoHolder(metaInfo, defaultConfigs)); - monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval()); - monitor.start(); - - // build a map from component to metainfo - String addonAppDefString = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(AgentKeys.ADDONS, null); - if (component != null) { - addonAppDefString = component.getOption(AgentKeys.ADDONS, addonAppDefString); - } - log.debug("All addon appdefs: {}", addonAppDefString); - if (addonAppDefString != null) { - Scanner scanner = new Scanner(addonAppDefString).useDelimiter(","); - while (scanner.hasNext()) { - String addonAppDef = scanner.next(); - String addonAppDefPath = instanceDefinition - .getAppConfOperations().getGlobalOptions().get(addonAppDef); - if (component != null) { - addonAppDefPath = component.getOption(addonAppDef, addonAppDefPath); - } - log.debug("Addon package {} is stored at: {}", addonAppDef - + addonAppDefPath); - Metainfo addonMetaInfo = getApplicationMetainfo(fileSystem, - addonAppDefPath, true); - addonMetaInfo.validate(); - packageMetainfo.put(addonMetaInfo.getApplicationPackage() - .getName(), addonMetaInfo); - } - log.info("Metainfo map for master and addon: {}", - packageMetainfo.toString()); - } - } - } - } - } - - @Override - public void initializeApplicationConfiguration( - AggregateConf instanceDefinition, SliderFileSystem fileSystem, - String roleGroup) - throws IOException, SliderException { - buildMetainfo(instanceDefinition, fileSystem, roleGroup); - } - - @Override - public void buildContainerLaunchContext(ContainerLauncher launcher, - AggregateConf instanceDefinition, - Container container, - ProviderRole providerRole, - SliderFileSystem fileSystem, - Path generatedConfPath, - MapOperations resourceComponent, - MapOperations appComponent, - Path containerTmpDirPath) throws - IOException, - SliderException { - - String roleName = providerRole.name; - String roleGroup = providerRole.group; - String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition - .getAppConfOperations(), roleGroup); - - initializeApplicationConfiguration(instanceDefinition, fileSystem, roleGroup); - - log.info("Build launch context for Agent"); - log.debug(instanceDefinition.toString()); - - //if we are launching docker based app on yarn, then we need to pass docker image - if (isYarnDockerContainer(roleGroup)) { - launcher.setYarnDockerMode(true); - launcher.setDockerImage(getConfigFromMetaInfo(roleGroup, "image")); - launcher.setRunPrivilegedContainer(getConfigFromMetaInfo(roleGroup, "runPriviledgedContainer")); - launcher - .setYarnContainerMountPoints(getConfigFromMetaInfoWithAppConfigOverriding( - roleGroup, "yarn.container.mount.points")); - } - - // Set the environment - launcher.putEnv(SliderUtils.buildEnvMap(appComponent, - getStandardTokenMap(getAmState().getAppConfSnapshot(), roleName, roleGroup))); - - String workDir = ApplicationConstants.Environment.PWD.$(); - launcher.setEnv("AGENT_WORK_ROOT", workDir); - log.info("AGENT_WORK_ROOT set to {}", workDir); - String logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR; - launcher.setEnv("AGENT_LOG_ROOT", logDir); - log.info("AGENT_LOG_ROOT set to {}", logDir); - if (System.getenv(HADOOP_USER_NAME) != null) { - launcher.setEnv(HADOOP_USER_NAME, System.getenv(HADOOP_USER_NAME)); - } - // for 2-Way SSL - launcher.setEnv(SLIDER_PASSPHRASE, instanceDefinition.getPassphrase()); - //add english env - launcher.setEnv("LANG", "en_US.UTF-8"); - launcher.setEnv("LC_ALL", "en_US.UTF-8"); - launcher.setEnv("LANGUAGE", "en_US.UTF-8"); - - //local resources - - // TODO: Should agent need to support App Home - String scriptPath = new File(AgentKeys.AGENT_MAIN_SCRIPT_ROOT, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); - String appHome = instanceDefinition.getAppConfOperations(). - getGlobalOptions().get(AgentKeys.PACKAGE_PATH); - if (SliderUtils.isSet(appHome)) { - scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); - } - - // set PYTHONPATH - List<String> pythonPaths = new ArrayList<String>(); - pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT); - pythonPaths.add(AgentKeys.AGENT_JINJA2_ROOT); - String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths); - launcher.setEnv(PYTHONPATH, pythonPath); - log.info("PYTHONPATH set to {}", pythonPath); - - Path agentImagePath = null; - String agentImage = instanceDefinition.getInternalOperations(). - get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); - if (SliderUtils.isUnset(agentImage)) { - agentImagePath = - new Path(new Path(new Path(instanceDefinition.getInternalOperations().get(InternalKeys.INTERNAL_TMP_DIR), - container.getId().getApplicationAttemptId().getApplicationId().toString()), - AgentKeys.PROVIDER_AGENT), - SliderKeys.AGENT_TAR); - } else { - agentImagePath = new Path(agentImage); - } - - if (fileSystem.getFileSystem().exists(agentImagePath)) { - LocalResource agentImageRes = fileSystem.createAmResource(agentImagePath, LocalResourceType.ARCHIVE); - launcher.addLocalResource(AgentKeys.AGENT_INSTALL_DIR, agentImageRes); - } else { - String msg = - String.format("Required agent image slider-agent.tar.gz is unavailable at %s", agentImagePath.toString()); - MapOperations compOps = appComponent; - boolean relaxVerificationForTest = compOps != null ? Boolean.valueOf(compOps. - getOptionBool(AgentKeys.TEST_RELAX_VERIFICATION, false)) : false; - log.error(msg); - - if (!relaxVerificationForTest) { - throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, msg); - } - } - - log.info("Using {} for agent.", scriptPath); - LocalResource appDefRes = fileSystem.createAmResource( - fileSystem.getFileSystem().resolvePath(new Path(appDef)), - LocalResourceType.ARCHIVE); - launcher.addLocalResource(AgentKeys.APP_DEFINITION_DIR, appDefRes); - - for (Package pkg : getMetaInfo(roleGroup).getApplication().getPackages()) { - Path pkgPath = fileSystem.buildResourcePath(pkg.getName()); - if (!fileSystem.isFile(pkgPath)) { - pkgPath = fileSystem.buildResourcePath(getClusterName(), - pkg.getName()); - } - if (!fileSystem.isFile(pkgPath)) { - throw new IOException("Package doesn't exist as a resource: " + - pkg.getName()); - } - log.info("Adding resource {}", pkg.getName()); - LocalResourceType type = LocalResourceType.FILE; - if ("archive".equals(pkg.getType())) { - type = LocalResourceType.ARCHIVE; - } - LocalResource packageResource = fileSystem.createAmResource( - pkgPath, type); - launcher.addLocalResource(AgentKeys.APP_PACKAGES_DIR, packageResource); - } - - String agentConf = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); - if (SliderUtils.isSet(agentConf)) { - LocalResource agentConfRes = fileSystem.createAmResource(fileSystem - .getFileSystem().resolvePath(new Path(agentConf)), - LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes); - } - - String agentVer = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_VERSION, null); - if (agentVer != null) { - LocalResource agentVerRes = fileSystem.createAmResource( - fileSystem.getFileSystem().resolvePath(new Path(agentVer)), - LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.AGENT_VERSION_FILE, agentVerRes); - } - - if (SliderUtils.isHadoopClusterSecure(getConfig())) { - localizeServiceKeytabs(launcher, instanceDefinition, fileSystem); - } - - MapOperations amComponent = instanceDefinition. - getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM); - boolean twoWayEnabled = amComponent != null ? Boolean.valueOf(amComponent. - getOptionBool(AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) : false; - if (twoWayEnabled) { - localizeContainerSSLResources(launcher, container, fileSystem); - } - - MapOperations compOps = appComponent; - if (areStoresRequested(compOps)) { - localizeContainerSecurityStores(launcher, container, roleName, fileSystem, - instanceDefinition, compOps); - } - - //add the configuration resources - launcher.addLocalResources(fileSystem.submitDirectory( - generatedConfPath, - SliderKeys.PROPAGATED_CONF_DIR_NAME)); - - if (appComponent.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, false)) { - // build and localize configuration files - Map<String, Map<String, String>> configurations = - buildCommandConfigurations(instanceDefinition.getAppConfOperations(), - container.getId().toString(), roleName, roleGroup); - localizeConfigFiles(launcher, roleName, roleGroup, getMetaInfo(roleGroup), - configurations, launcher.getEnv(), fileSystem); - } - - String label = getContainerLabel(container, roleName, roleGroup); - CommandLineBuilder operation = new CommandLineBuilder(); - - String pythonExec = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH, - AgentKeys.PYTHON_EXE); - - operation.add(pythonExec); - - operation.add(scriptPath); - operation.add(ARG_LABEL, label); - operation.add(ARG_ZOOKEEPER_QUORUM); - operation.add(getClusterOptionPropertyValue(OptionKeys.ZOOKEEPER_QUORUM)); - operation.add(ARG_ZOOKEEPER_REGISTRY_PATH); - operation.add(getZkRegistryPath()); - - String debugCmd = agentLaunchParameter.getNextLaunchParameter(roleGroup); - if (SliderUtils.isSet(debugCmd)) { - operation.add(ARG_DEBUG); - operation.add(debugCmd); - } - - operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" - + AgentKeys.AGENT_OUT_FILE + " 2>&1"); - - launcher.addCommand(operation.build()); - - // localize addon package - String addonAppDefString = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(AgentKeys.ADDONS, null); - log.debug("All addon appdefs: {}", addonAppDefString); - if (addonAppDefString != null) { - Scanner scanner = new Scanner(addonAppDefString).useDelimiter(","); - while (scanner.hasNext()) { - String addonAppDef = scanner.next(); - String addonAppDefPath = instanceDefinition - .getAppConfOperations().getGlobalOptions().get(addonAppDef); - log.debug("Addon package {} is stored at: {}", addonAppDef, addonAppDefPath); - LocalResource addonPkgRes = fileSystem.createAmResource( - fileSystem.getFileSystem().resolvePath(new Path(addonAppDefPath)), - LocalResourceType.ARCHIVE); - launcher.addLocalResource(AgentKeys.ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes); - } - log.debug("Metainfo map for master and addon: {}", - packageMetainfo.toString()); - } - - // Additional files to localize in addition to the application def - String appResourcesString = instanceDefinition.getAppConfOperations() - .getGlobalOptions().getOption(AgentKeys.APP_RESOURCES, null); - log.info("Configuration value for extra resources to localize: {}", appResourcesString); - if (null != appResourcesString) { - try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) { - while (scanner.hasNext()) { - String resource = scanner.next(); - Path resourcePath = new Path(resource); - LocalResource extraResource = fileSystem.createAmResource( - fileSystem.getFileSystem().resolvePath(resourcePath), - LocalResourceType.FILE); - String destination = AgentKeys.APP_RESOURCES_DIR + "/" + resourcePath.getName(); - log.info("Localizing {} to {}", resourcePath, destination); - // TODO Can we try harder to avoid collisions? - launcher.addLocalResource(destination, extraResource); - } - } - } - - // initialize addon pkg states for all componentInstanceStatus - Map<String, State> pkgStatuses = new TreeMap<>(); - for (Metainfo appPkg : packageMetainfo.values()) { - // check each component of that addon to see if they apply to this - // component 'role' - for (ComponentsInAddonPackage comp : appPkg.getApplicationPackage() - .getComponents()) { - log.debug("Current component: {} component in metainfo: {}", roleName, - comp.getName()); - if (comp.getName().equals(roleGroup) - || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) { - pkgStatuses.put(appPkg.getApplicationPackage().getName(), State.INIT); - } - } - } - log.debug("For component: {} pkg status map: {}", roleName, - pkgStatuses.toString()); - - // initialize the component instance state - getComponentStatuses().put(label, - new ComponentInstanceState( - roleGroup, - container.getId(), - getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME), - pkgStatuses)); - } - - private void localizeContainerSecurityStores(ContainerLauncher launcher, - Container container, - String role, - SliderFileSystem fileSystem, - AggregateConf instanceDefinition, - MapOperations compOps) - throws SliderException, IOException { - // substitute CLUSTER_NAME into credentials - Map<String,List<String>> newcred = new HashMap<>(); - for (Entry<String,List<String>> entry : instanceDefinition.getAppConf().credentials.entrySet()) { - List<String> resultList = new ArrayList<>(); - for (String v : entry.getValue()) { - resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"), - clusterName).replaceAll(Pattern.quote("${CLUSTER}"), - clusterName)); - } - newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"), - clusterName).replaceAll(Pattern.quote("${CLUSTER}"), - clusterName), - resultList); - } - instanceDefinition.getAppConf().credentials = newcred; - - // generate and localize security stores - SecurityStore[] stores = generateSecurityStores(container, role, - instanceDefinition, compOps); - for (SecurityStore store : stores) { - LocalResource keystoreResource = fileSystem.createAmResource( - uploadSecurityResource(store.getFile(), fileSystem), LocalResourceType.FILE); - launcher.addLocalResource(String.format("secstores/%s-%s.p12", - store.getType(), role), - keystoreResource); - } - } - - private SecurityStore[] generateSecurityStores(Container container, - String role, - AggregateConf instanceDefinition, - MapOperations compOps) - throws SliderException, IOException { - return StoresGenerator.generateSecurityStores(container.getNodeId().getHost(), - container.getId().toString(), role, - instanceDefinition, compOps); - } - - private boolean areStoresRequested(MapOperations compOps) { - return compOps != null ? compOps. - getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false) : false; - } - - private void localizeContainerSSLResources(ContainerLauncher launcher, - Container container, - SliderFileSystem fileSystem) - throws SliderException { - try { - // localize server cert - Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName()); - LocalResource certResource = fileSystem.createAmResource( - new Path(certsDir, SliderKeys.CRT_FILE_NAME), - LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.CERT_FILE_LOCALIZATION_PATH, - certResource); - - // generate and localize agent cert - CertificateManager certMgr = new CertificateManager(); - String hostname = container.getNodeId().getHost(); - String containerId = container.getId().toString(); - certMgr.generateContainerCertificate(hostname, containerId); - LocalResource agentCertResource = fileSystem.createAmResource( - uploadSecurityResource( - CertificateManager.getAgentCertficateFilePath(containerId), - fileSystem), LocalResourceType.FILE); - // still using hostname as file name on the agent side, but the files - // do end up under the specific container's file space - launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname + - ".crt", agentCertResource); - LocalResource agentKeyResource = fileSystem.createAmResource( - uploadSecurityResource( - CertificateManager.getAgentKeyFilePath(containerId), fileSystem), - LocalResourceType.FILE); - launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname + - ".key", agentKeyResource); - - } catch (Exception e) { - throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e, - "Unable to localize certificates. Two-way SSL cannot be enabled"); - } - } - - private Path uploadSecurityResource(File resource, SliderFileSystem fileSystem) - throws IOException { - Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName()); - return uploadResource(resource, fileSystem, certsDir); - } - - private Path uploadResource(File resource, SliderFileSystem fileSystem, - String roleName) throws IOException { - Path dir; - if (roleName == null) { - dir = fileSystem.buildClusterResourcePath(getClusterName()); - } else { - dir = fileSystem.buildClusterResourcePath(getClusterName(), roleName); - } - return uploadResource(resource, fileSystem, dir); - } - - private static synchronized Path uploadResource(File resource, - SliderFileSystem fileSystem, Path parentDir) throws IOException { - if (!fileSystem.getFileSystem().exists(parentDir)) { - fileSystem.getFileSystem().mkdirs(parentDir, - new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); - } - Path destPath = new Path(parentDir, resource.getName()); - if (!fileSystem.getFileSystem().exists(destPath)) { - FSDataOutputStream os = null; - try { - os = fileSystem.getFileSystem().create(destPath); - byte[] contents = FileUtils.readFileToByteArray(resource); - os.write(contents, 0, contents.length); - os.flush(); - } finally { - IOUtils.closeStream(os); - } - log.info("Uploaded {} to localization path {}", resource, destPath); - } else { - log.info("Resource {} already existed at localization path {}", resource, - destPath); - } - - while (!fileSystem.getFileSystem().exists(destPath)) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // ignore - } - } - - fileSystem.getFileSystem().setPermission(destPath, - new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE)); - - return destPath; - } - - private void localizeServiceKeytabs(ContainerLauncher launcher, - AggregateConf instanceDefinition, - SliderFileSystem fileSystem) - throws IOException { - String keytabPathOnHost = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM).get( - SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); - if (SliderUtils.isUnset(keytabPathOnHost)) { - String amKeytabName = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM).get( - SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); - String keytabDir = instanceDefinition.getAppConfOperations() - .getComponent(SliderKeys.COMPONENT_AM).get( - SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR); - // we need to localize the keytab files in the directory - Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null, - getClusterName()); - boolean serviceKeytabsDeployed = false; - if (fileSystem.getFileSystem().exists(keytabDirPath)) { - FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(keytabDirPath); - LocalResource keytabRes; - for (FileStatus keytab : keytabs) { - if (!amKeytabName.equals(keytab.getPath().getName()) - && keytab.getPath().getName().endsWith(".keytab")) { - serviceKeytabsDeployed = true; - log.info("Localizing keytab {}", keytab.getPath().getName()); - keytabRes = fileSystem.createAmResource(keytab.getPath(), - LocalResourceType.FILE); - launcher.addLocalResource(SliderKeys.KEYTAB_DIR + "/" + - keytab.getPath().getName(), - keytabRes); - } - } - } - if (!serviceKeytabsDeployed) { - log.warn("No service keytabs for the application have been localized. " - + "If the application requires keytabs for secure operation, " - + "please ensure that the required keytabs have been uploaded " - + "to the folder {}", keytabDirPath); - } - } - } - - private void createConfigFile(SliderFileSystem fileSystem, File file, - ConfigFile configFile, Map<String, String> config) - throws IOException { - ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType()); - log.info("Writing {} file {}", configFormat, file); - - ConfigUtils.prepConfigForTemplateOutputter(configFormat, config, - fileSystem, getClusterName(), file.getName()); - PublishedConfiguration publishedConfiguration = - new PublishedConfiguration(configFile.getDictionaryName(), - config.entrySet()); - PublishedConfigurationOutputter configurationOutputter = - PublishedConfigurationOutputter.createOutputter(configFormat, - publishedConfiguration); - configurationOutputter.save(file); - } - - @VisibleForTesting - protected void localizeConfigFiles(ContainerLauncher launcher, - String roleName, String roleGroup, - Metainfo metainfo, - Map<String, Map<String, String>> configs, - MapOperations env, - SliderFileSystem fileSystem) - throws IOException { - for (ConfigFile configFile : metainfo.getComponentConfigFiles(roleGroup)) { - Map<String, String> config = ConfigUtils.replacePropsInConfig( - configs.get(configFile.getDictionaryName()), env.options); - String fileName = ConfigUtils.replaceProps(config, - configFile.getFileName()); - File localFile = new File(SliderKeys.RESOURCE_DIR); - if (!localFile.exists()) { - localFile.mkdir(); - } - localFile = new File(localFile, new File(fileName).getName()); - - String folder = null; - if ("true".equals(config.get(PER_COMPONENT))) { - folder = roleName; - } else if ("true".equals(config.get(PER_GROUP))) { - folder = roleGroup; - } - - log.info("Localizing {} configs to config file {} (destination {}) " + - "based on {} configs", config.size(), localFile, fileName, - configFile.getDictionaryName()); - createConfigFile(fileSystem, localFile, configFile, config); - Path destPath = uploadResource(localFile, fileSystem, folder); - LocalResource configResource = fileSystem.createAmResource(destPath, - LocalResourceType.FILE); - - File destFile = new File(fileName); - if (destFile.isAbsolute()) { - launcher.addLocalResource( - SliderKeys.RESOURCE_DIR + "/" + destFile.getName(), - configResource, fileName); - } else { - launcher.addLocalResource(AgentKeys.APP_CONF_DIR + "/" + fileName, - configResource); - } - } - } - - /** - * build the zookeeper registry path. - * - * @return the path the service registered at - * @throws NullPointerException if the service has not yet registered - */ - private String getZkRegistryPath() { - Preconditions.checkNotNull(yarnRegistry, "Yarn registry not bound"); - String path = yarnRegistry.getAbsoluteSelfRegistrationPath(); - Preconditions.checkNotNull(path, "Service record path not defined"); - return path; - } - - @Override - public void rebuildContainerDetails(List<Container> liveContainers, - String applicationId, Map<Integer, ProviderRole> providerRoleMap) { - for (Container container : liveContainers) { - // get the role name and label - ProviderRole role = providerRoleMap.get(ContainerPriority - .extractRole(container)); - if (role != null) { - String roleName = role.name; - String roleGroup = role.group; - String label = getContainerLabel(container, roleName, roleGroup); - log.info("Rebuilding in-memory: container {} in role {} in cluster {}", - container.getId(), roleName, applicationId); - getComponentStatuses().put(label, - new ComponentInstanceState(roleGroup, container.getId(), - applicationId)); - } else { - log.warn("Role not found for container {} in cluster {}", - container.getId(), applicationId); - } - } - } - - @Override - public boolean isSupportedRole(String role) { - return true; - } - - /** - * Handle registration calls from the agents - * - * @param registration registration entry - * - * @return response - */ - @Override - public RegistrationResponse handleRegistration(Register registration) { - log.info("Handling registration: {}", registration); - RegistrationResponse response = new RegistrationResponse(); - String label = registration.getLabel(); - String pkg = registration.getPkg(); - State agentState = registration.getActualState(); - String appVersion = registration.getAppVersion(); - - log.info("label: {} pkg: {}", label, pkg); - - if (getComponentStatuses().containsKey(label)) { - response.setResponseStatus(RegistrationStatus.OK); - ComponentInstanceState componentStatus = getComponentStatuses().get(label); - componentStatus.heartbeat(System.currentTimeMillis()); - updateComponentStatusWithAgentState(componentStatus, agentState); - - String roleName = getRoleName(label); - String roleGroup = getRoleGroup(label); - String containerId = getContainerId(label); - - if (SliderUtils.isSet(registration.getTags())) { - tags.recordAssignedTag(roleName, containerId, registration.getTags()); - } else { - response.setTags(tags.getTag(roleName, containerId)); - } - - String hostFqdn = registration.getPublicHostname(); - Map<String, String> ports = registration.getAllocatedPorts(); - if (ports != null && !ports.isEmpty()) { - processAllocatedPorts(hostFqdn, roleName, roleGroup, containerId, ports); - } - - Map<String, String> folders = registration.getLogFolders(); - if (folders != null && !folders.isEmpty()) { - publishFolderPaths(folders, containerId, roleName, hostFqdn); - } - - // Set app version if empty. It gets unset during upgrade - why? - checkAndSetContainerAppVersion(containerId, appVersion); - } else { - response.setResponseStatus(RegistrationStatus.FAILED); - response.setLog("Label not recognized."); - log.warn("Received registration request from unknown label {}", label); - } - log.info("Registration response: {}", response); - return response; - } - - // Checks if app version is empty. Sets it to the version as reported by the - // container during registration phase. - private void checkAndSetContainerAppVersion(String containerId, - String appVersion) { - StateAccessForProviders amState = getAmState(); - try { - RoleInstance role = amState.getOwnedContainer(containerId); - if (role != null) { - String currentAppVersion = role.appVersion; - log.debug("Container = {}, app version current = {} new = {}", - containerId, currentAppVersion, appVersion); - if (currentAppVersion == null - || currentAppVersion.equals(APP_VERSION_UNKNOWN)) { - amState.getOwnedContainer(containerId).appVersion = appVersion; - } - } - } catch (NoSuchNodeException e) { - // ignore - there is nothing to do if we don't find a container - log.warn("Owned container {} not found - {}", containerId, e); - } - } - - /** - * Handle heartbeat response from agents - * - * @param heartBeat incoming heartbeat from Agent - * - * @return response to send back - */ - @Override - public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) { - log.debug("Handling heartbeat: {}", heartBeat); - HeartBeatResponse response = new HeartBeatResponse(); - long id = heartBeat.getResponseId(); - response.setResponseId(id + 1L); - - String label = heartBeat.getHostname(); - String pkg = heartBeat.getPackage(); - - log.debug("package received: " + pkg); - - String roleName = getRoleName(label); - String roleGroup = getRoleGroup(label); - String containerId = getContainerId(label); - boolean doUpgrade = false; - if (isInUpgradeMode && upgradeContainers.contains(containerId)) { - doUpgrade = true; - } - - StateAccessForProviders accessor = getAmState(); - CommandScript cmdScript = getScriptPathForMasterPackage(roleGroup); - List<ComponentCommand> commands = getApplicationComponent(roleGroup).getCommands(); - - if (!isDockerContainer(roleGroup) && !isYarnDockerContainer(roleGroup) - && (cmdScript == null || cmdScript.getScript() == null) - && commands.size() == 0) { - log.error( - "role.script is unavailable for {}. Commands will not be sent.", - roleName); - return response; - } - - String scriptPath = null; - long timeout = 600L; - if (cmdScript != null) { - scriptPath = cmdScript.getScript(); - timeout = cmdScript.getTimeout(); - } - - if (timeout == 0L) { - timeout = 600L; - } - - if (!getComponentStatuses().containsKey(label)) { - // container is completed but still heart-beating, send terminate signal - log.info( - "Sending terminate signal to completed container (still heartbeating): {}", - label); - response.setTerminateAgent(true); - return response; - } - - List<ComponentStatus> statuses = heartBeat.getComponentStatus(); - if (statuses != null && !statuses.isEmpty()) { - log.info("status from agent: " + statuses.toString()); - try { - for(ComponentStatus status : statuses){ - RoleInstance role = null; - if(status.getIp() != null && !status.getIp().isEmpty()){ - role = amState.getOwnedContainer(containerId); - role.ip = status.getIp(); - } - if(status.getHostname() != null && !status.getHostname().isEmpty()){ - role = amState.getOwnedContainer(containerId); - role.hostname = status.getHostname(); - } - if (role != null) { - // create an updated service record (including hostname and ip) and publish... - ServiceRecord record = new ServiceRecord(); - record.set(YarnRegistryAttributes.YARN_ID, containerId); - record.description = roleName; - record.set(YarnRegistryAttributes.YARN_PERSISTENCE, - PersistencePolicies.CONTAINER); - // TODO: switch record attributes to use constants from YarnRegistryAttributes - // when it's been updated. - if (role.ip != null) { - record.set("yarn:ip", role.ip); - } - if (role.hostname != null) { - record.set("yarn:hostname", role.hostname); - } - yarnRegistry.putComponent( - RegistryPathUtils.encodeYarnID(containerId), record); - - } - } - - - } catch (NoSuchNodeException e) { - // ignore - there is nothing to do if we don't find a container - log.warn("Owned container {} not found - {}", containerId, e); - } catch (IOException e) { - log.warn("Error updating container {} service record in registry", - containerId, e); - } - } - - Boolean isMaster = isMaster(roleGroup); - ComponentInstanceState componentStatus = getComponentStatuses().get(label); - componentStatus.heartbeat(System.currentTimeMillis()); - if (doUpgrade) { - switch (componentStatus.getState()) { - case STARTED: - componentStatus.setTargetState(State.UPGRADED); - break; - case UPGRADED: - componentStatus.setTargetState(State.STOPPED); - break; - case STOPPED: - componentStatus.setTargetState(State.TERMINATING); - break; - default: - break; - } - log.info("Current state = {} target state {}", - componentStatus.getState(), componentStatus.getTargetState()); - } - - if (appStopInitiated && !componentStatus.isStopInitiated()) { - log.info("Stop initiated for label {}", label); - componentStatus.setTargetState(State.STOPPED); - componentStatus.setStopInitiated(true); - } - - publishConfigAndExportGroups(heartBeat, componentStatus, roleGroup); - CommandResult result = null; - List<CommandReport> reports = heartBeat.getReports(); - if (SliderUtils.isNotEmpty(reports)) { - CommandReport report = reports.get(0); - Map<String, String> ports = report.getAllocatedPorts(); - if (SliderUtils.isNotEmpty(ports)) { - processAllocatedPorts(heartBeat.getFqdn(), roleName, roleGroup, containerId, ports); - } - result = CommandResult.getCommandResult(report.getStatus()); - Command command = Command.getCommand(report.getRoleCommand()); - componentStatus.applyCommandResult(result, command, pkg); - log.info("Component operation. Status: {}; new container state: {};" - + " new component state: {}", result, - componentStatus.getContainerState(), componentStatus.getState()); - - if (command == Command.INSTALL && SliderUtils.isNotEmpty(report.getFolders())) { - publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn()); - } - } - - int waitForCount = accessor.getInstanceDefinitionSnapshot(). - getAppConfOperations().getComponentOptInt(roleGroup, AgentKeys.WAIT_HEARTBEAT, 0); - - if (id < waitForCount) { - log.info("Waiting until heartbeat count {}. Current val: {}", waitForCount, id); - getComponentStatuses().put(label, componentStatus); - return response; - } - - Command command = componentStatus.getNextCommand(doUpgrade); - try { - if (Command.NOP != command) { - log.debug("For comp {} pkg {} issuing {}", roleName, - componentStatus.getNextPkgToInstall(), command.toString()); - if (command == Command.INSTALL) { - log.info("Installing {} on {}.", roleName, containerId); - if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){ - addInstallDockerCommand(roleName, roleGroup, containerId, - response, null, timeout); - } else if (scriptPath != null) { - addInstallCommand(roleName, roleGroup, containerId, response, - scriptPath, null, timeout, null); - } else { - // commands - ComponentCommand installCmd = null; - for (ComponentCommand compCmd : commands) { - if (compCmd.getName().equals("INSTALL")) { - installCmd = compCmd; - } - } - addInstallCommand(roleName, roleGroup, containerId, response, null, - installCmd, timeout, null); - } - componentStatus.commandIssued(command); - } else if (command == Command.INSTALL_ADDON) { - String nextPkgToInstall = componentStatus.getNextPkgToInstall(); - // retrieve scriptPath or command of that package for the component - for (ComponentsInAddonPackage comp : packageMetainfo - .get(nextPkgToInstall).getApplicationPackage().getComponents()) { - // given nextPkgToInstall and roleName is determined, the if below - // should only execute once per heartbeat - log.debug("Addon component: {} pkg: {} script: {}", comp.getName(), - nextPkgToInstall, comp.getCommandScript().getScript()); - if (comp.getName().equals(roleGroup) - || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) { - scriptPath = comp.getCommandScript().getScript(); - if (scriptPath != null) { - addInstallCommand(roleName, roleGroup, containerId, response, - scriptPath, null, timeout, nextPkgToInstall); - } else { - ComponentCommand installCmd = null; - for (ComponentCommand compCmd : comp.getCommands()) { - if (compCmd.getName().equals("INSTALL")) { - installCmd = compCmd; - } - } - addInstallCommand(roleName, roleGroup, containerId, response, - null, installCmd, timeout, nextPkgToInstall); - } - } - } - componentStatus.commandIssued(command); - } else if (command == Command.START) { - // check against dependencies - boolean canExecute = commandOrder.canExecute(roleGroup, command, getComponentStatuses().values()); - if (canExecute) { - log.info("Starting {} on {}.", roleName, containerId); - if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){ - addStartDockerCommand(roleName, roleGroup, containerId, - response, null, timeout, false); - } else if (scriptPath != null) { - addStartCommand(roleName, - roleGroup, - containerId, - response, - scriptPath, - null, - null, - timeout, - isMarkedAutoRestart(roleGroup)); - } else { - ComponentCommand startCmd = null; - for (ComponentCommand compCmd : commands) { - if (compCmd.getName().equals("START")) { - startCmd = compCmd; - } - } - ComponentCommand stopCmd = null; - for (ComponentCommand compCmd : commands) { - if (compCmd.getName().equals("STOP")) { - stopCmd = compCmd; - } - } - addStartCommand(roleName, roleGroup, containerId, response, null, - startCmd, stopCmd, timeout, false); - } - componentStatus.commandIssued(command); - } else { - log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId); - } - } else if (command == Command.UPGRADE) { - addUpgradeCommand(roleName, roleGroup, containerId, response, - scriptPath, timeout); - componentStatus.commandIssued(command, true); - } else if (command == Command.STOP) { - log.info("Stop command being sent to container with id {}", - containerId); - addStopCommand(roleName, roleGroup, containerId, response, scriptPath, - timeout, doUpgrade); - componentStatus.commandIssued(command); - } else if (command == Command.TERMINATE) { - log.info("A formal terminate command is being sent to container {}" - + " in state {}", label, componentStatus.getState()); - response.setTerminateAgent(true); - } - } - - // if there is no outstanding command then retrieve config - if (isMaster && componentStatus.getState() == State.STARTED - && command == Command.NOP) { - if (!componentStatus.getConfigReported()) { - log.info("Requesting applied config for {} on {}.", roleName, containerId); - if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){ - addGetConfigDockerCommand(roleName, roleGroup, containerId, response); - } else { - addGetConfigCommand(roleName, roleGroup, containerId, response); - } - } - } - - // if restart is required then signal - response.setRestartEnabled(false); - if (componentStatus.getState() == State.STARTED - && command == Command.NOP && isMarkedAutoRestart(roleGroup)) { - response.setRestartEnabled(true); - } - - //If INSTALL_FAILED and no INSTALL is scheduled let the agent fail - if (componentStatus.getState() == State.INSTALL_FAILED - && command == Command.NOP) { - log.warn("Sending terminate signal to container that failed installation: {}", label); - response.setTerminateAgent(true); - } - - } catch (SliderException e) { - log.warn("Component instance failed operation.", e); - componentStatus.applyCommandResult(CommandResult.FAILED, command, null); - } - - log.debug("Heartbeat response: " + response); - return response; - } - - private boolean isDockerContainer(String roleGroup) { - String type = getApplicationComponent(roleGroup).getType(); - if (SliderUtils.isSet(type)) { - return type.toLowerCase().equals(SliderUtils.DOCKER) || type.toLowerCase().equals(SliderUtils.DOCKER_YARN); - } - return false; - } - - private boolean isYarnDockerContainer(String roleGroup) { - String type = getApplicationComponent(roleGroup).getType(); - if (SliderUtils.isSet(type)) { - return type.toLowerCase().equals(SliderUtils.DOCKER_YARN); - } - return false; - } - - protected void processAllocatedPorts(String fqdn, - String roleName, - String roleGroup, - String containerId, - Map<String, String> ports) { - RoleInstance instance; - try { - instance = getAmState().getOwnedContainer(containerId); - } catch (NoSuchNodeException e) { - log.warn("Failed to locate instance of container {}", containerId, e); - instance = null; - } - for (Map.Entry<String, String> port : ports.entrySet()) { - String portname = port.getKey(); - String portNo = port.getValue(); - log.info("Recording allocated port for {} as {}", portname, portNo); - - // add the allocated ports to the global list as well as per container list - // per container allocation will over-write each other in the global - this.getAllocatedPorts().put(portname, portNo); - this.getAllocatedPorts(containerId).put(portname, portNo); - if (instance != null) { - try { - // if the returned value is not a single port number then there are no - // meaningful way for Slider to use it during export - // No need to error out as it may not be the responsibility of the component - // to allocate port or the component may need an array of ports - instance.registerPortEndpoint(Integer.valueOf(portNo), portname); - } catch (NumberFormatException e) { - log.warn("Failed to parse {}", portNo, e); - } - } - } - - processAndPublishComponentSpecificData(ports, containerId, fqdn, roleGroup); - processAndPublishComponentSpecificExports(ports, containerId, fqdn, roleName, roleGroup); - - // and update registration entries - if (instance != null) { - queueAccess.put(new RegisterComponentInstance(instance.getId(), - roleName, roleGroup, 0, TimeUnit.MILLISECONDS)); - } - } - - private void updateComponentStatusWithAgentState( - ComponentInstanceState componentStatus, State agentState) { - if (agentState != null) { - componentStatus.setState(agentState); - } - } - - @Override - public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) { - Map<String, MonitorDetail> details = super.buildMonitorDetails(clusterDesc); - buildRoleHostDetails(details); - return details; - } - - @Override - public void applyInitialRegistryDefinitions(URL amWebURI, - URL agentOpsURI, - URL agentStatusURI, - ServiceRecord serviceRecord) - throws IOException { - super.applyInitialRegistryDefinitions(amWebURI, - agentOpsURI, - agentStatusURI, - serviceRecord); - - try { - URL restURL = new URL(agentOpsURI, SLIDER_PATH_AGENTS); - URL agentStatusURL = new URL(agentStatusURI, SLIDER_PATH_AGENTS); - - serviceRecord.addInternalEndpoint( - new Endpoint(CustomRegistryConstants.AGENT_SECURE_REST_API, - ProtocolTypes.PROTOCOL_REST, - restURL.toURI())); - serviceRecord.addInternalEndpoint( - new Endpoint(CustomRegistryConstants.AGENT_ONEWAY_REST_API, - ProtocolTypes.PROTOCOL_REST, - agentStatusURL.toURI())); - } catch (URISyntaxException e) { - throw new IOException(e); - } - - // identify client component - Component client = null; - for (Component component : getMetaInfo(null).getApplication() - .getComponents()) { - if (component.getCategory().equals("CLIENT")) { - client = component; - break; - } - } - if (client == null) { - log.info("No client component specified, not publishing client configs"); - return; - } - - // register AM-generated client configs - ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - MapOperations clientOperations = appConf.getOrAddComponent(client.getName()); - appConf.resolve(); - if (!clientOperations.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, - false)) { - log.info("AM config generation is false, not publishing client configs"); - return; - } - - // build and localize configuration files - Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>(); - Map<String, String> tokens = null; - try { - tokens = getStandardTokenMap(appConf, client.getName(), client.getName()); - } catch (SliderException e) { - throw new IOException(e); - } - - for (ConfigFile configFile : getMetaInfo(null) - .getComponentConfigFiles(client.getName())) { - addNamedConfiguration(configFile.getDictionaryName(), - appConf.getGlobalOptions().options, configurations, tokens, null, - client.getName(), client.getName()); - if (appConf.getComponent(client.getName()) != null) { - addNamedConfiguration(configFile.getDictionaryName(), - appConf.getComponent(client.getName()).options, configurations, - tokens, null, client.getName(), client.getName()); - } - } - - //do a final replacement of re-used configs - dereferenceAllConfigs(configurations); - - for (ConfigFile configFile : getMetaInfo(null) - .getComponentConfigFiles(client.getName())) { - ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType()); - - Map<String, String> config = configurations.get(configFile.getDictionaryName()); - ConfigUtils.prepConfigForTemplateOutputter(configFormat, config, - fileSystem, getClusterName(), - new File(configFile.getFileName()).getName()); - PublishedConfiguration publishedConfiguration = - new PublishedConfiguration(configFile.getDictionaryName(), - config.entrySet()); - getAmState().getPublishedSliderConfigurations().put( - configFile.getDictionaryName(), publishedConfiguration); - log.info("Publishing AM configuration {}", configFile.getDictionaryName()); - } - } - - @Override - public void notifyContainerCompleted(ContainerId containerId) { - // containers get allocated and free'ed without being assigned to any - // component - so many of the data structures may not be initialized - if (containerId != null) { - String containerIdStr = containerId.toString(); - if (getComponentInstanceData().containsKey(containerIdStr)) { - getComponentInstanceData().remove(containerIdStr); - log.info("Removing container specific data for {}", containerIdStr); - publishComponentInstanceData(); - } - - if (this.allocatedPorts.containsKey(containerIdStr)) { - Map<String, String> portsByContainerId = getAllocatedPorts(containerIdStr); - this.allocatedPorts.remove(containerIdStr); - // free up the allocations from global as well - // if multiple containers allocate global ports then last one - // wins and similarly first one removes it - its not supported anyway - for(String portName : portsByContainerId.keySet()) { - getAllocatedPorts().remove(portName); - } - - } - - String componentName = null; - synchronized (this.componentStatuses) { - for (String label : getComponentStatuses().keySet()) { - if (label.startsWith(containerIdStr)) { - componentName = getRoleName(label); - log.info("Removing component status for label {}", label); - getComponentStatuses().remove(label); - } - } - } - - tags.releaseTag(componentName, containerIdStr); - - synchronized (this.containerExportsMap) { - Set<String> containerExportSets = containerExportsMap.get(containerIdStr); - if (containerExportSets != null) { - for (String containerExportStr : containerExportSets) { - String[] parts = containerExportStr.split(":"); - Map<String, List<ExportEntry>> exportGroup = getCurrentExports(parts[0]); - List<ExportEntry> exports = exportGroup.get(parts[1]); - List<ExportEntry> exportToRemove = new ArrayList<ExportEntry>(); - for (ExportEntry export : exports) { - if (containerIdStr.equals(export.getContainerId())) { - exportToRemove.add(export); - } - } - exports.removeAll(exportToRemove); - } - log.info("Removing container exports for {}", containerIdStr); - containerExportsMap.remove(containerIdStr); - } - } - } - } - - /** - * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default. - * - * @param instanceDefinition - */ - private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) { - String hbMonitorInterval = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL, - Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL)); - try { - setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval)); - } catch (NumberFormatException e) { - log.warn( - "Bad value {} for {}. Defaulting to ", - hbMonitorInterval, - HEARTBEAT_MONITOR_INTERVAL, - DEFAULT_HEARTBEAT_MONITOR_INTERVAL); - } - } - - /** - * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default. - * - * @param instanceDefinition - */ - private void initializeAgentDebugCommands(AggregateConf instanceDefinition) { - String launchParameterStr = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, ""); - agentLaunchParameter = new AgentLaunchParameter(launchParameterStr); - } - - @VisibleForTesting - protected Map<String, ExportEntry> getLogFolderExports() { - return logFolderExports; - } - - @VisibleForTesting - protected Map<String, ExportEntry> getWorkFolderExports() { - return workFolderExports; - } - - @VisibleForTesting - protected Metainfo getMetaInfo(String roleGroup) { - String mapKey = DEFAULT_METAINFO_MAP_KEY; - if (roleGroup != null) { - ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - mapKey = appConf.getComponentOpt(roleGroup, ROLE_PREFIX, - DEFAULT_METAINFO_MAP_KEY); - } - MetainfoHolder mh = this.metaInfoMap.get(mapKey); - if (mh == null) { - return null; - } - return mh.metaInfo; - } - - @VisibleForTesting - protected Map<String, ComponentInstanceState> getComponentStatuses() { - return componentStatuses; - } - - @VisibleForTesting - protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem, - String appDef, boolean addonPackage) throws IOException, - BadConfigException { - return AgentUtils.getApplicationMetainfo(fileSystem, appDef, addonPackage); - } - - @VisibleForTesting - protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem, - String appDef) throws IOException, BadConfigException { - return getApplicationMetainfo(fileSystem, appDef, false); - } - - @VisibleForTesting - protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) { - this.heartbeatMonitorInterval = heartbeatMonitorInterval; - } - - public void setInUpgradeMode(boolean inUpgradeMode) { - this.isInUpgradeMode = inUpgradeMode; - } - - public void addUpgradeContainers(Set<String> upgradeContainers) { - this.upgradeContainers.addAll(upgradeContainers); - } - - public void setAppStopInitiated(boolean appStopInitiated) { - this.appStopInitiated = appStopInitiated; - } - - /** - * Read all default configs - * - * @param fileSystem fs - * @param appDef app default path - * @param metainfo metadata - * - * @return configuration maps - * - * @throws IOException - */ - protected Map<String, DefaultConfig> initializeDefaultConfigs(SliderFileSystem fileSystem, - String appDef, Metainfo metainfo) throws IOException { - Map<String, DefaultConfig> defaultConfigMap = new HashMap<>(); - if (SliderUtils.isNotEmpty(metainfo.getApplication().getConfigFiles())) { - for (ConfigFile configFile : metainfo.getApplication().getConfigFiles()) { - DefaultConfig config = null; - try { - config = AgentUtils.getDefaultConfig(fileSystem, appDef, configFile.getDictionaryName() + ".xml"); - } catch (IOException e) { - log.warn("Default config file not found. Only the config as input during create will be applied for {}", - configFile.getDictionaryName()); - } - if (config != null) { - defaultConfigMap.put(configFile.getDictionaryName(), config); - } - } - } - - return defaultConfigMap; - } - - protected Map<String, DefaultConfig> getDefaultConfigs(String roleGroup) { - ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); - String mapKey = appConf.getComponentOpt(roleGroup, ROLE_PREFIX, - DEFAULT_METAINFO_MAP_KEY); - return metaInfoMap.get(mapKey).defaultConfigs; - } - - private int getHeartbeatMonitorInterval() { - return this.heartbeatMonitorInterval; - } - - private String getClusterName() { - if (SliderUtils.isUnset(clusterName)) { - clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME); - } - return clusterName; - } - - /** - * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site - * - * @param name - * @param description - * @param entries - */ - protected void publishApplicationInstanceData(String name, String description, - Iterable<Map.Entry<String, String>> entries) { - PublishedConfiguration pubconf = new PublishedConfiguration(); - pubconf.description = description; - pubconf.putValues(entries); - log.info("publishing {}", pubconf); - getAmState().getPublishedSliderConfigurations().put(name, pubconf); - } - - /** - * Get a list of all hosts for all role/container per role - * - * @return the map of role->node - */ - protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() { - return amState.getRoleClusterNodeMapping(); - } - - private String getContainerLabel(Container container, String role, String group) { - if (role.equals(group)) { - return container.getId().toString() + LABEL_MAKER + role; - } else { - return container.getId().toString() + LABEL_MAKER + role + LABEL_MAKER + - group; - } - } - - protected String getClusterInfoPropertyValue(String name) { - StateAccessForProviders accessor = getAmState(); - assert accessor.isApplicationLive(); - ClusterDescription description = accessor.getClusterStatus(); - return description.getInfo(name); - } - - protected String getClusterOptionPropertyValue(String name) - throws BadConfigException { - StateAccessForProviders accessor = getAmState(); - assert accessor.isApplicationLive(); - ClusterDescription description = accessor.getClusterStatus(); - return description.getMandatoryOption(name); - } - - /** - * Lost heartbeat from the container - release it and ask for a replacement (async operation) - * - * @param label - * @param containerId - */ - protected void lostContainer( - String label, - ContainerId containerId) { - getComponentStatuses().remove(label); - getQueueAccess().put(new ProviderReportedContainerLoss(containerId)); - } - - /** - * Build the provider status, can be empty - * - * @return the provider status - map of entries to add to the info section - */ - public Map<String, String> buildProviderStatus() { - Map<String, String> stats = new HashMap<String, String>(); - return stats; - } - - - /** - * Format the folder locations and publish in the registry service - * - * @param folders - * @param containerId - * @param hostFqdn - * @param componentName - */ - protected void publishFolderPaths( - Map<String, String> folders, String containerId, String componentName, String hostFqdn) { - Date now = new Date(); - for (Map.Entry<String, String> entry : folders.entrySet()) { - ExportEntry exportEntry = new ExportEntry(); - exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue())); - exportEntry.setContainerId(containerId); - exportEntry.setLevel(COMPONENT_TAG); - exportEntry.setTag(componentName); - exportEntry.setUpdatedTime(now.toString()); - if (entry.getKey().equals("AGENT_LOG_ROOT")) { - synchronized (logFolderExports) { - getLogFolderExports().put(containerId, exportEntry); - } - } else { - synchronized (workFolderExports) { - getWorkFolderExports().put(containerId, exportEntry); - } - } - log.info("Updating log and pwd folders for container {}", containerId); - } - - PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG); - exports.setUpdated(now.getTime()); - synchronized (logFolderExports) { - updateExportsFromList(exports, getLogFolderExports()); - } - getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports); - - exports = new PublishedExports(CONTAINER_PWDS_TAG); - exports.setUpdated(now.getTime()); - synchronized (workFolderExports) { - updateExportsFromList(exports, getWorkFolderExports()); - } - getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports); - } - - /** - * Update the export data from the map - * @param exports - * @param folderExports - */ - private void updateExportsFromList(PublishedExports exports, Map<String, ExportEntry> folderExports) { - Map<String, List<ExportEntry>> perComponentList = new HashMap<String, List<ExportEntry>>(); - for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet()) - { - String componentName = logEntry.getValue().getTag(); - if (!perComponentList.containsKey(componentName)) { - perComponentList.put(componentName, new ArrayList<ExportEntry>()); - } - perComponentList.get(componentName).add(logEntry.getValue()); - } - exports.putValues(perComponentList.entrySet()); - } - - - /** - * Process return status for component instances - * - * @param heartBeat - * @param componentStatus - */ - protected void publishConfigAndExportGroups(HeartBeat heartBeat, - ComponentInstanceState componentStatus, String componentGroup) { - List<ComponentStatus> statuses = heartBeat.getComponentStatus(); - if (statuses != null && !statuses.isEmpty()) { - log.info("Processing {} status reports.", statuses.size()); - for (ComponentStatus status : statuses) { - log.info("Status report: {}", status.toString()); - - if (status.getConfigs() != null) { - Application application = getMetaInfo(componentGroup).getApplication(); - - if ((!canAnyMasterPublishConfig(componentGroup) || canPublishConfig(componentGroup)) && - !getAmState().getAppConfSnapshot().getComponentOptBool( - componentGroup, AgentKeys.AM_CONFIG_GENERATION, false)) { - // If no Master can explicitly publish then publish if its a master - // Otherwise, wait till the master that can publish is ready - - Set<String> exportedConfigs = new HashSet(); - String exportedConfigsStr = application.getExportedConfigs(); - boolean exportedAllConfigs = exportedConfigsStr == null || exportedConfigsStr.isEmpty(); - if (!exportedAllConfigs) { - for (String exportedConfig : exportedConfigsStr.split(",")) { - if (exportedConfig.trim().length() > 0) { - exportedConfigs.add(exportedConfig.trim()); - } - } - } - - for (String key : status.getConfigs().keySet()) { - if ((!exportedAllConfigs && exportedConfigs.contains(key)) || - exportedAllConfigs) { - Map<String, String> configs = status.getConfigs().get(key); - publishApplicationInstanceData(key, key, configs.entrySet()); - } - } - } - - List<ExportGroup> appExportGroups = application.getExportGroups(); - boolean hasExportGroups = SliderUtils.isNotEmpty(appExportGroups); - - Set<String> appExports = new HashSet(); - String appExportsStr = getApplicationComponent(componentGroup).getAppExports(); - if (SliderUtils.isSet(appExportsStr)) { - for (String appExport : appExportsStr.split(",")) { - if (!appExport.trim().isEmpty()) { - appExports.add(appExport.trim()); - } - } - } - - if (hasExportGroups && !appExports.isEmpty()) { - String configKeyFormat = "${site.%s.%s}"; - String hostKeyFormat = "${%s_HOST}"; - - // publish export groups if any - Map<String, String> replaceTokens = new HashMap<String, String>(); - for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) { - String hostName = getHostsList(entry.getValue().values(), true).iterator().next(); - replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName); - } - - for (String key : status.getConfigs().keySet()) { - Map<String, String> configs = status.getConfigs().get(key); - for (String configKey : configs.keySet()) { - String lookupKey = String.format(configKeyFormat, key, configKey); - replaceTokens.put(lookupKey, configs.get(configKey)); - } - } - - Set<String> modifiedGroups = new HashSet<String>(); - for (ExportGroup exportGroup : appExportGroups) { - List<Export> exports = exportGroup.getExports(); - if (SliderUtils.isNotEmpty(exports)) { - String exportGroupName = exportGroup.getName(); - ConcurrentHashMap<String, List<ExportEntry>> map = - (ConcurrentHashMap<String, List<ExportEntry>>)getCurrentExports(exportGroupName); - for (Export export : exports) { - if (canBeExported(exportGroupName, export.getName(), appExports)) { - String value = export.getValue(); - // replace host names - for (String token : replaceTokens.keySet()) { - if (value.contains(token)) { - value = value.replace(token, replaceTokens.get(token)); - } - } - ExportEntry entry = new ExportEntry(); - entry.setLevel(APPLICATION_TAG); - entry.setValue(value); - entry.setUpdatedTime(new Date().toString()); - // over-write, app exports are singletons - map.put(export.getName(), new ArrayList(Arrays.asList(entry))); - log.info("Preparing to publish. Key {} and Value {}", export.getName(), value); - } - } - modifiedGroups.add(exportGroupName); - } - } - publishModifiedExportGroups(modifiedGroups); - } - - log.info("Received and processed config for {}", heartBeat.getHostname()); - componentStatus.setConfigReported(true); - - } - } - } - } - - private boolean canBeExported(String exportGroupName, String name, Set<String> appExports) { - return appExports.contains(String.format("%s-%s", exportGroupName, name)); - } - - protected Map<String, List<ExportEntry>> getCurrentExports(String groupName) { - if (!this.exportGroups.containsKey(groupName)) { - synchronized (this.exportGroups) { - if (!this.exportGroups.containsKey(groupName)) { - this.exportGroups.put(groupName, new ConcurrentHashMap<String, List<ExportEntry>>()); - } - } - } - - return this.exportGroups.get(groupName); - } - - private void publishModifiedExportGroups(Set<String> modifiedGroups) { - for (String groupName : modifiedGroups) { - Map<String, List<ExportEntry>> entries = this.exportGroups.get(groupName); - - // Publish in old format for the time being - Map<String, String> simpleEntries = new HashMap<String, String>(); - for (Map.Entry<String, List<ExportEntry>> entry : entries.entrySet()) { - List<ExportEntry> exports = entry.getValue(); - if (SliderUtils.isNotEmpty(exports)) { - // there is no support for multiple exports per name - so extract only the first one - simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue()); - } - } - if (!getAmState().getAppConfSnapshot().getComponentOptBool( - groupName, AgentKeys.AM_CONFIG_GENERATION, false)) { - publishApplicationInstanceData(groupName, groupName, - simpleEntries.entrySet()); - } - - PublishedExports exports = new PublishedExports(groupName); - exports.setUpdated(new Date().getTime()); - exports.putValues(entries.entrySet()); - getAmState().getPublishedExportsSet().put(groupName, exports); - } - } - - /** Publish component instance specific data if the component demands it */ - protected void processAndPublishComponentSpecificData(Map<String, String> ports, - String containerId, - String hostFqdn, - String componentGroup) { - String portVarFormat = "${site.%s}"; - String hostNamePattern = "${THIS_HOST}"; - Map<String, String> toPublish = new HashMap<String, String>(); - - Application application = getMetaInfo(componentGroup).getApplication(); - for (Component component : application.getComponents()) { - if (component.getName().equals(componentGroup)) { - if (component.getComponentExports().size() > 0) { - - for (ComponentExport export : component.getComponentExports()) { - String templateToExport = export.getValue(); - for (String portName : ports.keySet()) { - boolean publishData = false; - String portValPattern = String.format(portVarFormat, portName); - if (templateToExport.contains(portValPattern)) { - templateToExport = templateToExport.replace(portValPattern, ports.get(portName)); - publishData = true; - } - if (templateToExport.contains(hostNamePattern)) { - templateToExport = templateToExport.replace(hostNamePattern, hostFqdn); - publishData = true; - } - if (publishData) { - toPublish.put(export.getName(), templateToExport); - log.info("Publishing {} for name {} and container {}", - templateToExport, export.getName(), containerId); - } - } - } - } - } - } - - if (toPublish.size() > 0) { - Map<String, String> perContainerData
<TRUNCATED>