http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java new file mode 100644 index 0000000..37e9a7f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.ClusterNode; +import org.apache.slider.api.types.ApplicationLivenessInformation; +import org.apache.slider.api.types.ComponentInformation; +import org.apache.slider.api.types.NodeInformation; +import org.apache.slider.api.types.RoleStatistics; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.exceptions.NoSuchNodeException; +import org.apache.slider.core.registry.docstore.PublishedConfigSet; +import org.apache.slider.core.registry.docstore.PublishedExportsSet; +import org.apache.slider.server.appmaster.web.rest.RestPaths; +import org.apache.slider.server.services.utility.PatternValidator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of {@link StateAccessForProviders}, which means + * state access for providers, web UI and IPC/REST views. 
+ */ +public class ProviderAppState implements StateAccessForProviders { + + + private final Map<String, PublishedConfigSet> publishedConfigSets = + new ConcurrentHashMap<>(5); + private final PublishedExportsSet publishedExportsSets = new PublishedExportsSet(); + private static final PatternValidator validator = new PatternValidator( + RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP); + private String applicationName; + + private final AppState appState; + + public ProviderAppState(String applicationName, AppState appState) { + this.appState = appState; + this.applicationName = applicationName; + } + + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } + + @Override + public String getApplicationName() { + return applicationName; + } + + @Override + public PublishedConfigSet getPublishedSliderConfigurations() { + return getOrCreatePublishedConfigSet(RestPaths.SLIDER_CONFIGSET); + } + + @Override + public PublishedExportsSet getPublishedExportsSet() { + return publishedExportsSets; + } + + @Override + public PublishedConfigSet getPublishedConfigSet(String name) { + return publishedConfigSets.get(name); + } + + @Override + public PublishedConfigSet getOrCreatePublishedConfigSet(String name) { + PublishedConfigSet set = publishedConfigSets.get(name); + if (set == null) { + validator.validate(name); + synchronized (publishedConfigSets) { + // synchronized double check to ensure that there is never an overridden + // config set created + set = publishedConfigSets.get(name); + if (set == null) { + set = new PublishedConfigSet(); + publishedConfigSets.put(name, set); + } + } + } + return set; + } + + @Override + public List<String> listConfigSets() { + + synchronized (publishedConfigSets) { + List<String> sets = new ArrayList<>(publishedConfigSets.keySet()); + return sets; + } + } + + @Override + public Map<Integer, RoleStatus> getRoleStatusMap() { + return appState.getRoleStatusMap(); + } + + + @Override + public 
Map<ContainerId, RoleInstance> getFailedContainers() { + return appState.getFailedContainers(); + } + + @Override + public Map<ContainerId, RoleInstance> getLiveContainers() { + return appState.getLiveContainers(); + } + + @Override + public ClusterDescription getClusterStatus() { + return appState.getClusterStatus(); + } + + @Override + public ConfTreeOperations getResourcesSnapshot() { + return appState.getResourcesSnapshot(); + } + + @Override + public ConfTreeOperations getAppConfSnapshot() { + return appState.getAppConfSnapshot(); + } + + @Override + public ConfTreeOperations getInternalsSnapshot() { + return appState.getInternalsSnapshot(); + } + + @Override + public boolean isApplicationLive() { + return appState.isApplicationLive(); + } + + @Override + public long getSnapshotTime() { + return appState.getSnapshotTime(); + } + + @Override + public AggregateConf getInstanceDefinitionSnapshot() { + return appState.getInstanceDefinitionSnapshot(); + } + + @Override + public AggregateConf getUnresolvedInstanceDefinition() { + return appState.getUnresolvedInstanceDefinition(); + } + + @Override + public RoleStatus lookupRoleStatus(int key) { + return appState.lookupRoleStatus(key); + } + + @Override + public RoleStatus lookupRoleStatus(Container c) throws YarnRuntimeException { + return appState.lookupRoleStatus(c); + } + + @Override + public RoleStatus lookupRoleStatus(String name) throws YarnRuntimeException { + return appState.lookupRoleStatus(name); + } + + @Override + public List<RoleInstance> cloneOwnedContainerList() { + return appState.cloneOwnedContainerList(); + } + + @Override + public int getNumOwnedContainers() { + return appState.getNumOwnedContainers(); + } + + @Override + public RoleInstance getOwnedContainer(ContainerId id) { + return appState.getOwnedContainer(id); + } + + @Override + public RoleInstance getOwnedContainer(String id) throws NoSuchNodeException { + return appState.getOwnedInstanceByContainerID(id); + } + + @Override + public 
List<RoleInstance> cloneLiveContainerInfoList() { + return appState.cloneLiveContainerInfoList(); + } + + @Override + public RoleInstance getLiveInstanceByContainerID(String containerId) throws + NoSuchNodeException { + return appState.getLiveInstanceByContainerID(containerId); + } + + @Override + public List<RoleInstance> getLiveInstancesByContainerIDs(Collection<String> containerIDs) { + return appState.getLiveInstancesByContainerIDs(containerIDs); + } + + @Override + public ClusterDescription refreshClusterStatus() { + return appState.refreshClusterStatus(); + } + + @Override + public List<RoleStatus> cloneRoleStatusList() { + return appState.cloneRoleStatusList(); + } + + @Override + public ApplicationLivenessInformation getApplicationLivenessInformation() { + return appState.getApplicationLivenessInformation(); + } + + @Override + public Map<String, Integer> getLiveStatistics() { + return appState.getLiveStatistics(); + } + + @Override + public Map<String, ComponentInformation> getComponentInfoSnapshot() { + return appState.getComponentInfoSnapshot(); + } + + @Override + public Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() { + return appState.createRoleToClusterNodeMap(); + } + + @Override + public List<RoleInstance> enumLiveInstancesInRole(String role) { + List<RoleInstance> nodes = new ArrayList<>(); + Collection<RoleInstance> allRoleInstances = cloneLiveContainerInfoList(); + for (RoleInstance node : allRoleInstances) { + if (role.isEmpty() || role.equals(node.role)) { + nodes.add(node); + } + } + return nodes; + } + + @Override + public List<RoleInstance> lookupRoleContainers(String component) { + RoleStatus roleStatus = lookupRoleStatus(component); + List<RoleInstance> ownedContainerList = cloneOwnedContainerList(); + List<RoleInstance> matching = new ArrayList<>(ownedContainerList.size()); + int roleId = roleStatus.getPriority(); + for (RoleInstance instance : ownedContainerList) { + if (instance.roleId == roleId) { + 
matching.add(instance); + } + } + return matching; + } + + @Override + public ComponentInformation getComponentInformation(String component) { + RoleStatus roleStatus = lookupRoleStatus(component); + ComponentInformation info = roleStatus.serialize(); + List<RoleInstance> containers = lookupRoleContainers(component); + info.containers = new ArrayList<>(containers.size()); + for (RoleInstance container : containers) { + info.containers.add(container.id); + } + return info; + } + + @Override + public Map<String, NodeInformation> getNodeInformationSnapshot() { + return appState.getRoleHistory() + .getNodeInformationSnapshot(appState.buildNamingMap()); + } + + @Override + public NodeInformation getNodeInformation(String hostname) { + return appState.getRoleHistory() + .getNodeInformation(hostname, appState.buildNamingMap()); + } + + @Override + public RoleStatistics getRoleStatistics() { + return appState.getRoleStatistics(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java new file mode 100644 index 0000000..4e8a4d7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -0,0 +1,1101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.slider.api.types.NodeInformation; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.providers.ProviderRole; +import org.apache.slider.server.appmaster.management.BoolMetric; +import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; +import org.apache.slider.server.appmaster.management.Timestamp; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.avro.LoadedRoleHistory; +import org.apache.slider.server.avro.NodeEntryRecord; +import org.apache.slider.server.avro.RoleHistoryHeader; +import org.apache.slider.server.avro.RoleHistoryWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The Role History. + * <p> + * Synchronization policy: all public operations are synchronized. + * Protected methods are in place for testing -no guarantees are made. + * <p> + * Inner classes have no synchronization guarantees; they should be manipulated + * in these classes and not externally. 
+ * <p> + * Note that as well as some methods marked visible for testing, there + * is the option for the time generator method, {@link #now()} to + * be overridden so that a repeatable time series can be used. + * + */ +public class RoleHistory { + protected static final Logger log = + LoggerFactory.getLogger(RoleHistory.class); + private final List<ProviderRole> providerRoles; + /** the roles in here are shared with App State */ + private final Map<Integer, RoleStatus> roleStatusMap = new HashMap<>(); + private final AbstractClusterServices recordFactory; + + private long startTime; + + /** Time when saved */ + private final Timestamp saveTime = new Timestamp(0); + + /** If the history was loaded, the time at which the history was saved. + * That is: the time the data was valid */ + private final Timestamp thawedDataTime = new Timestamp(0); + + private NodeMap nodemap; + private int roleSize; + private final BoolMetric dirty = new BoolMetric(false); + private FileSystem filesystem; + private Path historyPath; + private RoleHistoryWriter historyWriter = new RoleHistoryWriter(); + + /** + * When were the nodes updated in a {@link #onNodesUpdated(List)} call? + * If zero: never. + */ + private final Timestamp nodesUpdatedTime = new Timestamp(0); + private final BoolMetric nodeUpdateReceived = new BoolMetric(false); + + private OutstandingRequestTracker outstandingRequests = + new OutstandingRequestTracker(); + + /** + * For each role, lists nodes that are available for data-local allocation, + * ordered by more recently released - to accelerate node selection. 
+ * That is, they are "recently used nodes" + */ + private Map<Integer, LinkedList<NodeInstance>> recentNodes; + + /** + * Instantiate + * @param roles initial role list + * @param recordFactory yarn record factory + * @throws BadConfigException + */ + public RoleHistory(Collection<RoleStatus> roles, AbstractClusterServices recordFactory) throws BadConfigException { + this.recordFactory = recordFactory; + roleSize = roles.size(); + providerRoles = new ArrayList<>(roleSize); + for (RoleStatus role : roles) { + addNewRole(role); + } + reset(); + } + + /** + * Reset the variables -this does not adjust the fixed attributes + * of the history, but the nodemap and failed node map are cleared. + */ + protected synchronized void reset() throws BadConfigException { + + nodemap = new NodeMap(roleSize); + resetAvailableNodeLists(); + outstandingRequests = new OutstandingRequestTracker(); + } + + /** + * Register all metrics with the metrics infra + * @param metrics metrics + */ + public void register(MetricsAndMonitoring metrics) { + metrics.register(RoleHistory.class, dirty, "dirty"); + metrics.register(RoleHistory.class, nodesUpdatedTime, "nodes-updated.time"); + metrics.register(RoleHistory.class, nodeUpdateReceived, "nodes-updated.flag"); + metrics.register(RoleHistory.class, thawedDataTime, "thawed.time"); + metrics.register(RoleHistory.class, saveTime, "saved.time"); + } + + /** + * safety check: make sure the role is unique amongst + * the role stats...which is extended with the new role + * @param roleStatus role + * @throws ArrayIndexOutOfBoundsException + * @throws BadConfigException + */ + protected void putRole(RoleStatus roleStatus) throws BadConfigException { + int index = roleStatus.getKey(); + if (index < 0) { + throw new BadConfigException("Provider " + roleStatus + " id is out of range"); + } + if (roleStatusMap.get(index) != null) { + throw new BadConfigException( + roleStatus.toString() + " id duplicates that of " + + roleStatusMap.get(index)); + } + 
roleStatusMap.put(index, roleStatus); + } + + /** + * Add a new role + * @param roleStatus new role + */ + public void addNewRole(RoleStatus roleStatus) throws BadConfigException { + log.debug("Validating/adding new role to role history: {} ", roleStatus); + putRole(roleStatus); + this.providerRoles.add(roleStatus.getProviderRole()); + } + + /** + * Lookup a role by ID + * @param roleId role Id + * @return role or null if not found + */ + public ProviderRole lookupRole(int roleId) { + for (ProviderRole role : providerRoles) { + if (role.id == roleId) { + return role; + } + } + return null; + } + + /** + * Clear the lists of available nodes + */ + private synchronized void resetAvailableNodeLists() { + recentNodes = new ConcurrentHashMap<>(roleSize); + } + + /** + * Prepare the history for re-reading its state. + * <p> + * This intended for use by the RoleWriter logic. + * @throws BadConfigException if there is a problem rebuilding the state + */ + private void prepareForReading(RoleHistoryHeader header) + throws BadConfigException { + reset(); + + int roleCountInSource = header.getRoles(); + if (roleCountInSource != roleSize) { + log.warn("Number of roles in source {}" + +" does not match the expected number of {}", + roleCountInSource, + roleSize); + } + //record when the data was loaded + setThawedDataTime(header.getSaved()); + } + + /** + * rebuild the placement history from the loaded role history + * @param loadedRoleHistory loaded history + * @return the number of entries discarded + * @throws BadConfigException if there is a problem rebuilding the state + */ + @VisibleForTesting + public synchronized int rebuild(LoadedRoleHistory loadedRoleHistory) throws BadConfigException { + RoleHistoryHeader header = loadedRoleHistory.getHeader(); + prepareForReading(header); + int discarded = 0; + Long saved = header.getSaved(); + for (NodeEntryRecord nodeEntryRecord : loadedRoleHistory.records) { + Integer roleId = nodeEntryRecord.getRole(); + NodeEntry nodeEntry = new 
NodeEntry(roleId); + nodeEntry.setLastUsed(nodeEntryRecord.getLastUsed()); + if (nodeEntryRecord.getActive()) { + //if active at the time of save, make the last used time the save time + nodeEntry.setLastUsed(saved); + } + String hostname = SliderUtils.sequenceToString(nodeEntryRecord.getHost()); + ProviderRole providerRole = lookupRole(roleId); + if (providerRole == null) { + // discarding entry + log.info("Discarding history entry with unknown role: {} on host {}", + roleId, hostname); + discarded ++; + } else { + NodeInstance instance = getOrCreateNodeInstance(hostname); + instance.set(roleId, nodeEntry); + } + } + return discarded; + } + + public synchronized long getStartTime() { + return startTime; + } + + public synchronized long getSaveTime() { + return saveTime.get(); + } + + public long getThawedDataTime() { + return thawedDataTime.get(); + } + + public void setThawedDataTime(long thawedDataTime) { + this.thawedDataTime.set(thawedDataTime); + } + + public synchronized int getRoleSize() { + return roleSize; + } + + /** + * Get the total size of the cluster -the number of NodeInstances + * @return a count + */ + public synchronized int getClusterSize() { + return nodemap.size(); + } + + public synchronized boolean isDirty() { + return dirty.get(); + } + + public synchronized void setDirty(boolean dirty) { + this.dirty.set(dirty); + } + + /** + * Tell the history that it has been saved; marks itself as clean + * @param timestamp timestamp -updates the savetime field + */ + public synchronized void saved(long timestamp) { + setDirty(false); + saveTime.set(timestamp); + } + + /** + * Get a clone of the nodemap. + * The instances inside are not cloned + * @return the map + */ + public synchronized NodeMap cloneNodemap() { + return (NodeMap) nodemap.clone(); + } + + /** + * Get snapshot of the node map + * @return a snapshot of the current node state + * @param naming naming map of priority to enty name; entries must be unique. 
+ * It's OK to be incomplete, for those the list falls back to numbers. + */ + public synchronized Map<String, NodeInformation> getNodeInformationSnapshot( + Map<Integer, String> naming) { + Map<String, NodeInformation> result = new HashMap<>(nodemap.size()); + for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) { + result.put(entry.getKey(), entry.getValue().serialize(naming)); + } + return result; + } + + /** + * Get the information on a node + * @param hostname hostname + * @param naming naming map of priority to enty name; entries must be unique. + * It's OK to be incomplete, for those the list falls back to numbers. + * @return the information about that host, or null if there is none + */ + public synchronized NodeInformation getNodeInformation(String hostname, + Map<Integer, String> naming) { + NodeInstance nodeInstance = nodemap.get(hostname); + return nodeInstance != null ? nodeInstance.serialize(naming) : null; + } + + /** + * Get the node instance for the specific node -creating it if needed + * @param hostname node address + * @return the instance + */ + public synchronized NodeInstance getOrCreateNodeInstance(String hostname) { + //convert to a string + return nodemap.getOrCreate(hostname); + } + + /** + * Insert a list of nodes into the map; overwrite any with that name. + * This is a bulk operation for testing. + * Important: this does not update the available node lists, these + * must be rebuilt afterwards. + * @param nodes collection of nodes. + */ + @VisibleForTesting + public synchronized void insert(Collection<NodeInstance> nodes) { + nodemap.insert(nodes); + } + + /** + * Get current time. 
overrideable for test subclasses + * @return current time in millis + */ + protected long now() { + return System.currentTimeMillis(); + } + + /** + * Mark ourselves as dirty + */ + public void touch() { + setDirty(true); + try { + saveHistoryIfDirty(); + } catch (IOException e) { + log.warn("Failed to save history file ", e); + } + } + + /** + * reset the failed recently counters + */ + public synchronized void resetFailedRecently() { + log.info("Resetting failure history"); + nodemap.resetFailedRecently(); + } + + /** + * Get the path used for history files + * @return the directory used for history files + */ + public Path getHistoryPath() { + return historyPath; + } + + /** + * Save the history to its location using the timestamp as part of + * the filename. The saveTime and dirty fields are updated + * @param time timestamp timestamp to use as the save time + * @return the path saved to + * @throws IOException IO problems + */ + @VisibleForTesting + public synchronized Path saveHistory(long time) throws IOException { + Path filename = historyWriter.createHistoryFilename(historyPath, time); + historyWriter.write(filesystem, filename, true, this, time); + saved(time); + return filename; + } + + /** + * Save the history with the current timestamp if it is dirty; + * return the path saved to if this is the case + * @return the path or null if the history was not saved + * @throws IOException failed to save for some reason + */ + public synchronized Path saveHistoryIfDirty() throws IOException { + if (isDirty()) { + return saveHistory(now()); + } else { + return null; + } + } + + /** + * Start up + * @param fs filesystem + * @param historyDir path in FS for history + * @return true if the history was thawed + */ + public boolean onStart(FileSystem fs, Path historyDir) throws BadConfigException { + assert filesystem == null; + filesystem = fs; + historyPath = historyDir; + startTime = now(); + //assume the history is being thawed; this will downgrade as appropriate 
+ return onThaw(); + } + + /** + * Handler for bootstrap event: there was no history to thaw + */ + public void onBootstrap() { + log.debug("Role history bootstrapped"); + } + + /** + * Handle the start process <i>after the history has been rebuilt</i>, + * and after any gc/purge + */ + public synchronized boolean onThaw() throws BadConfigException { + assert filesystem != null; + assert historyPath != null; + boolean thawSuccessful = false; + //load in files from data dir + + LoadedRoleHistory loadedRoleHistory = null; + try { + loadedRoleHistory = historyWriter.loadFromHistoryDir(filesystem, historyPath); + } catch (IOException e) { + log.warn("Exception trying to load history from {}", historyPath, e); + } + if (loadedRoleHistory != null) { + rebuild(loadedRoleHistory); + thawSuccessful = true; + Path loadPath = loadedRoleHistory.getPath(); + log.debug("loaded history from {}", loadPath); + // delete any old entries + try { + int count = historyWriter.purgeOlderHistoryEntries(filesystem, loadPath); + log.debug("Deleted {} old history entries", count); + } catch (IOException e) { + log.info("Ignoring exception raised while trying to delete old entries", + e); + } + + //start is then completed + buildRecentNodeLists(); + } else { + //fallback to bootstrap procedure + onBootstrap(); + } + return thawSuccessful; + } + + + /** + * (After the start), rebuild the availability data structures + */ + @VisibleForTesting + public synchronized void buildRecentNodeLists() { + resetAvailableNodeLists(); + // build the list of available nodes + for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) { + NodeInstance ni = entry.getValue(); + for (int i = 0; i < roleSize; i++) { + NodeEntry nodeEntry = ni.get(i); + if (nodeEntry != null && nodeEntry.isAvailable()) { + log.debug("Adding {} for role {}", ni, i); + listRecentNodesForRoleId(i).add(ni); + } + } + } + // sort the resulting arrays + for (int i = 0; i < roleSize; i++) { + sortRecentNodeList(i); + } + } + + /** 
+ * Get the nodes for an ID -may be null + * @param id role ID + * @return potentially null list + */ + @VisibleForTesting + public List<NodeInstance> getRecentNodesForRoleId(int id) { + return recentNodes.get(id); + } + + /** + * Get a possibly empty list of suggested nodes for a role. + * @param id role ID + * @return list + */ + private LinkedList<NodeInstance> listRecentNodesForRoleId(int id) { + LinkedList<NodeInstance> instances = recentNodes.get(id); + if (instances == null) { + synchronized (this) { + // recheck in the synchronized block and recreate + if (recentNodes.get(id) == null) { + recentNodes.put(id, new LinkedList<NodeInstance>()); + } + instances = recentNodes.get(id); + } + } + return instances; + } + + /** + * Sort a the recent node list for a single role + * @param role role to sort + */ + private void sortRecentNodeList(int role) { + List<NodeInstance> nodesForRoleId = getRecentNodesForRoleId(role); + if (nodesForRoleId != null) { + Collections.sort(nodesForRoleId, new NodeInstance.Preferred(role)); + } + } + + /** + * Find a node for use + * @param role role + * @return the instance, or null for none + */ + @VisibleForTesting + public synchronized NodeInstance findRecentNodeForNewInstance(RoleStatus role) { + if (!role.isPlacementDesired()) { + // no data locality policy + return null; + } + int roleId = role.getKey(); + boolean strictPlacement = role.isStrictPlacement(); + NodeInstance nodeInstance = null; + // Get the list of possible targets. 
+ // This is a live list: changes here are preserved + List<NodeInstance> targets = getRecentNodesForRoleId(roleId); + if (targets == null) { + // nothing to allocate on + return null; + } + + int cnt = targets.size(); + log.debug("There are {} node(s) to consider for {}", cnt, role.getName()); + for (int i = 0; i < cnt && nodeInstance == null; i++) { + NodeInstance candidate = targets.get(i); + if (candidate.getActiveRoleInstances(roleId) == 0) { + // no active instances: check failure statistics + if (strictPlacement + || (candidate.isOnline() && !candidate.exceedsFailureThreshold(role))) { + targets.remove(i); + // exit criteria for loop is now met + nodeInstance = candidate; + } else { + // too many failures for this node + log.info("Recent node failures is higher than threshold {}. Not requesting host {}", + role.getNodeFailureThreshold(), candidate.hostname); + } + } + } + + if (nodeInstance == null) { + log.info("No node found for {}", role.getName()); + } + return nodeInstance; + } + + /** + * Find a node for use + * @param role role + * @return the instance, or null for none + */ + @VisibleForTesting + public synchronized List<NodeInstance> findNodeForNewAAInstance(RoleStatus role) { + // all nodes that are live and can host the role; no attempt to exclude ones + // considered failing + return nodemap.findAllNodesForRole(role.getKey(), role.getLabelExpression()); + } + + /** + * Request an instance on a given node. + * An outstanding request is created & tracked, with the + * relevant node entry for that role updated. + *<p> + * The role status entries will also be tracked + * <p> + * Returns the request that is now being tracked. 
+   * If the node instance is not null, its details about the role are incremented
+   *
+   * @param node node to target or null for "any"
+   * @param role role to request
+   * @param resource resource requirements handed to
+   * {@code buildContainerRequest()} together with the role and the current time
+   * @return the request
+   */
+  public synchronized OutstandingRequest requestInstanceOnNode(
+      NodeInstance node, RoleStatus role, Resource resource) {
+    OutstandingRequest outstanding = outstandingRequests.newRequest(node, role.getKey());
+    outstanding.buildContainerRequest(resource, role, now());
+    return outstanding;
+  }
+
+  /**
+   * Find a node for a role and request an instance on that (or a location-less
+   * instance). Anti-affine roles are handed to
+   * {@link #requestContainerForAARole(RoleStatus)}; for every other role a
+   * fresh resource record is filled in from the role before the request is built.
+   * @param role role status
+   * @return a request ready to go, or null if this is an AA request and no
+   * location can be found.
+   */
+  public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) {
+
+    if (role.isAntiAffinePlacement()) {
+      return requestContainerForAARole(role);
+    } else {
+      Resource resource = recordFactory.newResource();
+      role.copyResourceRequirements(resource);
+      NodeInstance node = findRecentNodeForNewInstance(role);
+      return requestInstanceOnNode(node, role, resource);
+    }
+  }
+
+  /**
+   * Find candidate nodes for an AA role and build a request covering them.
+   * A warning is logged when no candidate node is found.
+   * @param role role status
+   * @return a request ready to go, or null if no location can be found.
+   */
+  public synchronized OutstandingRequest requestContainerForAARole(RoleStatus role) {
+    List<NodeInstance> nodes = findNodeForNewAAInstance(role);
+    if (!nodes.isEmpty()) {
+      OutstandingRequest outstanding = outstandingRequests.newAARequest(
+          role.getKey(), nodes, role.getLabelExpression());
+      Resource resource = recordFactory.newResource();
+      role.copyResourceRequirements(resource);
+      outstanding.buildContainerRequest(resource, role, now());
+      return outstanding;
+    } else {
+      log.warn("No suitable location for {}", role.getName());
+      return null;
+    }
+  }
+  /**
+   * Get the list of active nodes ... walks the node map so
+   * is {@code O(nodes)}
+   * @param role role index
+   * @return the (possibly empty) list the node map reports as active for that role
+   */
+  public synchronized List<NodeInstance> listActiveNodes(int role) {
+    return nodemap.listActiveNodes(role);
+  }
+
+  /**
+   * Get the node entry of a container, creating the node instance and the
+   * entry if needed.
+   * @param container container to look up
+   * @return the entry
+   * @throws RuntimeException if the container has no node ID
+   * (raised by {@code RoleHistoryUtils.hostnameOf()})
+   */
+  public NodeEntry getOrCreateNodeEntry(Container container) {
+    return getOrCreateNodeInstance(container).getOrCreate(container);
+  }
+
+  /**
+   * Get the node instance of a container -always returns something
+   * @param container container to look up
+   * @return a (possibly new) node instance
+   * @throws RuntimeException if the container has no node ID
+   * (raised by {@code RoleHistoryUtils.hostnameOf()})
+   */
+  public synchronized NodeInstance getOrCreateNodeInstance(Container container) {
+    return nodemap.getOrCreate(RoleHistoryUtils.hostnameOf(container));
+  }
+
+  /**
+   * Get the node instance of a host if defined
+   * @param hostname hostname to look up
+   * @return a node instance or null
+   */
+  public synchronized NodeInstance getExistingNodeInstance(String hostname) {
+    return nodemap.get(hostname);
+  }
+
+  /**
+   * Get the node instance of a container <i>if there's an entry in the history</i>
+   * @param container container to look up
+   * @return a node instance or null
+   * @throws RuntimeException if the container has no node ID
+   * (raised by {@code RoleHistoryUtils.hostnameOf()})
+   */
+  public synchronized NodeInstance getExistingNodeInstance(Container container) {
+    return nodemap.get(RoleHistoryUtils.hostnameOf(container));
+  }
+
+  /**
+   * Perform any pre-allocation operations on the list of allocated containers
+   * based on knowledge of system state.
+   * Currently this places requested hosts ahead of unrequested ones.
+   * @param allocatedContainers list of allocated containers
+   * @return a new list holding the requested containers followed by the
+   * unrequested ones
+   */
+  public synchronized List<Container> prepareAllocationList(List<Container> allocatedContainers) {
+
+    //partition into requested and unrequested
+    List<Container> requested =
+        new ArrayList<>(allocatedContainers.size());
+    List<Container> unrequested =
+        new ArrayList<>(allocatedContainers.size());
+    outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
+
+    //give the unrequested ones lower priority
+    requested.addAll(unrequested);
+    return requested;
+  }
+
+  /**
+   * A container has been allocated on a node -update the data structures.
+   * Once the desired count is reached the outstanding requests of the role
+   * are reset; any hosts that reset returns are appended to the role's
+   * recent-node list, which is then re-sorted.
+   * @param container container
+   * @param desiredCount desired #of instances
+   * @param actualCount current count of instances
+   * @return The allocation outcome
+   */
+  public synchronized ContainerAllocationResults onContainerAllocated(Container container,
+      long desiredCount,
+      long actualCount) {
+    int role = ContainerPriority.extractRole(container);
+
+    String hostname = RoleHistoryUtils.hostnameOf(container);
+    List<NodeInstance> nodeInstances = listRecentNodesForRoleId(role);
+    ContainerAllocationResults outcome =
+        outstandingRequests.onContainerAllocated(role, hostname, container);
+    if (desiredCount <= actualCount) {
+      // all outstanding requests have been satisfied
+      // clear all the lists, so returning nodes to the available set
+      List<NodeInstance> hosts = outstandingRequests.resetOutstandingRequests(role);
+      if (!hosts.isEmpty()) {
+        //add the list
+        log.info("Adding {} hosts for role {}", hosts.size(), role);
+        nodeInstances.addAll(hosts);
+        sortRecentNodeList(role);
+      }
+    }
+    return outcome;
+  }
+
+  /**
+   * A container has been assigned to a role instance on a node -update the data structures
+   * by marking the node entry as starting.
+   * @param container container
+   */
+  public void onContainerAssigned(Container container) {
+    NodeInstance node = getOrCreateNodeInstance(container);
+    NodeEntry nodeEntry = node.getOrCreate(container);
+    nodeEntry.onStarting();
+    log.debug("Node {} has updated NodeEntry {}", node, nodeEntry);
+  }
+
+  /**
+   * Event: a container start has been submitted.
+   * Currently a no-op.
+   * @param container container being started
+   * @param instance instance bound to the container
+   */
+  public void onContainerStartSubmitted(Container container,
+      RoleInstance instance) {
+    // no actions here
+  }
+
+  /**
+   * Container start event: the node entry is told the start completed and the
+   * history is {@code touch()}-ed.
+   * @param container container that just started
+   */
+  public void onContainerStarted(Container container) {
+    NodeEntry nodeEntry = getOrCreateNodeEntry(container);
+    nodeEntry.onStartCompleted();
+    touch();
+  }
+
+  /**
+   * A container failed to start: update the node entry state.
+   * The container is reported as short-lived, so
+   * {@link #markContainerFinished(Container, boolean, boolean, ContainerOutcome)}
+   * records a start failure and does not queue the node.
+   * @param container container that failed
+   * @return the value of {@code markContainerFinished()}, which is false on
+   * the short-lived path
+   */
+  public boolean onNodeManagerContainerStartFailed(Container container) {
+    return markContainerFinished(container, false, true, ContainerOutcome.Failed);
+  }
+
+  /**
+   * Does the RoleHistory have enough information about the YARN cluster
+   * to start placing AA requests? That is: has it the node map and
+   * any label information needed?
+   * @return true if the caller can start requesting AA nodes; this is the
+   * flag set by {@link #onNodesUpdated(List)}
+   */
+  public boolean canPlaceAANodes() {
+    return nodeUpdateReceived.get();
+  }
+
+  /**
+   * Get the last time the nodes were updated from YARN
+   * @return the update time or zero if never updated.
+   */
+  public long getNodesUpdatedTime() {
+    return nodesUpdatedTime.get();
+  }
+
+  /**
+   * Update the node map from a list of node reports, and record that (and
+   * when) an update was received. Reports without a hostname or a node state
+   * are skipped with a warning.
+   *
+   * @param updatedNodes list of updated nodes
+   * @return true if a review should be triggered, that is: if the node map
+   * reported a change for at least one of the nodes
+   */
+  public synchronized boolean onNodesUpdated(List<NodeReport> updatedNodes) {
+    log.debug("Updating {} nodes", updatedNodes.size());
+    nodesUpdatedTime.set(now());
+    nodeUpdateReceived.set(true);
+    int printed = 0;
+    boolean triggerReview = false;
+    for (NodeReport updatedNode : updatedNodes) {
+      String hostname = updatedNode.getNodeId() == null
+          ? ""
+          : updatedNode.getNodeId().getHost();
+      NodeState nodeState = updatedNode.getNodeState();
+      if (hostname.isEmpty() || nodeState == null) {
+        log.warn("Ignoring incomplete update");
+        continue;
+      }
+      if (log.isDebugEnabled() && printed++ < 10) {
+        // log the first few, but avoid overloading the logs for a full cluster
+        // update
+        log.debug("Node \"{}\" is in state {}", hostname, nodeState);
+      }
+      // update the node; this also creates an instance if needed
+      boolean updated = nodemap.updateNode(hostname, updatedNode);
+      triggerReview |= updated;
+    }
+    return triggerReview;
+  }
+
+  /**
+   * A container release request was issued: the node entry is told of the release.
+   * @param container container submitted
+   */
+  public void onContainerReleaseSubmitted(Container container) {
+    NodeEntry nodeEntry = getOrCreateNodeEntry(container);
+    nodeEntry.release();
+  }
+
+  /**
+   * App state notified of a container completed after a release.
+   * @param container completed container
+   * @return true if the node was queued
+   */
+  public boolean onReleaseCompleted(Container container) {
+    // NOTE(review): the outcome passed is Failed even though wasReleased is
+    // true; presumably the released flag takes precedence downstream - confirm
+    return markContainerFinished(container, true, false, ContainerOutcome.Failed);
+  }
+
+  /**
+   * App state notified of a container completed -but as
+   * it wasn't being released it is marked as failed
+   *
+   * @param container completed container
+   * @param shortLived was the container short lived?
+   * @param outcome outcome passed on to the node entry
+   * @return true if the node is considered available for work
+   */
+  public boolean onFailedContainer(Container container,
+      boolean shortLived,
+      ContainerOutcome outcome) {
+    return markContainerFinished(container, false, shortLived, outcome);
+  }
+
+  /**
+   * Mark a container finished; if it was released then that is treated
+   * differently. history is {@code touch()}-ed
+   *
+   * A short-lived container is recorded as a start failure and the node is
+   * never reported as available; otherwise availability is whatever the node
+   * entry returns, and an available node is queued for new work.
+   *
+   * @param container completed container
+   * @param wasReleased was the container released?
+   * @param shortLived was the container short lived?
+   * @param outcome outcome passed to the node entry on the non-short-lived path
+   * @return true if the node was queued
+   */
+  protected synchronized boolean markContainerFinished(Container container,
+      boolean wasReleased,
+      boolean shortLived,
+      ContainerOutcome outcome) {
+    NodeEntry nodeEntry = getOrCreateNodeEntry(container);
+    // note: the first placeholder is filled with the entry's role priority
+    log.info("Finished container for node {}, released={}, shortlived={}",
+        nodeEntry.rolePriority, wasReleased, shortLived);
+    boolean available;
+    if (shortLived) {
+      nodeEntry.onStartFailed();
+      available = false;
+    } else {
+      available = nodeEntry.containerCompleted(wasReleased, outcome);
+      maybeQueueNodeForWork(container, nodeEntry, available);
+    }
+    touch();
+    return available;
+  }
+
+  /**
+   * If the node is marked as available; queue it for assignments by putting
+   * its node instance at the head of the role's recent-node list.
+   * Unsynced: requires caller to be in a sync block.
+   * @param container completed container
+   * @param nodeEntry node
+   * @param available available flag
+   * @return the {@code available} flag, i.e. true if the node was queued
+   */
+  private boolean maybeQueueNodeForWork(Container container,
+      NodeEntry nodeEntry,
+      boolean available) {
+    if (available) {
+      //node is free
+      nodeEntry.setLastUsed(now());
+      NodeInstance ni = getOrCreateNodeInstance(container);
+      int roleId = ContainerPriority.extractRole(container);
+      log.debug("Node {} is now available for role id {}", ni, roleId);
+      listRecentNodesForRoleId(roleId).addFirst(ni);
+    }
+    return available;
+  }
+
+  /**
+   * Print the history to the log. This is for testing and diagnostics.
+   * Every role is logged with its recent-node list, followed by every node
+   * in the node map.
+   */
+  public synchronized void dump() {
+    for (ProviderRole role : providerRoles) {
+      log.info(role.toString());
+      List<NodeInstance> instances = listRecentNodesForRoleId(role.id);
+      log.info(" available: " + instances.size()
+          + " " + SliderUtils.joinWithInnerSeparator(" ", instances));
+    }
+
+    log.info("Nodes in Cluster: {}", getClusterSize());
+    for (NodeInstance node : nodemap.values()) {
+      log.info(node.toFullString());
+    }
+  }
+
+  /**
+   * Build the mapping entry for persisting to the role history
+   * @return a new map of role name to role id, one entry per provider role
+   */
+  public synchronized Map<CharSequence, Integer> buildMappingForHistoryFile() {
+    Map<CharSequence, Integer> mapping = new HashMap<>(getRoleSize());
+    for (ProviderRole role : providerRoles) {
+      mapping.put(role.name, role.id);
+    }
+    return mapping;
+  }
+
+  /**
+   * Get a clone of the available list
+   * @param role role index
+   * @return a new list with the same elements as the role's recent-node list
+   */
+  @VisibleForTesting
+  public List<NodeInstance> cloneRecentNodeList(int role) {
+    return new LinkedList<>(listRecentNodesForRoleId(role));
+  }
+
+  /**
+   * Get a snapshot of the outstanding placed request list
+   * @return a list of the requests outstanding at the time of requesting
+   */
+  @VisibleForTesting
+  public List<OutstandingRequest> listPlacedRequests() {
+    return outstandingRequests.listPlacedRequests();
+  }
+
+  /**
+   * Get a snapshot of the outstanding open request list
+   * @return a list of the open requests outstanding at the time of requesting
+   */
+  @VisibleForTesting
+  public List<OutstandingRequest> listOpenRequests() {
+    return outstandingRequests.listOpenRequests();
+  }
+
+  /**
+   * Escalate operation as triggered by external timer.
+   * @return a (usually empty) list of cancel/request operations.
+   */
+  public synchronized List<AbstractRMOperation> escalateOutstandingRequests() {
+    return outstandingRequests.escalateOutstandingRequests(now());
+  }
+  /**
+   * Cancel the outstanding AA requests; the work is delegated to the
+   * outstanding request tracker.
+   * NOTE(review): unlike the neighbouring operations this method is not
+   * synchronized - confirm that is intended.
+   * @return the list of operations returned by the tracker.
+   */
+  public List<AbstractRMOperation> cancelOutstandingAARequests() {
+    return outstandingRequests.cancelOutstandingAARequests();
+  }
+
+  /**
+   * Cancel a number of outstanding requests for a role -that is, not
+   * actual containers, just requests for new ones.
+   * Anti-affine roles and all other roles are handled by separate helpers.
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  public List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
+    return role.isAntiAffinePlacement() ?
+        cancelRequestsForAARole(role, toCancel)
+        : cancelRequestsForSimpleRole(role, toCancel);
+  }
+
+  /**
+   * Build the list of requests to cancel from the outstanding list.
+   * Open (unplaced) requests are taken first; placed requests make up
+   * whatever is still missing.
+   * @param role role
+   * @param toCancel number to cancel; must be greater than zero
+   * @return a list of cancellable operations.
+   * @throws IllegalArgumentException if {@code toCancel} is not positive
+   */
+  private synchronized List<AbstractRMOperation> cancelRequestsForSimpleRole(RoleStatus role, int toCancel) {
+    Preconditions.checkArgument(toCancel > 0,
+        "trying to cancel invalid number of requests: " + toCancel);
+    List<AbstractRMOperation> results = new ArrayList<>(toCancel);
+    // first scan through the unplaced request list to find all of a role
+    int roleId = role.getKey();
+    List<OutstandingRequest> requests =
+        outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+
+    // are there any left?
+    int remaining = toCancel - requests.size();
+    // ask for some placed nodes
+    requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, remaining));
+
+    // build cancellations
+    for (OutstandingRequest request : requests) {
+      results.add(request.createCancelOperation());
+    }
+    return results;
+  }
+
+  /**
+   * Build the list of requests to cancel for an AA role. This reduces the number
+   * of outstanding pending requests first, then cancels any active request,
+   * before finally asking for any placed containers
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  private synchronized List<AbstractRMOperation> cancelRequestsForAARole(RoleStatus role, int toCancel) {
+    List<AbstractRMOperation> results = new ArrayList<>(toCancel);
+    int roleId = role.getKey();
+    List<OutstandingRequest> requests = new ArrayList<>(toCancel);
+    // there may be pending requests which can be cancelled here
+    long pending = role.getPendingAntiAffineRequests();
+    if (pending > 0) {
+      // there are some pending ones which can be cancelled first:
+      // they were never issued, so only the role's counter needs trimming
+      long pendingToCancel = Math.min(pending, toCancel);
+      long pendingLeft = pending - pendingToCancel;
+      // report what is really cancelled and what really remains queued
+      log.info("Cancelling {} pending AA allocations, leaving {}", pendingToCancel,
+          pendingLeft);
+      role.setPendingAntiAffineRequests(pendingLeft);
+      toCancel -= pendingToCancel;
+    }
+    if (toCancel > 0 && role.isAARequestOutstanding()) {
+      // not enough
+      log.info("Cancelling current AA request");
+      // find the single entry which may be running
+      requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+      role.cancelOutstandingAARequest();
+      // the outstanding AA request counts as one cancellation
+      toCancel--;
+    }
+
+    // ask for some excess nodes
+    if (toCancel > 0) {
+      requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, toCancel));
+    }
+
+    // build cancellations
+    for (OutstandingRequest request : requests) {
+      results.add(request.createCancelOperation());
+    }
+    return results;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java new
file mode 100644 index 0000000..ea6197b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistoryUtils.java @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.slider.common.tools.SliderUtils;
+
+/**
+ * Static helper operations: hostname lookup for a container and a
+ * floor-at-zero decrement.
+ */
+public class RoleHistoryUtils {
+
+  /**
+   * Get the hostname of the node a container is on.
+   * @param container container to examine
+   * @return the host of the container's node ID
+   * @throws RuntimeException if the container has no node ID
+   */
+  public static String hostnameOf(Container container) {
+    NodeId nodeId = container.getNodeId();
+    if (nodeId == null) {
+      // the text is concatenated, never run through a formatter, so a
+      // "%s" placeholder would show up literally in the message
+      throw new RuntimeException("Container has no node ID: "
+          + SliderUtils.containerToString(container));
+    }
+    return nodeId.getHost();
+  }
+
+  /**
+   * Decrement a value but hold it at zero. Usually a sanity check
+   * on counters tracking outstanding operations
+   * @param val value
+   * @return {@code val - 1} for positive values, otherwise zero
+   */
+  public static int decToFloor(int val) {
+    // compare before subtracting: Integer.MIN_VALUE - 1 would wrap round
+    // to Integer.MAX_VALUE and slip past a check made after the subtraction
+    return val > 0 ? val - 1 : 0;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java new file mode 100644 index 0000000..920887a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHostnamePair.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import java.util.Objects;
+
+/**
+ * Immutable pairing of a role ID with a (possibly null) hostname.
+ * Equality and hashing use both fields.
+ */
+public class RoleHostnamePair {
+
+  /**
+   * requested role
+   */
+  public final int roleId;
+
+  /**
+   * hostname -will be null if node==null
+   */
+  public final String hostname;
+
+  /**
+   * Bind a role to a hostname.
+   * @param roleId role ID
+   * @param hostname hostname; may be null
+   */
+  public RoleHostnamePair(int roleId, String hostname) {
+    this.roleId = roleId;
+    this.hostname = hostname;
+  }
+
+  /**
+   * @return the role ID
+   */
+  public int getRoleId() {
+    return roleId;
+  }
+
+  /**
+   * @return the hostname, which may be null
+   */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /**
+   * Two pairs are equal when role ID and hostname both match; null
+   * hostnames match each other.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof RoleHostnamePair) {
+      RoleHostnamePair other = (RoleHostnamePair) o;
+      return roleId == other.roleId
+          && Objects.equals(hostname, other.hostname);
+    }
+    return false;
+  }
+
+  /**
+   * Hash over both fields; evaluates to the same value as
+   * {@code Objects.hash(roleId, hostname)}.
+   */
+  @Override
+  public int hashCode() {
+    int roleTerm = 31 + roleId;
+    return 31 * roleTerm + Objects.hashCode(hostname);
+  }
+
+  @Override
+  public String toString() {
+    return "RoleHostnamePair{"
+        + "roleId=" + roleId
+        + ", hostname='" + hostname + '\''
+        + '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java new file mode 100644 index 0000000..30cfec9 --- /dev/null +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java @@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.slider.api.ClusterNode;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.types.ContainerInformation;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tracking information about a container
+ */
+public final class RoleInstance implements Cloneable {
+
+  /** The container being tracked; set by the constructor. */
+  public Container container;
+  /**
+   * Container ID
+   */
+  public final String id;
+  public long createTime;
+  public long startTime;
+  /**
+   * flag set when it is released, to know if it has
+   * already been targeted for termination
+   */
+  public boolean released;
+
+  /**
+   * Name of the role
+   */
+  public String role;
+  public String group;
+
+  /**
+   * Version of the app
+   */
+  public String appVersion;
+
+  /**
+   * Role Id; matches priority in resources.json
+   */
+  public int roleId;
+
+  /**
+   * state from StateValues
+   */
+  public int state;
+
+  /**
+   * Exit code: only valid if the state >= STOPPED
+   */
+  public int exitCode;
+
+  /**
+   * what was the command executed?
+   */
+  public String command;
+
+  /**
+   * Any diagnostics
+   */
+  public String diagnostics;
+
+  /**
+   * What is the tail output from the executed process (or [] if not started
+   * or the log cannot be picked up
+   */
+  public String[] output;
+
+  /**
+   * Any environment details
+   */
+  public String[] environment;
+
+  public String ip;
+  public String hostname;
+  /** Host of the container's node ID; null if the container had no node ID. */
+  public String host;
+  /**
+   * {@code "http://"} plus the node HTTP address of the container; null if
+   * the container had no such address.
+   */
+  public String hostURL;
+  public ContainerAllocationOutcome placement;
+
+
+  /**
+   * A list of registered endpoints.
+   */
+  private List<Endpoint> endpoints =
+      new ArrayList<>(2);
+
+  /**
+   * Create an instance from a container assignment, recording the
+   * placement of the assignment.
+   * @param assignment assignment whose container must satisfy the
+   * requirements of {@link #RoleInstance(Container)}
+   */
+  public RoleInstance(ContainerAssignment assignment) {
+    this(assignment.container);
+    placement = assignment.placement;
+  }
+  /**
+   * Create an instance to track an allocated container
+   * @param container a container which must be non null, and have a non-null Id field.
+   */
+  public RoleInstance(Container container) {
+    Preconditions.checkNotNull(container, "Null container");
+    Preconditions.checkState(container.getId() != null,
+        "Null container ID");
+
+    this.container = container;
+    id = container.getId().toString();
+    // host and hostURL are only filled in when the container supplies them
+    if (container.getNodeId() != null) {
+      host = container.getNodeId().getHost();
+    }
+    if (container.getNodeHttpAddress() != null) {
+      hostURL = "http://" + container.getNodeHttpAddress();
+    }
+  }
+
+  /**
+   * @return the ID of the tracked container
+   */
+  public ContainerId getId() {
+    return container.getId();
+  }
+
+  /**
+   * @return the node ID of the tracked container (not the {@link #host} string)
+   */
+  public NodeId getHost() {
+    return container.getNodeId();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+        new StringBuilder("RoleInstance{");
+    sb.append("role='").append(role).append('\'');
+    sb.append(", id='").append(id).append('\'');
+    sb.append(", container=").append(SliderUtils.containerToString(container));
+    sb.append(", createTime=").append(createTime);
+    sb.append(", startTime=").append(startTime);
+    sb.append(", released=").append(released);
+    sb.append(", roleId=").append(roleId);
+    sb.append(", host=").append(host);
+    sb.append(", hostURL=").append(hostURL);
+    sb.append(", state=").append(state);
+    sb.append(", placement=").append(placement);
+    sb.append(", exitCode=").append(exitCode);
+    sb.append(", command='").append(command).append('\'');
+    sb.append(", diagnostics='").append(diagnostics).append('\'');
+    sb.append(", output=").append(Arrays.toString(output));
+    sb.append(", environment=").append(Arrays.toString(environment));
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Null-safe variant of {@link #getId()}.
+   * @return the container ID, or null if there is no container
+   */
+  public ContainerId getContainerId() {
+    return container != null ? container.getId() : null;
+  }
+
+  /**
+   * Generate the protobuf format of a request
+   * @return protobuf format. This excludes the Container info
+   */
+  public Messages.RoleInstanceState toProtobuf() {
+    Messages.RoleInstanceState.Builder builder =
+        Messages.RoleInstanceState.newBuilder();
+    if (container != null) {
+      builder.setName(container.getId().toString());
+    } else {
+      builder.setName("unallocated instance");
+    }
+    if (command != null) {
+      builder.setCommand(command);
+    }
+    if (environment != null) {
+      builder.addAllEnvironment(Arrays.asList(environment));
+    }
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    builder.setExitCode(exitCode);
+
+    if (output != null) {
+      builder.addAllOutput(Arrays.asList(output));
+    }
+    if (role != null) {
+      builder.setRole(role);
+    }
+    builder.setRoleId(roleId);
+    builder.setState(state);
+
+    builder.setReleased(released);
+    builder.setCreateTime(createTime);
+    builder.setStartTime(startTime);
+    // host and hostURL are left null by the constructor when the container
+    // has no node ID / HTTP address. Generated protobuf setters reject null,
+    // so an empty string is substituted; the fields are still always set.
+    // NOTE(review): assumes "" is an acceptable "unknown" value for readers
+    // of this message - confirm against the message definition.
+    builder.setHost(host != null ? host : "");
+    builder.setHostURL(hostURL != null ? hostURL : "");
+    if (appVersion != null) {
+      builder.setAppVersion(appVersion);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Build a serializable ClusterNode structure from this instance.
+   * This operation is unsynchronized.
+   * The environment and output arrays are copied, not shared.
+   * @return a serialized value.
+   */
+  public ClusterNode toClusterNode() {
+    ClusterNode node;
+    if (container != null) {
+      node = new ClusterNode(container.getId());
+    } else {
+      node = new ClusterNode();
+      node.name = "unallocated instance";
+    }
+    node.command = command;
+    node.createTime = createTime;
+    node.diagnostics = diagnostics;
+    if (environment != null) {
+      node.environment = Arrays.copyOf(environment, environment.length);
+    }
+    node.exitCode = exitCode;
+    node.ip = ip;
+    node.hostname = hostname;
+    node.host = host;
+    node.hostUrl = hostURL;
+    if (output != null) {
+      node.output = Arrays.copyOf(output, output.length);
+    }
+    node.released = released;
+    node.role = role;
+    node.roleId = roleId;
+    node.startTime = startTime;
+    node.state = state;
+
+    return node;
+  }
+
+  /**
+   * Clone operation clones all the simple values but shares the
+   * Container object into the cloned copy -same with the output,
+   * diagnostics and env arrays.
+   * The endpoint list itself is copied; its elements are shared.
+   * @return a clone of the object
+   * @throws CloneNotSupportedException
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    RoleInstance cloned = (RoleInstance) super.clone();
+    // clone the endpoint list, but not the values
+    cloned.endpoints = new ArrayList<>(this.endpoints);
+    return cloned;
+  }
+
+  /**
+   * Get the list of endpoints.
+   * @return the endpoint list: the live list, not a copy
+   */
+  public List<Endpoint> getEndpoints() {
+    return endpoints;
+  }
+
+  /**
+   * Add an endpoint registration
+   * @param endpoint endpoint (non-null)
+   * @throws IllegalArgumentException if the endpoint is null
+   */
+  public void addEndpoint(Endpoint endpoint) {
+    Preconditions.checkArgument(endpoint != null);
+    endpoints.add(endpoint);
+  }
+
+  /**
+   * Register a port endpoint as an inet-addr formatted endpoint, using the
+   * {@link #host} field as the first part of the address
+   * @param port port number
+   * @param api API name
+   */
+  public void registerPortEndpoint(int port, String api) {
+    Endpoint epr =
+        RegistryTypeUtils.inetAddrEndpoint(api,
+            ProtocolTypes.PROTOCOL_TCP, host, port);
+    addEndpoint(epr);
+  }
+
+  /**
+   * Serialize. Some data structures (e.g output)
+   * may be shared
+   * @return a serialized form for marshalling as JSON
+   */
+  public ContainerInformation serialize() {
+    ContainerInformation info = new ContainerInformation();
+    info.containerId = id;
+    info.component = role;
+    info.appVersion = appVersion;
+    info.startTime = startTime;
+    info.createTime = createTime;
+    info.diagnostics = diagnostics;
+    info.state = state;
+    info.host = host;
+    info.hostURL = hostURL;
+    // null rather than FALSE when the instance has not been released
+    info.released = released ? Boolean.TRUE : null;
+    if (placement != null) {
+      info.placement = placement.toString();
+    }
+    if (output != null) {
+      info.output = output;
+    }
+    return info;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java new file mode 100644 index 0000000..0a3a3c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.slider.server.appmaster.state; + +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.slider.api.types.ComponentInformation; +import org.apache.slider.api.types.RoleStatistics; +import org.apache.slider.providers.PlacementPolicy; +import org.apache.slider.providers.ProviderRole; +import org.apache.slider.server.appmaster.management.BoolMetricPredicate; +import org.apache.slider.server.appmaster.management.LongGauge; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +/** + * Models the ongoing status of all nodes in an application. + * + * These structures are shared across the {@link AppState} and {@link RoleHistory} structures, + * and must be designed for synchronous access. Atomic counters are preferred to anything which + * requires synchronization. Where synchronized access is good is that it allows for + * the whole instance to be locked, for updating multiple entries. + */ +public final class RoleStatus implements Cloneable, MetricSet { + + private final String name; + private final String group; + + /** + * Role priority + */ + private final int key; + private final ProviderRole providerRole; + + private final LongGauge actual = new LongGauge(); + private final LongGauge completed = new LongGauge(); + private final LongGauge desired = new LongGauge(); + private final LongGauge failed = new LongGauge(); + private final LongGauge failedRecently = new LongGauge(0); + private final LongGauge limitsExceeded = new LongGauge(0); + private final LongGauge nodeFailed = new LongGauge(0); + /** Number of AA requests queued. 
*/ + private final LongGauge pendingAntiAffineRequests = new LongGauge(0); + private final LongGauge preempted = new LongGauge(0); + private final LongGauge releasing = new LongGauge(); + private final LongGauge requested = new LongGauge(); + private final LongGauge started = new LongGauge(); + private final LongGauge startFailed = new LongGauge(); + private final LongGauge totalRequested = new LongGauge(); + + /** resource requirements */ + private Resource resourceRequirements; + + + /** any pending AA request */ + private volatile OutstandingRequest outstandingAArequest = null; + + + private String failureMessage = ""; + + public RoleStatus(ProviderRole providerRole) { + this.providerRole = providerRole; + this.name = providerRole.name; + this.group = providerRole.group; + this.key = providerRole.id; + } + + @Override + public Map<String, Metric> getMetrics() { + Map<String, Metric> metrics = new HashMap<>(15); + metrics.put("actual", actual); + metrics.put("completed", completed ); + metrics.put("desired", desired); + metrics.put("failed", failed); + metrics.put("limitsExceeded", limitsExceeded); + metrics.put("nodeFailed", nodeFailed); + metrics.put("preempted", preempted); + metrics.put("pendingAntiAffineRequests", pendingAntiAffineRequests); + metrics.put("releasing", releasing); + metrics.put("requested", requested); + metrics.put("preempted", preempted); + metrics.put("releasing", releasing ); + metrics.put("requested", requested); + metrics.put("started", started); + metrics.put("startFailed", startFailed); + metrics.put("totalRequested", totalRequested); + + metrics.put("outstandingAArequest", + new BoolMetricPredicate(new BoolMetricPredicate.Eval() { + @Override + public boolean eval() { + return isAARequestOutstanding(); + } + })); + return metrics; + } + + public String getName() { + return name; + } + + public String getGroup() { + return group; + } + + public int getKey() { + return key; + } + + public int getPriority() { + return getKey(); + } + + 
/** + * Get the placement policy enum, from the values in + * {@link PlacementPolicy} + * @return the placement policy for this role + */ + public int getPlacementPolicy() { + return providerRole.placementPolicy; + } + + public long getPlacementTimeoutSeconds() { + return providerRole.placementTimeoutSeconds; + } + + /** + * The number of failures on a specific node that can be tolerated + * before selecting a different node for placement + * @return + */ + public int getNodeFailureThreshold() { + return providerRole.nodeFailureThreshold; + } + + public boolean isExcludeFromFlexing() { + return hasPlacementPolicy(PlacementPolicy.EXCLUDE_FROM_FLEXING); + } + + public boolean isStrictPlacement() { + return hasPlacementPolicy(PlacementPolicy.STRICT); + } + + public boolean isAntiAffinePlacement() { + return hasPlacementPolicy(PlacementPolicy.ANTI_AFFINITY_REQUIRED); + } + + public boolean hasPlacementPolicy(int policy) { + return 0 != (getPlacementPolicy() & policy); + } + + public boolean isPlacementDesired() { + return !hasPlacementPolicy(PlacementPolicy.ANYWHERE); + } + + public long getDesired() { + return desired.get(); + } + + public void setDesired(long desired) { + this.desired.set(desired); + } + + public long getActual() { + return actual.get(); + } + + public long incActual() { + return actual.incrementAndGet(); + } + + public long decActual() { + return actual.decToFloor(1); + } + + /** + * Get the request count. 
+ * @return a count of requested containers + */ + public long getRequested() { + return requested.get(); + } + + public long incRequested() { + totalRequested.incrementAndGet(); + return requested.incrementAndGet(); + } + + public void cancel(long count) { + requested.decToFloor(count); + } + + public void decRequested() { + cancel(1); + } + + public long getReleasing() { + return releasing.get(); + } + + public long incReleasing() { + return releasing.incrementAndGet(); + } + + public long decReleasing() { + return releasing.decToFloor(1); + } + + public long getFailed() { + return failed.get(); + } + + public long getFailedRecently() { + return failedRecently.get(); + } + + /** + * Reset the recent failure + * @return the number of failures in the "recent" window + */ + public long resetFailedRecently() { + return failedRecently.getAndSet(0); + } + + public long getLimitsExceeded() { + return limitsExceeded.get(); + } + + public long incPendingAntiAffineRequests(long v) { + return pendingAntiAffineRequests.addAndGet(v); + } + + /** + * Probe for an outstanding AA request being true + * @return true if there is an outstanding AA Request + */ + public boolean isAARequestOutstanding() { + return outstandingAArequest != null; + } + + /** + * expose the predicate {@link #isAARequestOutstanding()} as an integer, + * which is very convenient in tests + * @return 1 if there is an outstanding request; 0 if not + */ + public int getOutstandingAARequestCount() { + return isAARequestOutstanding()? 1: 0; + } + /** + * Note that a role failed, text will + * be used in any diagnostics if an exception + * is later raised. 
+ * @param startupFailure flag to indicate this was a startup event + * @param text text about the failure + * @param outcome outcome of the container + */ + public synchronized void noteFailed(boolean startupFailure, String text, + ContainerOutcome outcome) { + if (text != null) { + failureMessage = text; + } + switch (outcome) { + case Preempted: + preempted.incrementAndGet(); + break; + + case Node_failure: + nodeFailed.incrementAndGet(); + failed.incrementAndGet(); + break; + + case Failed_limits_exceeded: // exceeded memory or CPU; app/configuration related + limitsExceeded.incrementAndGet(); + // fall through + case Failed: // application failure, possibly node related, possibly not + default: // anything else (future-proofing) + failed.incrementAndGet(); + failedRecently.incrementAndGet(); + //have a look to see if it short lived + if (startupFailure) { + incStartFailed(); + } + break; + } + } + + public long getStartFailed() { + return startFailed.get(); + } + + public synchronized void incStartFailed() { + startFailed.getAndIncrement(); + } + + public synchronized String getFailureMessage() { + return failureMessage; + } + + public long getCompleted() { + return completed.get(); + } + + public synchronized void setCompleted(int completed) { + this.completed.set(completed); + } + + public long incCompleted() { + return completed.incrementAndGet(); + } + public long getStarted() { + return started.get(); + } + + public synchronized void incStarted() { + started.incrementAndGet(); + } + + public long getTotalRequested() { + return totalRequested.get(); + } + + public long getPreempted() { + return preempted.get(); + } + + public long getNodeFailed() { + return nodeFailed.get(); + } + + public long getPendingAntiAffineRequests() { + return pendingAntiAffineRequests.get(); + } + + public void setPendingAntiAffineRequests(long pendingAntiAffineRequests) { + this.pendingAntiAffineRequests.set(pendingAntiAffineRequests); + } + + public long 
decPendingAntiAffineRequests() { + return pendingAntiAffineRequests.decToFloor(1); + } + + public OutstandingRequest getOutstandingAArequest() { + return outstandingAArequest; + } + + public void setOutstandingAArequest(OutstandingRequest outstandingAArequest) { + this.outstandingAArequest = outstandingAArequest; + } + + /** + * Complete the outstanding AA request (there's no check for one in progress, caller + * expected to have done that). + */ + public void completeOutstandingAARequest() { + setOutstandingAArequest(null); + } + + /** + * Cancel any outstanding AA request. Harmless if the role is non-AA, or + * if there are no outstanding requests. + */ + public void cancelOutstandingAARequest() { + if (outstandingAArequest != null) { + setOutstandingAArequest(null); + setPendingAntiAffineRequests(0); + decRequested(); + } + } + + /** + * Get the number of roles we are short of. + * nodes released are ignored. + * @return the positive or negative number of roles to add/release. + * 0 means "do nothing". + */ + public long getDelta() { + long inuse = getActualAndRequested(); + long delta = desired.get() - inuse; + if (delta < 0) { + //if we are releasing, remove the number that are already released. + delta += releasing.get(); + //but never switch to a positive + delta = Math.min(delta, 0); + } + return delta; + } + + /** + * Get count of actual and requested containers. This includes pending ones + * @return the size of the application when outstanding requests are included. 
+ */ + public long getActualAndRequested() { + return actual.get() + requested.get(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("RoleStatus{"); + sb.append("name='").append(name).append('\''); + sb.append(", group=").append(group); + sb.append(", key=").append(key); + sb.append(", desired=").append(desired); + sb.append(", actual=").append(actual); + sb.append(", requested=").append(requested); + sb.append(", releasing=").append(releasing); + sb.append(", failed=").append(failed); + sb.append(", startFailed=").append(startFailed); + sb.append(", started=").append(started); + sb.append(", completed=").append(completed); + sb.append(", totalRequested=").append(totalRequested); + sb.append(", preempted=").append(preempted); + sb.append(", nodeFailed=").append(nodeFailed); + sb.append(", failedRecently=").append(failedRecently); + sb.append(", limitsExceeded=").append(limitsExceeded); + sb.append(", resourceRequirements=").append(resourceRequirements); + sb.append(", isAntiAffinePlacement=").append(isAntiAffinePlacement()); + if (isAntiAffinePlacement()) { + sb.append(", pendingAntiAffineRequests=").append(pendingAntiAffineRequests); + sb.append(", outstandingAArequest=").append(outstandingAArequest); + } + sb.append(", failureMessage='").append(failureMessage).append('\''); + sb.append(", providerRole=").append(providerRole); + sb.append('}'); + return sb.toString(); + } + + @Override + public synchronized Object clone() throws CloneNotSupportedException { + return super.clone(); + } + + /** + * Get the provider role + * @return the provider role + */ + public ProviderRole getProviderRole() { + return providerRole; + } + + /** + * Build the statistics map from the current data + * @return a map for use in statistics reports + */ + public Map<String, Integer> buildStatistics() { + ComponentInformation componentInformation = serialize(); + return componentInformation.buildStatistics(); + } + + /** + * Produced a 
serialized form which can be served up as JSON + * @return a summary of the current role status. + */ + public synchronized ComponentInformation serialize() { + ComponentInformation info = new ComponentInformation(); + info.name = name; + info.priority = getPriority(); + info.desired = desired.intValue(); + info.actual = actual.intValue(); + info.requested = requested.intValue(); + info.releasing = releasing.intValue(); + info.failed = failed.intValue(); + info.startFailed = startFailed.intValue(); + info.placementPolicy = getPlacementPolicy(); + info.failureMessage = failureMessage; + info.totalRequested = totalRequested.intValue(); + info.failedRecently = failedRecently.intValue(); + info.nodeFailed = nodeFailed.intValue(); + info.preempted = preempted.intValue(); + info.pendingAntiAffineRequestCount = pendingAntiAffineRequests.intValue(); + info.isAARequestOutstanding = isAARequestOutstanding(); + return info; + } + + /** + * Get the (possibly null) label expression for this role + * @return a string or null + */ + public String getLabelExpression() { + return providerRole.labelExpression; + } + + public Resource getResourceRequirements() { + return resourceRequirements; + } + + public void setResourceRequirements(Resource resourceRequirements) { + this.resourceRequirements = resourceRequirements; + } + + /** + * Compare two role status entries by name + */ + public static class CompareByName implements Comparator<RoleStatus>, + Serializable { + @Override + public int compare(RoleStatus o1, RoleStatus o2) { + return o1.getName().compareTo(o2.getName()); + } + } + + /** + * Compare two role status entries by key + */ + public static class CompareByKey implements Comparator<RoleStatus>, + Serializable { + @Override + public int compare(RoleStatus o1, RoleStatus o2) { + return (o1.getKey() < o2.getKey() ? -1 : (o1.getKey() == o2.getKey() ? 
0 : 1)); + } + } + + /** + * Given a resource, set its requirements to those this role needs + * @param resource resource to configure + * @return the resource + */ + public Resource copyResourceRequirements(Resource resource) { + Preconditions.checkNotNull(resourceRequirements, + "Role resource requirements have not been set"); + resource.setMemory(resourceRequirements.getMemory()); + resource.setVirtualCores(resourceRequirements.getVirtualCores()); + return resource; + } + + public synchronized RoleStatistics getStatistics() { + RoleStatistics stats = new RoleStatistics(); + stats.activeAA = getOutstandingAARequestCount(); + stats.actual = actual.get(); + stats.desired = desired.get(); + stats.failed = failed.get(); + stats.limitsExceeded = limitsExceeded.get(); + stats.nodeFailed = nodeFailed.get(); + stats.preempted = preempted.get(); + stats.releasing = releasing.get(); + stats.requested = requested.get(); + stats.started = started.get(); + stats.startFailed = startFailed.get(); + stats.totalRequested = totalRequested.get(); + return stats; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java new file mode 100644 index 0000000..b848096 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/SimpleReleaseSelector.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import java.util.List; + +/** + * Simplest release selector simply returns the list + */ +public class SimpleReleaseSelector implements ContainerReleaseSelector { + + @Override + public List<RoleInstance> sortCandidates(int roleId, + List<RoleInstance> candidates) { + return candidates; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org