YARN-1709. In-memory data structures used to track resources over time to enable reservations.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d8b2cd8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d8b2cd8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d8b2cd8 Branch: refs/heads/YARN-1051 Commit: 0d8b2cd88b958b1e602fd4ea4078ef8d4742a7c3 Parents: 3f2e3b2 Author: subru <[email protected]> Authored: Fri Sep 12 17:22:08 2014 -0700 Committer: subru <[email protected]> Committed: Thu Sep 25 13:06:20 2014 -0700 ---------------------------------------------------------------------- YARN-1051-CHANGES.txt | 3 + .../reservation/InMemoryPlan.java | 507 +++++++++++++++++++ .../InMemoryReservationAllocation.java | 151 ++++++ .../resourcemanager/reservation/Plan.java | 32 ++ .../reservation/PlanContext.java | 101 ++++ .../resourcemanager/reservation/PlanEdit.java | 61 +++ .../resourcemanager/reservation/PlanView.java | 89 ++++ .../RLESparseResourceAllocation.java | 332 ++++++++++++ .../reservation/ReservationAllocation.java | 104 ++++ .../reservation/ReservationInterval.java | 67 +++ .../exceptions/PlanningException.java | 25 + .../reservation/ReservationSystemTestUtil.java | 210 ++++++++ .../reservation/TestInMemoryPlan.java | 477 +++++++++++++++++ .../TestInMemoryReservationAllocation.java | 206 ++++++++ .../TestRLESparseResourceAllocation.java | 169 +++++++ 15 files changed, 2534 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/YARN-1051-CHANGES.txt ---------------------------------------------------------------------- diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt index a7c08a0..410d974 100644 --- a/YARN-1051-CHANGES.txt +++ b/YARN-1051-CHANGES.txt @@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the ReservationSystem. (Carlo Curino and Subru Krishnan via curino) YARN-1708. Public YARN APIs for creating/updating/deleting reservations. 
(subru) + +YARN-1709. In-memory data structures used to track resources over time to +enable reservations. (subru) http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java new file mode 100644 index 0000000..99231c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -0,0 +1,507 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import 
org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class InMemoryPlan implements Plan { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); + + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + + private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations = + new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>(); + + private RLESparseResourceAllocation rleSparseVector; + + private Map<String, RLESparseResourceAllocation> userResourceAlloc = + new HashMap<String, RLESparseResourceAllocation>(); + + private Map<ReservationId, InMemoryReservationAllocation> reservationTable = + new HashMap<ReservationId, InMemoryReservationAllocation>(); + + private final ReentrantReadWriteLock readWriteLock = + new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + private final SharingPolicy policy; + private final ReservationAgent agent; + private final long step; + private final ResourceCalculator resCalc; + private final Resource minAlloc, maxAlloc; + private final String queueName; + private final QueueMetrics queueMetrics; + private final Planner replanner; + private final boolean getMoveOnExpiry; + private final Clock clock; + + private Resource totalCapacity; + + InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + ReservationAgent agent, Resource totalCapacity, long step, + ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, + String queueName, Planner replanner, boolean getMoveOnExpiry) { + this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, + maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock()); + } + + InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + ReservationAgent agent, Resource totalCapacity, long step, + ResourceCalculator resCalc, 
Resource minAlloc, Resource maxAlloc, + String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) { + this.queueMetrics = queueMetrics; + this.policy = policy; + this.agent = agent; + this.step = step; + this.totalCapacity = totalCapacity; + this.resCalc = resCalc; + this.minAlloc = minAlloc; + this.maxAlloc = maxAlloc; + this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc); + this.queueName = queueName; + this.replanner = replanner; + this.getMoveOnExpiry = getMoveOnExpiry; + this.clock = clock; + } + + @Override + public QueueMetrics getQueueMetrics() { + return queueMetrics; + } + + private void incrementAllocation(ReservationAllocation reservation) { + assert (readWriteLock.isWriteLockedByCurrentThread()); + Map<ReservationInterval, ReservationRequest> allocationRequests = + reservation.getAllocationRequests(); + // check if we have encountered the user earlier and if not add an entry + String user = reservation.getUser(); + RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + if (resAlloc == null) { + resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc); + userResourceAlloc.put(user, resAlloc); + } + for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests + .entrySet()) { + resAlloc.addInterval(r.getKey(), r.getValue()); + rleSparseVector.addInterval(r.getKey(), r.getValue()); + } + } + + private void decrementAllocation(ReservationAllocation reservation) { + assert (readWriteLock.isWriteLockedByCurrentThread()); + Map<ReservationInterval, ReservationRequest> allocationRequests = + reservation.getAllocationRequests(); + String user = reservation.getUser(); + RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests + .entrySet()) { + resAlloc.removeInterval(r.getKey(), r.getValue()); + rleSparseVector.removeInterval(r.getKey(), r.getValue()); + } + if (resAlloc.isEmpty()) { + 
userResourceAlloc.remove(resAlloc); + } + } + + public Set<ReservationAllocation> getAllReservations() { + readLock.lock(); + try { + if (currentReservations != null) { + Set<ReservationAllocation> flattenedReservations = + new HashSet<ReservationAllocation>(); + for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations + .values()) { + flattenedReservations.addAll(reservationEntries); + } + return flattenedReservations; + } else { + return null; + } + } finally { + readLock.unlock(); + } + } + + @Override + public boolean addReservation(ReservationAllocation reservation) + throws PlanningException { + // Verify the allocation is memory based otherwise it is not supported + InMemoryReservationAllocation inMemReservation = + (InMemoryReservationAllocation) reservation; + if (inMemReservation.getUser() == null) { + String errMsg = + "The specified Reservation with ID " + + inMemReservation.getReservationId() + + " is not mapped to any user"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + writeLock.lock(); + try { + if (reservationTable.containsKey(inMemReservation.getReservationId())) { + String errMsg = + "The specified Reservation with ID " + + inMemReservation.getReservationId() + " already exists"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + // Validate if we can accept this reservation, throws exception if + // validation fails + policy.validate(this, inMemReservation); + // we record here the time in which the allocation has been accepted + reservation.setAcceptanceTimestamp(clock.getTime()); + ReservationInterval searchInterval = + new ReservationInterval(inMemReservation.getStartTime(), + inMemReservation.getEndTime()); + Set<InMemoryReservationAllocation> reservations = + currentReservations.get(searchInterval); + if (reservations == null) { + reservations = new HashSet<InMemoryReservationAllocation>(); + } + if (!reservations.add(inMemReservation)) { + LOG.error("Unable to add 
reservation: {} to plan.", + inMemReservation.getReservationId()); + return false; + } + currentReservations.put(searchInterval, reservations); + reservationTable.put(inMemReservation.getReservationId(), + inMemReservation); + incrementAllocation(inMemReservation); + LOG.info("Sucessfully added reservation: {} to plan.", + inMemReservation.getReservationId()); + return true; + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean updateReservation(ReservationAllocation reservation) + throws PlanningException { + writeLock.lock(); + boolean result = false; + try { + ReservationId resId = reservation.getReservationId(); + ReservationAllocation currReservation = getReservationById(resId); + if (currReservation == null) { + String errMsg = + "The specified Reservation with ID " + resId + + " does not exist in the plan"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + if (!removeReservation(currReservation)) { + LOG.error("Unable to replace reservation: {} from plan.", + reservation.getReservationId()); + return result; + } + try { + result = addReservation(reservation); + } catch (PlanningException e) { + LOG.error("Unable to update reservation: {} from plan due to {}.", + reservation.getReservationId(), e.getMessage()); + } + if (result) { + LOG.info("Sucessfully updated reservation: {} in plan.", + reservation.getReservationId()); + return result; + } else { + // rollback delete + addReservation(currReservation); + LOG.info("Rollbacked update reservation: {} from plan.", + reservation.getReservationId()); + return result; + } + } finally { + writeLock.unlock(); + } + } + + private boolean removeReservation(ReservationAllocation reservation) { + assert (readWriteLock.isWriteLockedByCurrentThread()); + ReservationInterval searchInterval = + new ReservationInterval(reservation.getStartTime(), + reservation.getEndTime()); + Set<InMemoryReservationAllocation> reservations = + currentReservations.get(searchInterval); + if 
(reservations != null) { + if (!reservations.remove(reservation)) { + LOG.error("Unable to remove reservation: {} from plan.", + reservation.getReservationId()); + return false; + } + if (reservations.isEmpty()) { + currentReservations.remove(searchInterval); + } + } else { + String errMsg = + "The specified Reservation with ID " + reservation.getReservationId() + + " does not exist in the plan"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + reservationTable.remove(reservation.getReservationId()); + decrementAllocation(reservation); + LOG.info("Sucessfully deleted reservation: {} in plan.", + reservation.getReservationId()); + return true; + } + + @Override + public boolean deleteReservation(ReservationId reservationID) { + writeLock.lock(); + try { + ReservationAllocation reservation = getReservationById(reservationID); + if (reservation == null) { + String errMsg = + "The specified Reservation with ID " + reservationID + + " does not exist in the plan"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + return removeReservation(reservation); + } finally { + writeLock.unlock(); + } + } + + @Override + public void archiveCompletedReservations(long tick) { + // Since we are looking for old reservations, read lock is optimal + LOG.debug("Running archival at time: {}", tick); + readLock.lock(); + List<InMemoryReservationAllocation> expiredReservations = + new ArrayList<InMemoryReservationAllocation>(); + // archive reservations and delete the ones which are beyond + // the reservation policy "window" + try { + long archivalTime = tick - policy.getValidWindow(); + ReservationInterval searchInterval = + new ReservationInterval(archivalTime, archivalTime); + SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations = + currentReservations.headMap(searchInterval, true); + if (!reservations.isEmpty()) { + for (Set<InMemoryReservationAllocation> reservationEntries : reservations + .values()) { + for 
(InMemoryReservationAllocation reservation : reservationEntries) { + if (reservation.getEndTime() <= archivalTime) { + expiredReservations.add(reservation); + } + } + } + } + } finally { + readLock.unlock(); + } + if (expiredReservations.isEmpty()) { + return; + } + // Need write lock only if there are any reservations to be deleted + writeLock.lock(); + try { + for (InMemoryReservationAllocation expiredReservation : expiredReservations) { + removeReservation(expiredReservation); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<ReservationAllocation> getReservationsAtTime(long tick) { + readLock.lock(); + ReservationInterval searchInterval = + new ReservationInterval(tick, Long.MAX_VALUE); + try { + SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations = + currentReservations.headMap(searchInterval, true); + if (!reservations.isEmpty()) { + Set<ReservationAllocation> flattenedReservations = + new HashSet<ReservationAllocation>(); + for (Set<InMemoryReservationAllocation> reservationEntries : reservations + .values()) { + for (InMemoryReservationAllocation reservation : reservationEntries) { + if (reservation.getEndTime() > tick) { + flattenedReservations.add(reservation); + } + } + } + return Collections.unmodifiableSet(flattenedReservations); + } else { + return Collections.emptySet(); + } + } finally { + readLock.unlock(); + } + } + + @Override + public long getStep() { + return step; + } + + @Override + public SharingPolicy getSharingPolicy() { + return policy; + } + + @Override + public ReservationAgent getReservationAgent() { + return agent; + } + + @Override + public Resource getConsumptionForUser(String user, long t) { + readLock.lock(); + try { + RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user); + if (userResAlloc != null) { + return userResAlloc.getCapacityAtTime(t); + } else { + return Resources.clone(ZERO_RESOURCE); + } + } finally { + readLock.unlock(); + } + } + + @Override + 
public Resource getTotalCommittedResources(long t) { + readLock.lock(); + try { + return rleSparseVector.getCapacityAtTime(t); + } finally { + readLock.unlock(); + } + } + + @Override + public ReservationAllocation getReservationById(ReservationId reservationID) { + if (reservationID == null) { + return null; + } + readLock.lock(); + try { + return reservationTable.get(reservationID); + } finally { + readLock.unlock(); + } + } + + @Override + public Resource getTotalCapacity() { + readLock.lock(); + try { + return Resources.clone(totalCapacity); + } finally { + readLock.unlock(); + } + } + + @Override + public Resource getMinimumAllocation() { + return Resources.clone(minAlloc); + } + + @Override + public void setTotalCapacity(Resource cap) { + writeLock.lock(); + try { + totalCapacity = Resources.clone(cap); + } finally { + writeLock.unlock(); + } + } + + public long getEarliestStartTime() { + readLock.lock(); + try { + return rleSparseVector.getEarliestStartTime(); + } finally { + readLock.unlock(); + } + } + + @Override + public long getLastEndTime() { + readLock.lock(); + try { + return rleSparseVector.getLatestEndTime(); + } finally { + readLock.unlock(); + } + } + + @Override + public ResourceCalculator getResourceCalculator() { + return resCalc; + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public Resource getMaximumAllocation() { + return Resources.clone(maxAlloc); + } + + public String toCumulativeString() { + readLock.lock(); + try { + return rleSparseVector.toString(); + } finally { + readLock.unlock(); + } + } + + @Override + public Planner getReplanner() { + return replanner; + } + + @Override + public boolean getMoveOnExpiry() { + return getMoveOnExpiry; + } + + @Override + public String toString() { + readLock.lock(); + try { + StringBuffer planStr = new StringBuffer("In-memory Plan: "); + planStr.append("Parent Queue: ").append(queueName) + .append("Total Capacity: ").append(totalCapacity).append("Step: 
") + .append(step); + for (ReservationAllocation reservation : getAllReservations()) { + planStr.append(reservation); + } + return planStr.toString(); + } finally { + readLock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java new file mode 100644 index 0000000..10cc55f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -0,0 +1,151 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * An in memory implementation of a reservation allocation using the + * {@link RLESparseResourceAllocation} + * + */ +class InMemoryReservationAllocation implements ReservationAllocation { + + private final String planName; + private final ReservationId reservationID; + private final String user; + 
private final ReservationDefinition contract; + private final long startTime; + private final long endTime; + private final Map<ReservationInterval, ReservationRequest> allocationRequests; + private boolean hasGang = false; + private long acceptedAt = -1; + + private RLESparseResourceAllocation resourcesOverTime; + + InMemoryReservationAllocation(ReservationId reservationID, + ReservationDefinition contract, String user, String planName, + long startTime, long endTime, + Map<ReservationInterval, ReservationRequest> allocationRequests, + ResourceCalculator calculator, Resource minAlloc) { + this.contract = contract; + this.startTime = startTime; + this.endTime = endTime; + this.reservationID = reservationID; + this.user = user; + this.allocationRequests = allocationRequests; + this.planName = planName; + resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc); + for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests + .entrySet()) { + resourcesOverTime.addInterval(r.getKey(), r.getValue()); + if (r.getValue().getConcurrency() > 1) { + hasGang = true; + } + } + } + + @Override + public ReservationId getReservationId() { + return reservationID; + } + + @Override + public ReservationDefinition getReservationDefinition() { + return contract; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public long getEndTime() { + return endTime; + } + + @Override + public Map<ReservationInterval, ReservationRequest> getAllocationRequests() { + return Collections.unmodifiableMap(allocationRequests); + } + + @Override + public String getPlanName() { + return planName; + } + + @Override + public String getUser() { + return user; + } + + @Override + public boolean containsGangs() { + return hasGang; + } + + @Override + public void setAcceptanceTimestamp(long acceptedAt) { + this.acceptedAt = acceptedAt; + } + + @Override + public long getAcceptanceTime() { + return acceptedAt; + } + + @Override + public 
Resource getResourcesAtTime(long tick) { + if (tick < startTime || tick >= endTime) { + return Resource.newInstance(0, 0); + } + return Resources.clone(resourcesOverTime.getCapacityAtTime(tick)); + } + + @Override + public String toString() { + StringBuilder sBuf = new StringBuilder(); + sBuf.append(getReservationId()).append(" user:").append(getUser()) + .append(" startTime: ").append(getStartTime()).append(" endTime: ") + .append(getEndTime()).append(" alloc:[") + .append(resourcesOverTime.toString()).append("] "); + return sBuf.toString(); + } + + @Override + public int compareTo(ReservationAllocation other) { + // reverse order of acceptance + if (this.getAcceptanceTime() > other.getAcceptanceTime()) { + return -1; + } + if (this.getAcceptanceTime() < other.getAcceptanceTime()) { + return 1; + } + return 0; + } + + @Override + public int hashCode() { + return reservationID.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + InMemoryReservationAllocation other = (InMemoryReservationAllocation) obj; + return this.reservationID.equals(other.getReservationId()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java new file mode 100644 index 0000000..cf2aed7 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java @@ -0,0 +1,32 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +/** + * A Plan represents the central data structure of a reservation system that + * maintains the "agenda" for the cluster. In particular, it maintains + * information on how a set of {@link ReservationDefinition} that have been + * previously accepted will be honored. + * + * {@link ReservationDefinition} submitted by the users through the RM public + * APIs are passed to appropriate {@link ReservationAgent}s, which in turn will + * consult the Plan (via the {@link PlanView} interface) and try to determine + * whether there are sufficient resources available in this Plan to satisfy the + * temporal and resource constraints of a {@link ReservationDefinition}. If a + * valid allocation is found the agent will try to store it in the plan (via the + * {@link PlanEdit} interface). Upon success the system returns to the user a + * positive acknowledgment, and a reservation identifier to be later used to + * access the reserved resources. + * + * A {@link PlanFollower} will continuously read from the Plan and will + * affect the instantaneous allocation of resources among jobs running by + * publishing the "current" slice of the Plan to the underlying scheduler. I.e., + * the configuration of queues/weights of the scheduler is modified to reflect + * the allocations in the Plan. + * + * As this interface has several methods we decompose them into three groups: + * {@link PlanContext}: containing configuration type information, + * {@link PlanView}: read-only access to the plan state, and {@link PlanEdit}: + * write access to the plan state. 
+ */ +public interface Plan extends PlanContext, PlanView, PlanEdit { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java new file mode 100644 index 0000000..40a25a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java @@ -0,0 +1,101 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +/** + * This interface provides read-only access to configuration-type parameter for + * a plan. + * + */ +public interface PlanContext { + + /** + * Returns the configured "step" or granularity of time of the plan in millis. 
+ * + * @return plan step in millis + */ + public long getStep(); + + /** + * Return the {@link ReservationAgent} configured for this plan that is + * responsible for optimally placing various reservation requests + * + * @return the {@link ReservationAgent} configured for this plan + */ + public ReservationAgent getReservationAgent(); + + /** + * Return an instance of a {@link Planner}, which will be invoked in response + * to unexpected reduction in the resources of this plan + * + * @return an instance of a {@link Planner}, which will be invoked in response + * to unexpected reduction in the resources of this plan + */ + public Planner getReplanner(); + + /** + * Return the configured {@link SharingPolicy} that governs the sharing of the + * resources of the plan between its various users + * + * @return the configured {@link SharingPolicy} that governs the sharing of + * the resources of the plan between its various users + */ + public SharingPolicy getSharingPolicy(); + + /** + * Returns the system {@link ResourceCalculator} + * + * @return the system {@link ResourceCalculator} + */ + public ResourceCalculator getResourceCalculator(); + + /** + * Returns the single smallest {@link Resource} allocation that can be + * reserved in this plan + * + * @return the single smallest {@link Resource} allocation that can be + * reserved in this plan + */ + public Resource getMinimumAllocation(); + + /** + * Returns the single largest {@link Resource} allocation that can be reserved + * in this plan + * + * @return the single largest {@link Resource} allocation that can be reserved + * in this plan + */ + public Resource getMaximumAllocation(); + + /** + * Return the name of the queue in the {@link ResourceScheduler} corresponding + * to this plan + * + * @return the name of the queue in the {@link ResourceScheduler} + * corresponding to this plan + */ + public String getQueueName(); + + /** + * Return the {@link QueueMetrics} for the queue in the + * {@link 
ResourceScheduler} corresponding to this plan + * + * @return the {@link QueueMetrics} for the queue in the + * {@link ResourceScheduler} corresponding to this plan + */ + public QueueMetrics getQueueMetrics(); + + /** + * Instructs the {@link PlanFollower} on what to do for applications + * which are still running when the reservation is expiring (move-to-default + * vs kill) + * + * @return true if remaining applications have to be killed, false if they + * have to be migrated + */ + public boolean getMoveOnExpiry(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java new file mode 100644 index 0000000..648edba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java @@ -0,0 +1,61 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * This interface groups the methods used to modify the state of a Plan. 
+ */ +public interface PlanEdit extends PlanContext, PlanView { + + /** + * Add a new {@link ReservationAllocation} to the plan + * + * @param reservation the {@link ReservationAllocation} to be added to the + * plan + * @return true if addition is successful, false otherwise + */ + public boolean addReservation(ReservationAllocation reservation) + throws PlanningException; + + /** + * Updates an existing {@link ReservationAllocation} in the plan. This is + * required for re-negotiation + * + * @param reservation the {@link ReservationAllocation} to be updated in the plan + * @return true if update is successful, false otherwise + */ + public boolean updateReservation(ReservationAllocation reservation) + throws PlanningException; + + /** + * Delete an existing {@link ReservationAllocation} from the plan identified + * uniquely by its {@link ReservationId}. This will generally be used for + * garbage collection + * + * @param reservationID the {@link ReservationId} uniquely identifying the + * {@link ReservationAllocation} to be deleted from the plan + * @return true if delete is successful, false otherwise + */ + public boolean deleteReservation(ReservationId reservationID) + throws PlanningException; + + /** + * Method invoked to garbage collect old reservations. 
It cleans up expired + * reservations that have fallen out of the sliding archival window + * + * @param tick the current time from which the archival window is computed + */ + public void archiveCompletedReservations(long tick) throws PlanningException; + + /** + * Sets the overall capacity in terms of {@link Resource} assigned to this + * plan + * + * @param capacity the overall capacity in terms of {@link Resource} assigned + * to this plan + */ + public void setTotalCapacity(Resource capacity); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java new file mode 100644 index 0000000..6e58dde --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -0,0 +1,89 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * This interface provides a read-only view on the allocations made in this + * plan. This methods are used for example by {@link ReservationAgent}s to + * determine the free resources in a certain point in time, and by + * PlanFollowerPolicy to publish this plan to the scheduler. 
+ */ +public interface PlanView extends PlanContext { + + /** + * Return a {@link ReservationAllocation} identified by its + * {@link ReservationId} + * + * @param reservationID the unique id to identify the + * {@link ReservationAllocation} + * @return {@link ReservationAllocation} identified by the specified id + */ + public ReservationAllocation getReservationById(ReservationId reservationID); + + /** + * Gets all the active reservations at the specified point of time + * + * @param tick the time (UTC in ms) for which the active reservations are + * requested + * @return set of active reservations at the specified time + */ + public Set<ReservationAllocation> getReservationsAtTime(long tick); + + /** + * Gets all the reservations in the plan + * + * @return set of all reservations handled by this Plan + */ + public Set<ReservationAllocation> getAllReservations(); + + /** + * Returns the total {@link Resource} reserved for all users at the specified + * time + * + * @param tick the time (UTC in ms) for which the reserved resources are + * requested + * @return the total {@link Resource} reserved for all users at the specified + * time + */ + public Resource getTotalCommittedResources(long tick); + + /** + * Returns the total {@link Resource} reserved for a given user at the + * specified time + * + * @param user the user who made the reservation(s) + * @param tick the time (UTC in ms) for which the reserved resources are + * requested + * @return the total {@link Resource} reserved for a given user at the + * specified time + */ + public Resource getConsumptionForUser(String user, long tick); + + /** + * Returns the overall capacity in terms of {@link Resource} assigned to this + * plan (typically will correspond to the absolute capacity of the + * corresponding queue). 
+ * + * @return the overall capacity in terms of {@link Resource} assigned to this + * plan + */ + public Resource getTotalCapacity(); + + /** + * Gets the time (UTC in ms) at which the first reservation starts + * + * @return the time (UTC in ms) at which the first reservation starts + */ + public long getEarliestStartTime(); + + /** + * Returns the time (UTC in ms) at which the last reservation terminates + * + * @return the time (UTC in ms) at which the last reservation terminates + */ + public long getLastEndTime(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java new file mode 100644 index 0000000..fa8db30 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -0,0 +1,332 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import 
org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.gson.stream.JsonWriter; + +/** + * This is a run length encoded sparse data structure that maintains resource + * allocations over time + */ +public class RLESparseResourceAllocation { + + private static final int THRESHOLD = 100; + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + + private TreeMap<Long, Resource> cumulativeCapacity = + new TreeMap<Long, Resource>(); + + private final ReentrantReadWriteLock readWriteLock = + new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + private final ResourceCalculator resourceCalculator; + private final Resource minAlloc; + + public RLESparseResourceAllocation(ResourceCalculator resourceCalculator, + Resource minAlloc) { + this.resourceCalculator = resourceCalculator; + this.minAlloc = minAlloc; + } + + private boolean isSameAsPrevious(Long key, Resource capacity) { + Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key); + return (previous != null && previous.getValue().equals(capacity)); + } + + private boolean isSameAsNext(Long key, Resource capacity) { + Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key); + return (next != null && next.getValue().equals(capacity)); + } + + /** + * Add a resource for the specified interval + * + * @param reservationInterval the interval for which the resource is to be + * added + * @param capacity the resource to be added + * @return true if addition is successful, false otherwise + */ + public boolean addInterval(ReservationInterval reservationInterval, + ReservationRequest capacity) { + Resource totCap = + 
Resources.multiply(capacity.getCapability(), + (float) capacity.getNumContainers()); + if (totCap.equals(ZERO_RESOURCE)) { + return true; + } + writeLock.lock(); + try { + long startKey = reservationInterval.getStartTime(); + long endKey = reservationInterval.getEndTime(); + NavigableMap<Long, Resource> ticks = + cumulativeCapacity.headMap(endKey, false); + if (ticks != null && !ticks.isEmpty()) { + Resource updatedCapacity = Resource.newInstance(0, 0); + Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey); + if (lowEntry == null) { + // This is the earliest starting interval + cumulativeCapacity.put(startKey, totCap); + } else { + updatedCapacity = Resources.add(lowEntry.getValue(), totCap); + // Add a new tick only if the updated value is different + // from the previous tick + if ((startKey == lowEntry.getKey()) + && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) { + cumulativeCapacity.remove(lowEntry.getKey()); + } else { + cumulativeCapacity.put(startKey, updatedCapacity); + } + } + // Increase all the capacities of overlapping intervals + Set<Entry<Long, Resource>> overlapSet = + ticks.tailMap(startKey, false).entrySet(); + for (Entry<Long, Resource> entry : overlapSet) { + updatedCapacity = Resources.add(entry.getValue(), totCap); + entry.setValue(updatedCapacity); + } + } else { + // This is the first interval to be added + cumulativeCapacity.put(startKey, totCap); + } + Resource nextTick = cumulativeCapacity.get(endKey); + if (nextTick != null) { + // If there is overlap, remove the duplicate entry + if (isSameAsPrevious(endKey, nextTick)) { + cumulativeCapacity.remove(endKey); + } + } else { + // Decrease capacity as this is end of the interval + cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity + .floorEntry(endKey).getValue(), totCap)); + } + return true; + } finally { + writeLock.unlock(); + } + } + + /** + * Add multiple resources for the specified interval + * + * @param reservationInterval the interval for which 
the resource is to be + * added + * @param ReservationRequests the resources to be added + * @param clusterResource the total resources in the cluster + * @return true if addition is successful, false otherwise + */ + public boolean addCompositeInterval(ReservationInterval reservationInterval, + List<ReservationRequest> ReservationRequests, Resource clusterResource) { + ReservationRequest aggregateReservationRequest = + Records.newRecord(ReservationRequest.class); + Resource capacity = Resource.newInstance(0, 0); + for (ReservationRequest ReservationRequest : ReservationRequests) { + Resources.addTo(capacity, Resources.multiply( + ReservationRequest.getCapability(), + ReservationRequest.getNumContainers())); + } + aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources + .divide(resourceCalculator, clusterResource, capacity, minAlloc))); + aggregateReservationRequest.setCapability(minAlloc); + + return addInterval(reservationInterval, aggregateReservationRequest); + } + + /** + * Removes a resource for the specified interval + * + * @param reservationInterval the interval for which the resource is to be + * removed + * @param capacity the resource to be removed + * @return true if removal is successful, false otherwise + */ + public boolean removeInterval(ReservationInterval reservationInterval, + ReservationRequest capacity) { + Resource totCap = + Resources.multiply(capacity.getCapability(), + (float) capacity.getNumContainers()); + if (totCap.equals(ZERO_RESOURCE)) { + return true; + } + writeLock.lock(); + try { + long startKey = reservationInterval.getStartTime(); + long endKey = reservationInterval.getEndTime(); + // update the start key + NavigableMap<Long, Resource> ticks = + cumulativeCapacity.headMap(endKey, false); + // Decrease all the capacities of overlapping intervals + SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey); + if (overlapSet != null && !overlapSet.isEmpty()) { + Resource updatedCapacity = 
Resource.newInstance(0, 0); + long currentKey = -1; + for (Iterator<Entry<Long, Resource>> overlapEntries = + overlapSet.entrySet().iterator(); overlapEntries.hasNext();) { + Entry<Long, Resource> entry = overlapEntries.next(); + currentKey = entry.getKey(); + updatedCapacity = Resources.subtract(entry.getValue(), totCap); + // update each entry between start and end key + cumulativeCapacity.put(currentKey, updatedCapacity); + } + // Remove the first overlap entry if it is same as previous after + // updation + Long firstKey = overlapSet.firstKey(); + if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) { + cumulativeCapacity.remove(firstKey); + } + // Remove the next entry if it is same as end entry after updation + if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) { + cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey)); + } + } + return true; + } finally { + writeLock.unlock(); + } + } + + /** + * Returns the capacity, i.e. total resources allocated at the specified point + * of time + * + * @param tick the time (UTC in ms) at which the capacity is requested + * @return the resources allocated at the specified time + */ + public Resource getCapacityAtTime(long tick) { + readLock.lock(); + try { + Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick); + if (closestStep != null) { + return Resources.clone(closestStep.getValue()); + } + return Resources.clone(ZERO_RESOURCE); + } finally { + readLock.unlock(); + } + } + + /** + * Get the timestamp of the earliest resource allocation + * + * @return the timestamp of the first resource allocation + */ + public long getEarliestStartTime() { + readLock.lock(); + try { + if (cumulativeCapacity.isEmpty()) { + return -1; + } else { + return cumulativeCapacity.firstKey(); + } + } finally { + readLock.unlock(); + } + } + + /** + * Get the timestamp of the latest resource allocation + * + * @return the timestamp of the last resource allocation + */ + public long 
getLatestEndTime() { + readLock.lock(); + try { + if (cumulativeCapacity.isEmpty()) { + return -1; + } else { + return cumulativeCapacity.lastKey(); + } + } finally { + readLock.unlock(); + } + } + + /** + * Returns true if there are no non-zero entries + * + * @return true if there are no allocations or false otherwise + */ + public boolean isEmpty() { + readLock.lock(); + try { + if (cumulativeCapacity.isEmpty()) { + return true; + } + // Deletion leaves a single zero entry so check for that + if (cumulativeCapacity.size() == 1) { + return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE); + } + return false; + } finally { + readLock.unlock(); + } + } + + @Override + public String toString() { + StringBuilder ret = new StringBuilder(); + readLock.lock(); + try { + if (cumulativeCapacity.size() > THRESHOLD) { + ret.append("Number of steps: ").append(cumulativeCapacity.size()) + .append(" earliest entry: ").append(cumulativeCapacity.firstKey()) + .append(" latest entry: ").append(cumulativeCapacity.lastKey()); + } else { + for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) { + ret.append(r.getKey()).append(": ").append(r.getValue()) + .append("\n "); + } + } + return ret.toString(); + } finally { + readLock.unlock(); + } + } + + /** + * Returns the JSON string representation of the current resources allocated + * over time + * + * @return the JSON string representation of the current resources allocated + * over time + */ + public String toMemJSONString() { + StringWriter json = new StringWriter(); + JsonWriter jsonWriter = new JsonWriter(json); + readLock.lock(); + try { + jsonWriter.beginObject(); + // jsonWriter.name("timestamp").value("resource"); + for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) { + jsonWriter.name(r.getKey().toString()).value(r.getValue().toString()); + } + jsonWriter.endObject(); + jsonWriter.close(); + return json.toString(); + } catch (IOException e) { + // This should not happen + return ""; 
+ } finally { + readLock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java new file mode 100644 index 0000000..bca3aa8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java @@ -0,0 +1,104 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * A ReservationAllocation represents a concrete allocation of resources over + * time that satisfy a certain {@link ReservationDefinition}. This is used + * internally by a {@link Plan} to store information about how each of the + * accepted {@link ReservationDefinition} have been allocated. 
+ */ +public interface ReservationAllocation extends + Comparable<ReservationAllocation> { + + /** + * Returns the unique identifier {@link ReservationId} that represents the + * reservation + * + * @return reservationId the unique identifier {@link ReservationId} that + * represents the reservation + */ + public ReservationId getReservationId(); + + /** + * Returns the original {@link ReservationDefinition} submitted by the client + * + * @return + */ + public ReservationDefinition getReservationDefinition(); + + /** + * Returns the time at which the reservation is activated + * + * @return the time at which the reservation is activated + */ + public long getStartTime(); + + /** + * Returns the time at which the reservation terminates + * + * @return the time at which the reservation terminates + */ + public long getEndTime(); + + /** + * Returns the map of resources requested against the time interval for which + * they were + * + * @return the allocationRequests the map of resources requested against the + * time interval for which they were + */ + public Map<ReservationInterval, ReservationRequest> getAllocationRequests(); + + /** + * Return a string identifying the plan to which the reservation belongs + * + * @return the plan to which the reservation belongs + */ + public String getPlanName(); + + /** + * Returns the user who requested the reservation + * + * @return the user who requested the reservation + */ + public String getUser(); + + /** + * Returns whether the reservation has gang semantics or not + * + * @return true if there is a gang request, false otherwise + */ + public boolean containsGangs(); + + /** + * Sets the time at which the reservation was accepted by the system + * + * @param acceptedAt the time at which the reservation was accepted by the + * system + */ + public void setAcceptanceTimestamp(long acceptedAt); + + /** + * Returns the time at which the reservation was accepted by the system + * + * @return the time at which the 
reservation was accepted by the system + */ + public long getAcceptanceTime(); + + /** + * Returns the capacity represented by cumulative resources reserved by the + * reservation at the specified point of time + * + * @param tick the time (UTC in ms) for which the reserved resources are + * requested + * @return the resources reserved at the specified time + */ + public Resource getResourcesAtTime(long tick); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java new file mode 100644 index 0000000..d3a6d51 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +/** + * This represents the time duration of the reservation + * + */ +public class ReservationInterval implements Comparable<ReservationInterval> { + + private final long startTime; + + private final long endTime; + + public ReservationInterval(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + + /** + * Get the start time of the reservation interval + * + * @return the startTime + */ + public long getStartTime() { + return startTime; + } + + /** + * Get the end time of the reservation interval + * + 
* @return the endTime + */ + public long getEndTime() { + return endTime; + } + + /** + * Returns whether the interval is active at the specified instant of time + * + * @param tick the instance of the time to check + * @return true if active, false otherwise + */ + public boolean isOverlap(long tick) { + return (startTime <= tick && tick <= endTime); + } + + @Override + public int compareTo(ReservationInterval anotherInterval) { + long diff = 0; + if (startTime == anotherInterval.getStartTime()) { + diff = endTime - anotherInterval.getEndTime(); + } else { + diff = startTime - anotherInterval.getStartTime(); + } + if (diff < 0) { + return -1; + } else if (diff > 0) { + return 1; + } else { + return 0; + } + } + + public String toString() { + return "[" + startTime + ", " + endTime + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java new file mode 100644 index 0000000..aa9e9fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; + +/** + * Exception thrown by 
the admission control subsystem when there is a problem + * in trying to find an allocation for a user {@link ReservationSubmissionRequest}. + */ +public class PlanningException extends Exception { + + private static final long serialVersionUID = -684069387367879218L; + + public PlanningException(String message) { + super(message); + } + + public PlanningException(Throwable cause) { + super(cause); + } + + public PlanningException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java new file mode 100644 index 0000000..cbca6dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -0,0 +1,210 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import 
org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.Assert; +import org.mockito.Mockito; + +public class ReservationSystemTestUtil { + + private static Random rand = new Random(); + + public final static String reservationQ = "dedicated"; + + public static ReservationId getNewReservationId() { + return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + } + + public CapacityScheduler mockCapacityScheduler(int numContainers) + throws IOException { + // stolen from TestCapacityScheduler + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + + CapacityScheduler cs = Mockito.spy(new CapacityScheduler()); + cs.setConf(new YarnConfiguration()); + RMContext mockRmContext = + Mockito.spy(new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(mockRmContext); + try { + cs.serviceInit(conf); + } catch 
(Exception e) { + Assert.fail(e.getMessage()); + } + when(mockRmContext.getScheduler()).thenReturn(cs); + Resource r = Resource.newInstance(numContainers * 1024, numContainers); + doReturn(r).when(cs).getClusterResource(); + return cs; + } + + public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + // Define default queue + final String defQ = CapacitySchedulerConfiguration.ROOT + ".default"; + conf.setCapacity(defQ, 10); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { + "default", "a", reservationQ }); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + + final String dedicated = + CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + reservationQ; + conf.setCapacity(dedicated, 80); + // Set as reservation queue + conf.setReservableQueue(dedicated, true); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, 30); + conf.setCapacity(A2, 70); + } + + public String getFullReservationQueueName() { + return CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + reservationQ; + } + + public String getreservationQueueName() { + return reservationQ; + } + + public void updateQueueConfiguration(CapacitySchedulerConfiguration conf, + String newQ) { + // Define default queue + final String prefix = + CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT; + final String defQ = prefix + "default"; + conf.setCapacity(defQ, 5); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { + "default", "a", reservationQ, newQ }); + + final String A = prefix + "a"; + conf.setCapacity(A, 5); + + final String dedicated = prefix + reservationQ; + conf.setCapacity(dedicated, 80); + // Set as reservation queue + conf.setReservableQueue(dedicated, true); 
+ + conf.setCapacity(prefix + newQ, 10); + // Set as reservation queue + conf.setReservableQueue(prefix + newQ, true); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, 30); + conf.setCapacity(A2, 70); + } + + public static ReservationDefinition generateRandomRR(Random rand, long i) { + rand.setSeed(i); + long now = System.currentTimeMillis(); + + // start time at random in the next 12 hours + long arrival = rand.nextInt(12 * 3600 * 1000); + // deadline at random in the next day + long deadline = arrival + rand.nextInt(24 * 3600 * 1000); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(now + arrival); + rr.setDeadline(now + deadline); + + int gang = 1 + rand.nextInt(9); + int par = (rand.nextInt(1000) + 1) * gang; + long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), par, + gang, dur); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rand.nextInt(3); + ReservationRequestInterpreter[] type = + ReservationRequestInterpreter.values(); + reqs.setInterpreter(type[rand.nextInt(type.length)]); + rr.setReservationRequests(reqs); + + return rr; + + } + + public static ReservationDefinition generateBigRR(Random rand, long i) { + rand.setSeed(i); + long now = System.currentTimeMillis(); + + // start time at random in the next 2 hours + long arrival = rand.nextInt(2 * 3600 * 1000); + // deadline at random in the next day + long deadline = rand.nextInt(24 * 3600 * 1000); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(now + arrival); + rr.setDeadline(now + deadline); + + int gang = 1; + int par = 100000; // 100k tasks + long 
dur = rand.nextInt(60 * 1000); // 1min tasks + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), par, + gang, dur); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rand.nextInt(3); + ReservationRequestInterpreter[] type = + ReservationRequestInterpreter.values(); + reqs.setInterpreter(type[rand.nextInt(type.length)]); + rr.setReservationRequests(reqs); + + return rr; + } + + public static Map<ReservationInterval, ReservationRequest> generateAllocation( + long startTime, long step, int[] alloc) { + Map<ReservationInterval, ReservationRequest> req = + new TreeMap<ReservationInterval, ReservationRequest>(); + for (int i = 0; i < alloc.length; i++) { + req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1) + * step), ReservationRequest.newInstance( + Resource.newInstance(1024, 1), alloc[i])); + } + return req; + } + +}
