http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index 658387b..3062f3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.io.IOException; -import java.io.StringWriter; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -33,8 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.gson.stream.JsonWriter; - /** * This is a run length encoded sparse data structure that maintains resource * allocations over time. @@ -44,12 +40,14 @@ public class RLESparseResourceAllocation { private static final int THRESHOLD = 100; private static final Resource ZERO_RESOURCE = Resources.none(); - private NavigableMap<Long, Resource> cumulativeCapacity = + @SuppressWarnings("checkstyle:visibilitymodifier") + protected NavigableMap<Long, Resource> cumulativeCapacity = new TreeMap<Long, Resource>(); private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock readLock = readWriteLock.readLock(); + @SuppressWarnings("checkstyle:visibilitymodifier") + protected final Lock readLock = readWriteLock.readLock(); private final Lock writeLock = readWriteLock.writeLock(); private final ResourceCalculator resourceCalculator; @@ -236,34 +234,6 @@ public class RLESparseResourceAllocation { } /** - * Returns the JSON string representation of the current resources allocated - * over time. - * - * @return the JSON string representation of the current resources allocated - * over time - */ - public String toMemJSONString() { - StringWriter json = new StringWriter(); - JsonWriter jsonWriter = new JsonWriter(json); - readLock.lock(); - try { - jsonWriter.beginObject(); - // jsonWriter.name("timestamp").value("resource"); - for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) { - jsonWriter.name(r.getKey().toString()).value(r.getValue().toString()); - } - jsonWriter.endObject(); - jsonWriter.close(); - return json.toString(); - } catch (IOException e) { - // This should not happen - return ""; - } finally { - readLock.unlock(); - } - } - - /** * Returns the representation of the current resources allocated over time as * an interval map (in the defined non-null range). * @@ -437,8 +407,8 @@ public class RLESparseResourceAllocation { Resource val = Resources.negate(e.getValue()); // test for negative value and throws if (operator == RLEOperator.subtractTestNonNegative - && (Resources.fitsIn(val, ZERO_RESOURCE) && - !Resources.equals(val, ZERO_RESOURCE))) { + && (Resources.fitsIn(val, ZERO_RESOURCE) + && !Resources.equals(val, ZERO_RESOURCE))) { throw new PlanningException( "RLESparseResourceAllocation: merge failed as the " + "resulting RLESparseResourceAllocation would be negative"); @@ -504,22 +474,29 @@ public class RLESparseResourceAllocation { } + /** + * Get a {@link RLESparseResourceAllocation} view of the {@link Resource} + * allocations between the specified start and end times. + * + * @param start the time from which the {@link Resource} allocations are + * required + * @param end the time upto which the {@link Resource} allocations are + * required + * @return the overlapping allocations + */ public RLESparseResourceAllocation getRangeOverlapping(long start, long end) { readLock.lock(); try { NavigableMap<Long, Resource> a = this.getCumulative(); - if (a != null && !a.isEmpty()) { // include the portion of previous entry that overlaps start if (start > a.firstKey()) { long previous = a.floorKey(start); a = a.tailMap(previous, true); } - if (end < a.lastKey()) { a = a.headMap(end, true); } - } RLESparseResourceAllocation ret = new RLESparseResourceAllocation(a, resourceCalculator); @@ -527,7 +504,33 @@ public class RLESparseResourceAllocation { } finally { readLock.unlock(); } + } + /** + * This method shifts all the timestamp of the {@link Resource} entries by the + * specified "delta". + * + * @param delta the time by which to shift the {@link Resource} allocations + */ + public void shift(long delta) { + writeLock.lock(); + try { + TreeMap<Long, Resource> newCum = new TreeMap<>(); + long start; + for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) { + if (delta > 0) { + start = (entry.getKey() == Long.MAX_VALUE) ? Long.MAX_VALUE + : entry.getKey() + delta; + } else { + start = (entry.getKey() == Long.MIN_VALUE) ? Long.MIN_VALUE + : entry.getKey() + delta; + } + newCum.put(start, entry.getValue()); + } + cumulativeCapacity = newCum; + } finally { + writeLock.unlock(); + } } /** @@ -541,8 +544,8 @@ public class RLESparseResourceAllocation { /** * Get the maximum capacity across specified time instances. The search-space * is specified using the starting value, tick, and the periodic interval for - * search. Maximum resource allocation across tick, tick + period, - * tick + 2 * period,..., tick + n * period .. is returned. + * search. Maximum resource allocation across tick, tick + period, tick + 2 * + * period,..., tick + n * period .. is returned. * * @param tick the starting time instance * @param period interval at which capacity is evaluated @@ -550,14 +553,19 @@ public class RLESparseResourceAllocation { */ public Resource getMaximumPeriodicCapacity(long tick, long period) { Resource maxCapacity = ZERO_RESOURCE; - if (!cumulativeCapacity.isEmpty()) { - Long lastKey = cumulativeCapacity.lastKey(); - for (long t = tick; t <= lastKey; t = t + period) { - maxCapacity = Resources.componentwiseMax(maxCapacity, - cumulativeCapacity.floorEntry(t).getValue()); + readLock.lock(); + try { + if (!cumulativeCapacity.isEmpty()) { + Long lastKey = cumulativeCapacity.lastKey(); + for (long t = tick; t <= lastKey; t = t + period) { + maxCapacity = Resources.componentwiseMax(maxCapacity, + cumulativeCapacity.floorEntry(t).getValue()); + } } + return maxCapacity; + } finally { + readLock.unlock(); } - return maxCapacity; } /** @@ -567,17 +575,17 @@ public class RLESparseResourceAllocation { * @return minimum resource allocation */ public Resource getMinimumCapacityInInterval(ReservationInterval interval) { - Resource minCapacity = Resource.newInstance( - Integer.MAX_VALUE, Integer.MAX_VALUE); + Resource minCapacity = + Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE); long start = interval.getStartTime(); long end = interval.getEndTime(); NavigableMap<Long, Resource> capacityRange = - this.getRangeOverlapping(start, end).getCumulative(); + getRangeOverlapping(start, end).getCumulative(); if (!capacityRange.isEmpty()) { for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) { if (entry.getValue() != null) { - minCapacity = Resources.componentwiseMin(minCapacity, - entry.getValue()); + minCapacity = + Resources.componentwiseMin(minCapacity, entry.getValue()); } } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java index 0da95ac..bb4a7fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java @@ -24,14 +24,16 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import com.google.common.annotations.VisibleForTesting; + /** * A ReservationAllocation represents a concrete allocation of resources over * time that satisfy a certain {@link ReservationDefinition}. This is used * internally by a {@link Plan} to store information about how each of the * accepted {@link ReservationDefinition} have been allocated. */ -public interface ReservationAllocation extends - Comparable<ReservationAllocation> { +public interface ReservationAllocation + extends Comparable<ReservationAllocation> { /** * Returns the unique identifier {@link ReservationId} that represents the @@ -40,28 +42,28 @@ public interface ReservationAllocation extends * @return reservationId the unique identifier {@link ReservationId} that * represents the reservation */ - public ReservationId getReservationId(); + ReservationId getReservationId(); /** * Returns the original {@link ReservationDefinition} submitted by the client * * @return the {@link ReservationDefinition} submitted by the client */ - public ReservationDefinition getReservationDefinition(); + ReservationDefinition getReservationDefinition(); /** * Returns the time at which the reservation is activated. * * @return the time at which the reservation is activated */ - public long getStartTime(); + long getStartTime(); /** * Returns the time at which the reservation terminates. * * @return the time at which the reservation terminates */ - public long getEndTime(); + long getEndTime(); /** * Returns the map of resources requested against the time interval for which @@ -70,28 +72,28 @@ public interface ReservationAllocation extends * @return the allocationRequests the map of resources requested against the * time interval for which they were */ - public Map<ReservationInterval, Resource> getAllocationRequests(); + Map<ReservationInterval, Resource> getAllocationRequests(); /** * Return a string identifying the plan to which the reservation belongs * * @return the plan to which the reservation belongs */ - public String getPlanName(); + String getPlanName(); /** * Returns the user who requested the reservation * * @return the user who requested the reservation */ - public String getUser(); + String getUser(); /** * Returns whether the reservation has gang semantics or not * * @return true if there is a gang request, false otherwise */ - public boolean containsGangs(); + boolean containsGangs(); /** * Sets the time at which the reservation was accepted by the system @@ -99,14 +101,14 @@ public interface ReservationAllocation extends * @param acceptedAt the time at which the reservation was accepted by the * system */ - public void setAcceptanceTimestamp(long acceptedAt); + void setAcceptanceTimestamp(long acceptedAt); /** * Returns the time at which the reservation was accepted by the system * * @return the time at which the reservation was accepted by the system */ - public long getAcceptanceTime(); + long getAcceptanceTime(); /** * Returns the capacity represented by cumulative resources reserved by the @@ -116,12 +118,42 @@ public interface ReservationAllocation extends * requested * @return the resources reserved at the specified time */ - public Resource getResourcesAtTime(long tick); + Resource getResourcesAtTime(long tick); + + /** + * Return a RLE representation of used resources. + * + * @return a RLE encoding of resources allocated over time. + */ + RLESparseResourceAllocation getResourcesOverTime(); + /** * Return a RLE representation of used resources. + * + * @param start start of the time interval. + * @param end end of the time interval. * @return a RLE encoding of resources allocated over time. */ - public RLESparseResourceAllocation getResourcesOverTime(); + RLESparseResourceAllocation getResourcesOverTime(long start, long end); + + /** + * Get the periodicity of this reservation representing the time period of the + * periodic job. Period is represented in milliseconds for periodic jobs. + * Period is 0 for non-periodic jobs. + * + * @return periodicity of this reservation + */ + long getPeriodicity(); + + /** + * Set the periodicity of this reservation representing the time period of the + * periodic job. Period is represented in milliseconds for periodic jobs. + * Period is 0 for non-periodic jobs. + * + * @param period periodicity of this reservation + */ + @VisibleForTesting + void setPeriodicity(long period); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java index 027d066..a66d222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java @@ -44,6 +44,8 @@ public class ReservationInputValidator { /** * Utility class to validate reservation requests. + * + * @param clock the {@link Clock} to use */ public ReservationInputValidator(Clock clock) { this.clock = clock; @@ -53,22 +55,21 @@ public class ReservationInputValidator { ReservationId reservationId, String auditConstant) throws YarnException { // check if the reservation id is valid if (reservationId == null) { - String message = - "Missing reservation id." - + " Please try again by specifying a reservation id."; + String message = "Missing reservation id." + + " Please try again by specifying a reservation id."; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input", "ClientRMService", message); throw RPCUtil.getRemoteException(message); } String queue = reservationSystem.getQueueForReservation(reservationId); String nullQueueErrorMessage = - "The specified reservation with ID: " + reservationId - + " is unknown. Please try again with a valid reservation."; + "The specified reservation with ID: " + reservationId + + " is unknown. Please try again with a valid reservation."; String nullPlanErrorMessage = "The specified reservation: " + reservationId - + " is not associated with any valid plan." - + " Please try again with a valid reservation."; + + " is not associated with any valid plan." + + " Please try again with a valid reservation."; return getPlanFromQueue(reservationSystem, queue, auditConstant, - nullQueueErrorMessage, nullPlanErrorMessage); + nullQueueErrorMessage, nullPlanErrorMessage); } private void validateReservationDefinition(ReservationId reservationId, @@ -77,17 +78,15 @@ public class ReservationInputValidator { String message = ""; // check if deadline is in the past if (contract == null) { - message = - "Missing reservation definition." - + " Please try again by specifying a reservation definition."; + message = "Missing reservation definition." + + " Please try again by specifying a reservation definition."; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input definition", "ClientRMService", message); throw RPCUtil.getRemoteException(message); } if (contract.getDeadline() <= clock.getTime()) { - message = - "The specified deadline: " + contract.getDeadline() - + " is the past. Please try again with deadline in the future."; + message = "The specified deadline: " + contract.getDeadline() + + " is the past. Please try again with deadline in the future."; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input definition", "ClientRMService", message); throw RPCUtil.getRemoteException(message); @@ -95,18 +94,16 @@ public class ReservationInputValidator { // Check if at least one RR has been specified ReservationRequests resReqs = contract.getReservationRequests(); if (resReqs == null) { - message = - "No resources have been specified to reserve." - + "Please try again by specifying the resources to reserve."; + message = "No resources have been specified to reserve." + + "Please try again by specifying the resources to reserve."; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input definition", "ClientRMService", message); throw RPCUtil.getRemoteException(message); } List<ReservationRequest> resReq = resReqs.getReservationResources(); if (resReq == null || resReq.isEmpty()) { - message = - "No resources have been specified to reserve." - + " Please try again by specifying the resources to reserve."; + message = "No resources have been specified to reserve." + + " Please try again by specifying the resources to reserve."; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input definition", "ClientRMService", message); throw RPCUtil.getRemoteException(message); @@ -123,22 +120,18 @@ public class ReservationInputValidator { } else { minDuration += rr.getDuration(); } - maxGangSize = - Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(), - maxGangSize, - Resources.multiply(rr.getCapability(), rr.getConcurrency())); + maxGangSize = Resources.max(plan.getResourceCalculator(), + plan.getTotalCapacity(), maxGangSize, + Resources.multiply(rr.getCapability(), rr.getConcurrency())); } // verify the allocation is possible (skip for ANY) long duration = contract.getDeadline() - contract.getArrival(); - if (duration < minDuration - && type != ReservationRequestInterpreter.R_ANY) { - message = - "The time difference (" - + (duration) - + ") between arrival (" + contract.getArrival() + ") " - + "and deadline (" + contract.getDeadline() + ") must " - + " be greater or equal to the minimum resource duration (" - + minDuration + ")"; + if (duration < minDuration && type != ReservationRequestInterpreter.R_ANY) { + message = "The time difference (" + (duration) + ") between arrival (" + + contract.getArrival() + ") " + "and deadline (" + + contract.getDeadline() + ") must " + + " be greater or equal to the minimum resource duration (" + + minDuration + ")"; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input definition", "ClientRMService", message); throw RPCUtil.getRemoteException(message); @@ -148,10 +141,9 @@ public class ReservationInputValidator { if (Resources.greaterThan(plan.getResourceCalculator(), plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity()) && type != ReservationRequestInterpreter.R_ANY) { - message = - "The size of the largest gang in the reservation definition (" - + maxGangSize + ") exceed the capacity available (" - + plan.getTotalCapacity() + " )"; + message = "The size of the largest gang in the reservation definition (" + + maxGangSize + ") exceed the capacity available (" + + plan.getTotalCapacity() + " )"; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input definition", "ClientRMService", message); throw RPCUtil.getRemoteException(message); @@ -179,32 +171,32 @@ public class ReservationInputValidator { } } - private Plan getPlanFromQueue(ReservationSystem reservationSystem, String - queue, String auditConstant) throws YarnException { + private Plan getPlanFromQueue(ReservationSystem reservationSystem, + String queue, String auditConstant) throws YarnException { String nullQueueErrorMessage = "The queue is not specified." - + " Please try again with a valid reservable queue."; + + " Please try again with a valid reservable queue."; String nullPlanErrorMessage = "The specified queue: " + queue - + " is not managed by reservation system." - + " Please try again with a valid reservable queue."; + + " is not managed by reservation system." + + " Please try again with a valid reservable queue."; return getPlanFromQueue(reservationSystem, queue, auditConstant, - nullQueueErrorMessage, nullPlanErrorMessage); + nullQueueErrorMessage, nullPlanErrorMessage); } - private Plan getPlanFromQueue(ReservationSystem reservationSystem, String - queue, String auditConstant, String nullQueueErrorMessage, - String nullPlanErrorMessage) throws YarnException { + private Plan getPlanFromQueue(ReservationSystem reservationSystem, + String queue, String auditConstant, String nullQueueErrorMessage, + String nullPlanErrorMessage) throws YarnException { if (queue == null || queue.isEmpty()) { RMAuditLogger.logFailure("UNKNOWN", auditConstant, - "validate reservation input", "ClientRMService", - nullQueueErrorMessage); + "validate reservation input", "ClientRMService", + nullQueueErrorMessage); throw RPCUtil.getRemoteException(nullQueueErrorMessage); } // check if the associated plan is valid Plan plan = reservationSystem.getPlan(queue); if (plan == null) { RMAuditLogger.logFailure("UNKNOWN", auditConstant, - "validate reservation input", "ClientRMService", - nullPlanErrorMessage); + "validate reservation input", "ClientRMService", + nullPlanErrorMessage); throw RPCUtil.getRemoteException(nullPlanErrorMessage); } return plan; @@ -222,22 +214,21 @@ public class ReservationInputValidator { * @param reservationId the {@link ReservationId} associated with the current * request * @return the {@link Plan} to submit the request to - * @throws YarnException + * @throws YarnException if validation fails */ public Plan validateReservationSubmissionRequest( - ReservationSystem reservationSystem, - ReservationSubmissionRequest request, ReservationId reservationId) - throws YarnException { + ReservationSystem reservationSystem, ReservationSubmissionRequest request, + ReservationId reservationId) throws YarnException { String message; if (reservationId == null) { - message = "Reservation id cannot be null. Please try again " + - "specifying a valid reservation id by creating a new reservation id."; + message = "Reservation id cannot be null. Please try again specifying " + + " a valid reservation id by creating a new reservation id."; throw RPCUtil.getRemoteException(message); } // Check if it is a managed queue String queue = request.getQueue(); Plan plan = getPlanFromQueue(reservationSystem, queue, - AuditConstants.SUBMIT_RESERVATION_REQUEST); + AuditConstants.SUBMIT_RESERVATION_REQUEST); validateReservationDefinition(reservationId, request.getReservationDefinition(), plan, @@ -255,15 +246,14 @@ public class ReservationInputValidator { * @param request the {@link ReservationUpdateRequest} defining the resources * required over time for the request * @return the {@link Plan} to submit the request to - * @throws YarnException + * @throws YarnException if validation fails */ public Plan validateReservationUpdateRequest( ReservationSystem reservationSystem, ReservationUpdateRequest request) throws YarnException { ReservationId reservationId = request.getReservationId(); - Plan plan = - validateReservation(reservationSystem, reservationId, - AuditConstants.UPDATE_RESERVATION_REQUEST); + Plan plan = validateReservation(reservationSystem, reservationId, + AuditConstants.UPDATE_RESERVATION_REQUEST); validateReservationDefinition(reservationId, request.getReservationDefinition(), plan, AuditConstants.UPDATE_RESERVATION_REQUEST); @@ -278,28 +268,26 @@ public class ReservationInputValidator { * * @param reservationSystem the {@link ReservationSystem} to validate against * @param request the {@link ReservationListRequest} defining search - * parameters for reservations in the {@link ReservationSystem} - * that is being validated against. + * parameters for reservations in the {@link ReservationSystem} that + * is being validated against. * @return the {@link Plan} to list reservations of. - * @throws YarnException + * @throws YarnException if validation fails */ public Plan validateReservationListRequest( - ReservationSystem reservationSystem, - ReservationListRequest request) + ReservationSystem reservationSystem, ReservationListRequest request) throws YarnException { String queue = request.getQueue(); if (request.getEndTime() < request.getStartTime()) { - String errorMessage = "The specified end time must be greater than " + - "the specified start time."; + String errorMessage = "The specified end time must be greater than " + + "the specified start time."; RMAuditLogger.logFailure("UNKNOWN", - AuditConstants.LIST_RESERVATION_REQUEST, - "validate list reservation input", "ClientRMService", - errorMessage); + AuditConstants.LIST_RESERVATION_REQUEST, + "validate list reservation input", "ClientRMService", errorMessage); throw RPCUtil.getRemoteException(errorMessage); } // Check if it is a managed queue return getPlanFromQueue(reservationSystem, queue, - AuditConstants.LIST_RESERVATION_REQUEST); + AuditConstants.LIST_RESERVATION_REQUEST); } /** @@ -312,7 +300,7 @@ public class ReservationInputValidator { * @param request the {@link ReservationDeleteRequest} defining the resources * required over time for the request * @return the {@link Plan} to submit the request to - * @throws YarnException + * @throws YarnException if validation fails */ public Plan validateReservationDeleteRequest( ReservationSystem reservationSystem, ReservationDeleteRequest request) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java index 8b62972..a6c8fcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -29,8 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager; -import java.util.Map; - /** * This interface is the one implemented by any system that wants to support * Reservations i.e. make {@code Resource} allocations in future. Implementors @@ -57,7 +57,7 @@ public interface ReservationSystem extends Recoverable { * * @param conf configuration * @param rmContext current context of the {@code ResourceManager} - * @throws YarnException + * @throws YarnException if initialization of the configured plan fails */ void reinitialize(Configuration conf, RMContext rmContext) throws YarnException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java index e458055..cbf0f38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -38,7 +38,7 @@ public interface SharingPolicy { * @param planQueuePath the name of the queue for this plan * @param conf the system configuration */ - public void init(String planQueuePath, ReservationSchedulerConfiguration conf); + void init(String planQueuePath, ReservationSchedulerConfiguration conf); /** * This method runs the policy validation logic, and return true/false on @@ -51,7 +51,7 @@ public interface SharingPolicy { * @throws PlanningException if the policy is respected if we add this * {@link ReservationAllocation} to the {@link Plan} */ - public void validate(Plan plan, ReservationAllocation newAllocation) + void validate(Plan plan, ReservationAllocation newAllocation) throws PlanningException; /** @@ -68,9 +68,13 @@ public interface SharingPolicy { * @param start the start time for the range we are querying * @param end the end time for the range we are querying * @param oldId (optional) the id of a reservation being updated + * + * @return the available resources expressed as a + * {@link RLESparseResourceAllocation} + * * @throws PlanningException throws if the request is not valid */ - public RLESparseResourceAllocation availableResources( + RLESparseResourceAllocation availableResources( RLESparseResourceAllocation available, Plan plan, String user, ReservationId oldId, long start, long end) throws PlanningException; @@ -82,7 +86,6 @@ public interface SharingPolicy { * * @return validWindow the window of validity considered by the policy. */ - public long getValidWindow(); - + long getValidWindow(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java index abac6ac..af0e712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java @@ -34,7 +34,7 @@ public interface Planner { * * @param plan the {@link Plan} to replan * @param contracts the list of reservation requests - * @throws PlanningException + * @throws PlanningException if operation is unsuccessful */ public void plan(Plan plan, List<ReservationDefinition> contracts) throws PlanningException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java index 199bfa5..bbbf0d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java @@ -50,7 +50,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent { * @return whether the allocateUser function was successful or not * * @throws PlanningException if the session cannot be fitted into the plan - * @throws ContractValidationException + * @throws ContractValidationException if validation fails */ protected boolean allocateUser(ReservationId reservationId, String user, Plan plan, ReservationDefinition contract, http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java index ec6d9c0..8934b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java @@ -50,7 +50,7 @@ public interface StageAllocator { * * @return The computed allocation (or null if the stage could not be * allocated) - * @throws PlanningException + * @throws PlanningException if operation is unsuccessful */ Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, RLESparseResourceAllocation planLoads, http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java index da04336..d107487 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java @@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator { RLESparseResourceAllocation netAvailable = plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart, - stageDeadline); + stageDeadline, 0); netAvailable = RLESparseResourceAllocation.merge(plan.getResourceCalculator(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java index ec83e02..ae7d91a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java @@ -83,9 +83,8 @@ public class StageAllocatorGreedyRLE implements StageAllocator { int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); // get available resources from plan - RLESparseResourceAllocation netRLERes = - plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart, - stageDeadline); + RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime( + user, oldId, stageEarliestStart, stageDeadline, 0); // remove plan modifications netRLERes = http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java index e45f58c..c014549 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -77,8 +77,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { ResourceCalculator resCalc = plan.getResourceCalculator(); Resource capacity = plan.getTotalCapacity(); - RLESparseResourceAllocation netRLERes = plan - .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline); + RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime( + user, oldId, stageArrival, stageDeadline, 0); long step = plan.getStep(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index e99842e..5337e06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -19,7 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anySetOf; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.FileWriter; import java.io.IOException; @@ -76,7 +79,8 @@ public class ReservationSystemTestUtil { String reservationQ, long timeWindow, float instConstraint, float avgConstraint) { - ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration(); + ReservationSchedulerConfiguration realConf = + new CapacitySchedulerConfiguration(); ReservationSchedulerConfiguration conf = spy(realConf); when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow); when(conf.getInstantaneousMaxCapacity(reservationQ)) @@ -168,7 +172,6 @@ public class ReservationSystemTestUtil { scheduler.start(); scheduler.reinitialize(conf, rmContext); - Resource resource = ReservationSystemTestUtil.calculateClusterResource(numContainers); RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1"); @@ -184,10 +187,16 @@ public class ReservationSystemTestUtil { public static ReservationDefinition createSimpleReservationDefinition( long arrival, long deadline, long duration, int parallelism) { + return createSimpleReservationDefinition(arrival, deadline, duration, + parallelism, null); + } + + public static ReservationDefinition createSimpleReservationDefinition( + long arrival, long deadline, long duration, int parallelism, + String recurrenceExpression) { // create a request with a single atomic ask - ReservationRequest r = - ReservationRequest.newInstance(Resource.newInstance(1024, 1), - parallelism, parallelism, duration); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), parallelism, parallelism, duration); ReservationDefinition rDef = new ReservationDefinitionPBImpl(); ReservationRequests reqs = new ReservationRequestsPBImpl(); reqs.setReservationResources(Collections.singletonList(r)); @@ -195,32 +204,31 @@ public class ReservationSystemTestUtil { rDef.setReservationRequests(reqs); rDef.setArrival(arrival); rDef.setDeadline(deadline); + if (recurrenceExpression != null) { + rDef.setRecurrenceExpression(recurrenceExpression); + } return rDef; } public static ReservationSubmissionRequest createSimpleReservationRequest( ReservationId reservationId, int numContainers, long arrival, long deadline, long duration) { - return createSimpleReservationRequest(reservationId, numContainers, - arrival, deadline, duration, Priority.UNDEFINED); + return createSimpleReservationRequest(reservationId, numContainers, arrival, + deadline, duration, Priority.UNDEFINED); } public static ReservationSubmissionRequest createSimpleReservationRequest( ReservationId reservationId, int numContainers, long arrival, long deadline, long duration, Priority priority) { // create a request with a single atomic ask - ReservationRequest r = - ReservationRequest.newInstance(Resource.newInstance(1024, 1), - numContainers, 1, duration); - ReservationRequests reqs = - ReservationRequests.newInstance(Collections.singletonList(r), - ReservationRequestInterpreter.R_ALL); - ReservationDefinition rDef = - ReservationDefinition.newInstance(arrival, deadline, reqs, - "testClientRMService#reservation", "0", priority); - ReservationSubmissionRequest request = - ReservationSubmissionRequest.newInstance(rDef, - reservationQ, reservationId); + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance(arrival, + deadline, reqs, "testClientRMService#reservation", "0", priority); + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(rDef, reservationQ, reservationId); return request; } @@ -252,9 +260,9 @@ public class ReservationSystemTestUtil { return cs; } - @SuppressWarnings("rawtypes") public static void initializeRMContext( - int numContainers, AbstractYarnScheduler scheduler, - RMContext mockRMContext) { + @SuppressWarnings("rawtypes") + public static void initializeRMContext(int numContainers, + AbstractYarnScheduler scheduler, RMContext mockRMContext) { when(mockRMContext.getScheduler()).thenReturn(scheduler); Resource r = calculateClusterResource(numContainers); @@ -262,26 +270,25 @@ public class ReservationSystemTestUtil { } public static RMContext createRMContext(Configuration conf) { - RMContext mockRmContext = Mockito.spy( - new RMContextImpl(null, null, null, null, null, null, - new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + RMContext mockRmContext = Mockito.spy(new RMContextImpl(null, null, null, + null, null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); when(nlm.getQueueResource(any(String.class), anySetOf(String.class), - any(Resource.class))).thenAnswer(new Answer<Resource>() { - @Override public Resource answer(InvocationOnMock invocation) - throws Throwable { - Object[] args = invocation.getArguments(); - return (Resource) args[2]; - } - }); + any(Resource.class))).thenAnswer(new Answer<Resource>() { + @Override + public Resource answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Resource) args[2]; + } + }); when(nlm.getResourceByLabel(any(String.class), any(Resource.class))) .thenAnswer(new Answer<Resource>() { - @Override public Resource answer(InvocationOnMock invocation) - throws Throwable { + @Override + public Resource answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); return (Resource) args[1]; } @@ -304,9 +311,8 @@ public class ReservationSystemTestUtil { final String A = CapacitySchedulerConfiguration.ROOT + ".a"; conf.setCapacity(A, 10); - final String dedicated = - CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT - + reservationQ; + final String dedicated = CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + reservationQ; conf.setCapacity(dedicated, 80); // Set as reservation queue conf.setReservable(dedicated, true); @@ -405,26 +411,55 @@ public class ReservationSystemTestUtil { public static Map<ReservationInterval, Resource> generateAllocation( long startTime, long step, int[] alloc) { + return generateAllocation(startTime, step, alloc, null); + } + + public static Map<ReservationInterval, Resource> generateAllocation( + long startTime, long step, int[] alloc, String recurrenceExpression) { Map<ReservationInterval, Resource> req = new TreeMap<>(); - for (int i = 0; i < alloc.length; i++) { - req.put(new ReservationInterval(startTime + i * step, - startTime + (i + 1) * step), ReservationSystemUtil.toResource( - ReservationRequest - .newInstance(Resource.newInstance(1024, 1), alloc[i]))); + + long period = 0; + if (recurrenceExpression != null) { + period = Long.parseLong(recurrenceExpression); + } + + long rStart; + long rEnd; + for (int j = 0; j < 86400000; j += period) { + for (int i = 0; i < alloc.length; i++) { + rStart = (startTime + i * step) + j * period; + rEnd = (startTime + (i + 1) * step) + j * period; + if (period > 0) { + rStart = rStart % period + j * period; + rEnd = rEnd % period + j * period; + if (rStart > rEnd) { + // skip wrap-around entry + continue; + } + } + + req.put(new ReservationInterval(rStart, rEnd), + ReservationSystemUtil.toResource(ReservationRequest + .newInstance(Resource.newInstance(1024, 1), alloc[i]))); + + } + // execute only once if non-periodic + if (period == 0) { + break; + } } return req; } - public static RLESparseResourceAllocation - generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) { + public static RLESparseResourceAllocation generateRLESparseResourceAllocation( + int[] alloc, long[] timeSteps) { TreeMap<Long, Resource> allocationsMap = new TreeMap<>(); for (int i = 0; i < alloc.length; i++) { allocationsMap.put(timeSteps[i], Resource.newInstance(alloc[i], alloc[i])); } - RLESparseResourceAllocation rleVector = - new RLESparseResourceAllocation(allocationsMap, - new DefaultResourceCalculator()); + RLESparseResourceAllocation rleVector = new RLESparseResourceAllocation( + allocationsMap, new DefaultResourceCalculator()); return rleVector; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org