http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceAllocationRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceAllocationRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceAllocationRequestPBImpl.java new file mode 100644 index 0000000..737ae44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceAllocationRequestPBImpl.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ResourceAllocationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; + +/** + * {@code ResourceAllocationRequestPBImpl} which implements the + * {@link ResourceAllocationRequest} class which represents an allocation + * made for a reservation for the current state of the plan. This can be + * changed for reasons such as re-planning, but will always be subject to the + * constraints of the user contract as described by a + * {@code ReservationDefinition} + * {@link Resource} + * + * <p> + * It includes: + * <ul> + * <li>StartTime of the allocation.</li> + * <li>EndTime of the allocation.</li> + * <li>{@link Resource} reserved for the allocation.</li> + * </ul> + * + * @see Resource + */ +@Private +@Unstable +public class ResourceAllocationRequestPBImpl extends + ResourceAllocationRequest { + private ResourceAllocationRequestProto proto = + ResourceAllocationRequestProto.getDefaultInstance(); + private ResourceAllocationRequestProto.Builder builder = null; + private boolean viaProto = false; + + private Resource capability = null; + + public ResourceAllocationRequestPBImpl() { + builder = ResourceAllocationRequestProto.newBuilder(); + } + + public ResourceAllocationRequestPBImpl( + ResourceAllocationRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ResourceAllocationRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceAllocationRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public Resource getCapability() { + ResourceAllocationRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.capability != null) { + return this.capability; + } + if (!p.hasResource()) { + return null; + } + this.capability = convertFromProtoFormat(p.getResource()); + return this.capability; + } + + @Override + public void setCapability(Resource newCapability) { + maybeInitBuilder(); + if (newCapability == null) { + builder.clearResource(); + return; + } + capability = newCapability; + } + + @Override + public long getStartTime() { + ResourceAllocationRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasStartTime()) { + return 0; + } + return (p.getStartTime()); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + if (startTime <= 0) { + builder.clearStartTime(); + return; + } + builder.setStartTime(startTime); + } + + @Override + public long getEndTime() { + ResourceAllocationRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasEndTime()) { + return 0; + } + return (p.getEndTime()); + } + + @Override + public void setEndTime(long endTime) { + maybeInitBuilder(); + if (endTime <= 0) { + builder.clearEndTime(); + return; + } + builder.setEndTime(endTime); + } + + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private ResourceProto convertToProtoFormat(Resource p) { + return ((ResourcePBImpl)p).getProto(); + } + + private void mergeLocalToBuilder() { + if (this.capability != null) { + builder.setResource(convertToProtoFormat(this.capability)); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public String toString() { + return "{Resource: " + getCapability() + ", # Start Time: " + + getStartTime() + ", End Time: " + getEndTime() + "}"; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 479697e..07b06fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -94,6 +94,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRe import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl; @@ -135,10 +137,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import 
org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.ResourceAllocationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -282,6 +286,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterR import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; @@ -500,6 +506,8 @@ public class TestPBImplRecords { generateByNewInstance(ReservationRequest.class); generateByNewInstance(ReservationRequests.class); generateByNewInstance(ReservationDefinition.class); + generateByNewInstance(ResourceAllocationRequest.class); + generateByNewInstance(ReservationAllocationState.class); generateByNewInstance(ResourceUtilization.class); generateByNewInstance(AMBlackListingRequest.class); } @@ -1233,7 +1241,19 @@ public class TestPBImplRecords { validatePBImplRecord(ReservationDeleteResponsePBImpl.class, ReservationDeleteResponseProto.class); } - + + @Test + public void testReservationListRequestPBImpl() throws Exception { + validatePBImplRecord(ReservationListRequestPBImpl.class, + ReservationListRequestProto.class); + } + + @Test + 
public void testReservationListResponsePBImpl() throws Exception { + validatePBImplRecord(ReservationListResponsePBImpl.class, + ReservationListResponseProto.class); + } + @Test public void testAddToClusterNodeLabelsRequestPBImpl() throws Exception { validatePBImplRecord(AddToClusterNodeLabelsRequestPBImpl.class, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index 9dd245d..c512f8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -24,14 +24,11 @@ import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; - import com.google.common.base.Strings; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; @@ -80,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; @@ -90,6 +89,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -432,6 +433,13 @@ public class MockResourceManagerFacade implements } @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override public ReservationUpdateResponse updateReservation( ReservationUpdateRequest request) throws YarnException, IOException { throw new NotImplementedException(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index a730fa3..630c804 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -92,6 +92,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; @@ -115,6 +117,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import 
org.apache.hadoop.yarn.api.records.ReservationAllocationState; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -133,8 +136,11 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -1320,6 +1326,44 @@ public class ClientRMService extends AbstractService implements } @Override + public ReservationListResponse listReservations( + ReservationListRequest requestInfo) throws YarnException, IOException { + // Check if reservation system is enabled + checkReservationSytem(AuditConstants.LIST_RESERVATION_REQUEST); + ReservationListResponse response = + recordFactory.newRecordInstance(ReservationListResponse.class); + + Plan plan = rValidator.validateReservationListRequest( + reservationSystem, requestInfo); + boolean includeResourceAllocations = requestInfo + .getIncludeResourceAllocations(); + + String user = checkReservationACLs(requestInfo.getQueue(), + AuditConstants.LIST_RESERVATION_REQUEST); + + ReservationId requestedId 
= null; + if (requestInfo.getReservationId() != null + && !requestInfo.getReservationId().isEmpty()) { + requestedId = ReservationId.parseReservationId(requestInfo + .getReservationId()); + } + + long startTime = Math.max(requestInfo.getStartTime(), 0); + long endTime = requestInfo.getEndTime() <= -1? Long.MAX_VALUE : requestInfo + .getEndTime(); + + Set<ReservationAllocation> reservations = plan.getReservations( + requestedId, new ReservationInterval(startTime, endTime), user); + + List<ReservationAllocationState> info = + ReservationSystemUtil.convertAllocationsToReservationInfo( + reservations, includeResourceAllocations); + + response.setReservationAllocationState(info); + return response; + } + + @Override public GetNodesToLabelsResponse getNodeToLabels( GetNodesToLabelsRequest request) throws YarnException, IOException { RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index da7816b..3b603a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -71,6 +71,8 @@ public class RMAuditLogger { public static final String 
SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request"; public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request"; public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request"; + public static final String LIST_RESERVATION_REQUEST = "List " + + "Reservation Request"; } static String createSuccessLog(String user, String operation, String target, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index a1cebf5..09a9143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; -import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index afc6721..3f67282 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import 
org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index ce6addb..939e78d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 96f77f5..7a8115f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index ec42cbe..b44824f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java index ac30910..bafe91e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; /** * Event representing maintaining ReservationSystem state. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index ca0f4ac..e6d6ba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 56423e2..551be1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index c51c3ba..586f1c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -346,7 +346,7 @@ public class InMemoryPlan implements Plan { reservationTable.remove(reservation.getReservationId()); decrementAllocation(reservation); LOG.info("Sucessfully deleted reservation: {} in plan.", - reservation.getReservationId()); + reservation.getReservationId()); return true; } @@ -412,30 +412,7 @@ public class InMemoryPlan implements Plan { @Override public Set<ReservationAllocation> getReservationsAtTime(long tick) { - ReservationInterval searchInterval = - new ReservationInterval(tick, Long.MAX_VALUE); - readLock.lock(); - try { - SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations = - currentReservations.headMap(searchInterval, true); - if (!reservations.isEmpty()) { - Set<ReservationAllocation> flattenedReservations = - new HashSet<ReservationAllocation>(); - for (Set<InMemoryReservationAllocation> reservationEntries : reservations - .values()) { - for 
(InMemoryReservationAllocation reservation : reservationEntries) { - if (reservation.getEndTime() > tick) { - flattenedReservations.add(reservation); - } - } - } - return Collections.unmodifiableSet(flattenedReservations); - } else { - return Collections.emptySet(); - } - } finally { - readLock.unlock(); - } + return getReservations(null, new ReservationInterval(tick, tick), ""); } @Override @@ -499,6 +476,50 @@ public class InMemoryPlan implements Plan { } @Override + public Set<ReservationAllocation> getReservations(ReservationId + reservationID, ReservationInterval interval, String user) { + if (reservationID != null) { + ReservationAllocation allocation = getReservationById(reservationID); + if (allocation == null){ + return Collections.emptySet(); + } + return Collections.singleton(allocation); + } + + long startTime = interval == null? 0 : interval.getStartTime(); + long endTime = interval == null? Long.MAX_VALUE : interval.getEndTime(); + + ReservationInterval searchInterval = + new ReservationInterval(endTime, Long.MAX_VALUE); + readLock.lock(); + try { + SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> + reservations = currentReservations.headMap(searchInterval, true); + if (!reservations.isEmpty()) { + Set<ReservationAllocation> flattenedReservations = + new HashSet<>(); + for (Set<InMemoryReservationAllocation> reservationEntries : + reservations.values()) { + for (InMemoryReservationAllocation res : reservationEntries) { + if (res.getEndTime() > startTime) { + if (user != null && !user.isEmpty() + && !res.getUser().equals(user)) { + continue; + } + flattenedReservations.add(res); + } + } + } + return Collections.unmodifiableSet(flattenedReservations); + } else { + return Collections.emptySet(); + } + } finally { + readLock.unlock(); + } + } + + @Override public ReservationAllocation getReservationById(ReservationId reservationID) { if (reservationID == null) { return null; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index cf00a92..0ad6485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -32,6 +32,23 @@ import java.util.Set; public interface PlanView extends PlanContext { /** + * Return a set of {@link ReservationAllocation} identified by the user who + * made the reservation. + * + * @param reservationID the unique id to identify the + * {@link ReservationAllocation} + * @param interval the time interval used to retrieve the reservation + * allocations from. Only reservations with start time no + * greater than the interval end time, and end time no less + * than the interval start time will be selected. + * @param user the user to retrieve the reservation allocation from. 
+ * @return {@link ReservationAllocation} identified by the user who + * made the reservation + */ + Set<ReservationAllocation> getReservations(ReservationId + reservationID, ReservationInterval interval, String user); + + /** * Return a {@link ReservationAllocation} identified by its * {@link ReservationId} * http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java index fb0831a..d63e725 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import java.util.List; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -50,37 +51,24 @@ public class ReservationInputValidator { private Plan 
validateReservation(ReservationSystem reservationSystem, ReservationId reservationId, String auditConstant) throws YarnException { - String message = ""; // check if the reservation id is valid if (reservationId == null) { - message = + String message = "Missing reservation id." + " Please try again by specifying a reservation id."; RMAuditLogger.logFailure("UNKNOWN", auditConstant, "validate reservation input", "ClientRMService", message); throw RPCUtil.getRemoteException(message); } - String queueName = reservationSystem.getQueueForReservation(reservationId); - if (queueName == null) { - message = - "The specified reservation with ID: " + reservationId - + " is unknown. Please try again with a valid reservation."; - RMAuditLogger.logFailure("UNKNOWN", auditConstant, - "validate reservation input", "ClientRMService", message); - throw RPCUtil.getRemoteException(message); - } - // check if the associated plan is valid - Plan plan = reservationSystem.getPlan(queueName); - if (plan == null) { - message = - "The specified reservation: " + reservationId - + " is not associated with any valid plan." - + " Please try again with a valid reservation."; - RMAuditLogger.logFailure("UNKNOWN", auditConstant, - "validate reservation input", "ClientRMService", message); - throw RPCUtil.getRemoteException(message); - } - return plan; + String queue = reservationSystem.getQueueForReservation(reservationId); + String nullQueueErrorMessage = + "The specified reservation with ID: " + reservationId + + " is unknown. Please try again with a valid reservation."; + String nullPlanErrorMessage = "The specified reservation: " + reservationId + + " is not associated with any valid plan." 
+ + " Please try again with a valid reservation."; + return getPlanFromQueue(reservationSystem, queue, auditConstant, + nullQueueErrorMessage, nullPlanErrorMessage); } private void validateReservationDefinition(ReservationId reservationId, @@ -169,6 +157,37 @@ public class ReservationInputValidator { } } + private Plan getPlanFromQueue(ReservationSystem reservationSystem, String + queue, String auditConstant) throws YarnException { + String nullQueueErrorMessage = "The queue is not specified." + + " Please try again with a valid reservable queue."; + String nullPlanErrorMessage = "The specified queue: " + queue + + " is not managed by reservation system." + + " Please try again with a valid reservable queue."; + return getPlanFromQueue(reservationSystem, queue, auditConstant, + nullQueueErrorMessage, nullPlanErrorMessage); + } + + private Plan getPlanFromQueue(ReservationSystem reservationSystem, String + queue, String auditConstant, String nullQueueErrorMessage, + String nullPlanErrorMessage) throws YarnException { + if (queue == null || queue.isEmpty()) { + RMAuditLogger.logFailure("UNKNOWN", auditConstant, + "validate reservation input", "ClientRMService", + nullQueueErrorMessage); + throw RPCUtil.getRemoteException(nullQueueErrorMessage); + } + // check if the associated plan is valid + Plan plan = reservationSystem.getPlan(queue); + if (plan == null) { + RMAuditLogger.logFailure("UNKNOWN", auditConstant, + "validate reservation input", "ClientRMService", + nullPlanErrorMessage); + throw RPCUtil.getRemoteException(nullPlanErrorMessage); + } + return plan; + } + /** * Quick validation on the input to check some obvious fail conditions (fail * fast) the input and returns the appropriate {@link Plan} associated with @@ -188,27 +207,9 @@ public class ReservationInputValidator { ReservationSubmissionRequest request, ReservationId reservationId) throws YarnException { // Check if it is a managed queue - String queueName = request.getQueue(); - if (queueName == null 
|| queueName.isEmpty()) { - String errMsg = - "The queue to submit is not specified." - + " Please try again with a valid reservable queue."; - RMAuditLogger.logFailure("UNKNOWN", - AuditConstants.SUBMIT_RESERVATION_REQUEST, - "validate reservation input", "ClientRMService", errMsg); - throw RPCUtil.getRemoteException(errMsg); - } - Plan plan = reservationSystem.getPlan(queueName); - if (plan == null) { - String errMsg = - "The specified queue: " + queueName - + " is not managed by reservation system." - + " Please try again with a valid reservable queue."; - RMAuditLogger.logFailure("UNKNOWN", - AuditConstants.SUBMIT_RESERVATION_REQUEST, - "validate reservation input", "ClientRMService", errMsg); - throw RPCUtil.getRemoteException(errMsg); - } + String queue = request.getQueue(); + Plan plan = getPlanFromQueue(reservationSystem, queue, + AuditConstants.SUBMIT_RESERVATION_REQUEST); validateReservationDefinition(reservationId, request.getReservationDefinition(), plan, AuditConstants.SUBMIT_RESERVATION_REQUEST); @@ -244,6 +245,38 @@ public class ReservationInputValidator { * Quick validation on the input to check some obvious fail conditions (fail * fast) the input and returns the appropriate {@link Plan} associated with * the specified {@link Queue} or throws an exception message illustrating the + * details of any validation check failures. + * + * @param reservationSystem the {@link ReservationSystem} to validate against + * @param request the {@link ReservationListRequest} defining search + * parameters for reservations in the {@link ReservationSystem} + * that is being validated against. + * @return the {@link Plan} to list reservations of. 
+ * @throws YarnException + */ + public Plan validateReservationListRequest( + ReservationSystem reservationSystem, + ReservationListRequest request) + throws YarnException { + String queue = request.getQueue(); + if (request.getEndTime() < request.getStartTime()) { + String errorMessage = "The specified end time must be greater than " + + "the specified start time."; + RMAuditLogger.logFailure("UNKNOWN", + AuditConstants.LIST_RESERVATION_REQUEST, + "validate list reservation input", "ClientRMService", + errorMessage); + throw RPCUtil.getRemoteException(errorMessage); + } + // Check if it is a managed queue + return getPlanFromQueue(reservationSystem, queue, + AuditConstants.LIST_RESERVATION_REQUEST); + } + + /** + * Quick validation on the input to check some obvious fail conditions (fail + * fast) the input and returns the appropriate {@link Plan} associated with + * the specified {@link Queue} or throws an exception message illustrating the * details of any validation check failures * * @param reservationSystem the {@link ReservationSystem} to validate against @@ -258,5 +291,4 @@ public class ReservationInputValidator { return validateReservation(reservationSystem, request.getReservationId(), AuditConstants.DELETE_RESERVATION_REQUEST); } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java index 98466d5..aba4822 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java @@ -18,24 +18,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ResourceAllocationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ResourceAllocationRequestProto; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; /** * Simple helper class for static methods used to transform 
across @@ -70,7 +75,7 @@ public final class ReservationSystemUtil { ReservationAllocationStateProto.Builder builder = ReservationAllocationStateProto.newBuilder(); - builder.setAcceptanceTimestamp(allocation.getAcceptanceTime()); + builder.setAcceptanceTime(allocation.getAcceptanceTime()); builder.setContainsGangs(allocation.containsGangs()); builder.setStartTime(allocation.getStartTime()); builder.setEndTime(allocation.getEndTime()); @@ -137,9 +142,9 @@ public final class ReservationSystemUtil { } public static InMemoryReservationAllocation toInMemoryAllocation( - String planName, ReservationId reservationId, - ReservationAllocationStateProto allocationState, Resource minAlloc, - ResourceCalculator planResourceCalculator) { + String planName, ReservationId reservationId, + ReservationAllocationStateProto allocationState, Resource minAlloc, + ResourceCalculator planResourceCalculator) { ReservationDefinition definition = convertFromProtoFormat( allocationState.getReservationDefinition()); @@ -152,4 +157,32 @@ public final class ReservationSystemUtil { minAlloc, allocationState.getContainsGangs()); return allocation; } + + public static List<ReservationAllocationState> + convertAllocationsToReservationInfo(Set<ReservationAllocation> res, + boolean includeResourceAllocations) { + List<ReservationAllocationState> reservationInfo = new ArrayList<>(); + + Map<ReservationInterval, Resource> requests; + for (ReservationAllocation allocation : res) { + List<ResourceAllocationRequest> allocations = new ArrayList<>(); + if (includeResourceAllocations) { + requests = allocation.getAllocationRequests(); + + for (Map.Entry<ReservationInterval, Resource> request : + requests.entrySet()) { + ReservationInterval interval = request.getKey(); + allocations.add(ResourceAllocationRequest.newInstance( + interval.getStartTime(), interval.getEndTime(), + request.getValue())); + } + } + + reservationInfo.add(ReservationAllocationState.newInstance( + allocation.getAcceptanceTime(), 
allocation.getUser(), + allocations, allocation.getReservationId(), + allocation.getReservationDefinition())); + } + return reservationInfo; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index ae84791..6e2398a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -99,19 +99,3 @@ message RMDelegationTokenIdentifierDataProto { optional YARNDelegationTokenIdentifierProto token_identifier = 1; optional int64 renewDate = 2; } - -message ResourceAllocationRequestProto { - optional int64 start_time = 1; - optional int64 end_time = 2; - optional ResourceProto resource = 3; -} - -message ReservationAllocationStateProto { - optional ReservationDefinitionProto reservation_definition = 1; - repeated ResourceAllocationRequestProto allocation_requests = 2; - optional int64 start_time = 3; - optional int64 end_time = 4; - optional string user = 5; - optional bool contains_gangs = 6; - optional int64 acceptance_timestamp = 7; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index a7219fa..6fccc63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; @@ -102,6 +104,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import 
org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -1147,6 +1150,168 @@ public class TestClientRMService { Assert.assertNotNull(sResponse); LOG.info("Update reservation response: " + uResponse); + // List reservations, search by reservation ID + ReservationListRequest request = + ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, + reservationID.toString(), -1, -1, false); + + ReservationListResponse response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 1); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getResourceAllocationRequests().size(), 0); + + // List reservations, search by time within reservation interval. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, + true); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + + // List reservations, search by invalid end time == -1. 
+ request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 1, -1, + true); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + + // List reservations, search by invalid end time < -1. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 1, -10, + true); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + + // List reservations, search by time interval. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", arrival + + duration/2, arrival + duration/2, true); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + + // Verify that the full resource allocations exist. + Assert.assertTrue(response.getReservationAllocationState().get(0) + .getResourceAllocationRequests().size() > 0); + + // Verify that the full RDL is returned. 
+ ReservationRequests reservationRequests = response + .getReservationAllocationState().get(0).getReservationDefinition() + .getReservationRequests(); + Assert.assertTrue(reservationRequests.getInterpreter().toString() + .equals("R_ALL")); + Assert.assertTrue(reservationRequests.getReservationResources().get(0) + .getDuration() == duration); + + // List reservations, search by a very large start time. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, + -1, false); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(0, response.getReservationAllocationState().size()); + + // List reservations, search by start time after the reservation + // end time. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", deadline + duration, + deadline + 2 * duration, false); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + + // List reservations, search by end time before the reservation start + // time. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 0, arrival - + duration, false); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + + // List reservations, search by a very small end time. 
+ request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 0, 1, + false); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + // Delete the reservation ReservationDeleteRequest dRequest = ReservationDeleteRequest.newInstance(reservationID); @@ -1159,6 +1324,20 @@ public class TestClientRMService { Assert.assertNotNull(sResponse); LOG.info("Delete reservation response: " + dResponse); + // List reservations, search by non-existent reservationID + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, reservationID.toString(), + -1, -1, false); + + response = null; + try { + response = clientService.listReservations(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + Assert.assertEquals(0, response.getReservationAllocationState().size()); + // clean-up rm.stop(); nm = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java index 48a4d97..9a0f2c9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c487453b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 32824ef..40811fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; @@ -851,7 +851,7 @@ public class RMStateStoreTestBase { ReservationAllocationStateProto actual) { Assert.assertEquals( - expected.getAcceptanceTimestamp(), actual.getAcceptanceTimestamp()); + expected.getAcceptanceTime(), actual.getAcceptanceTime()); Assert.assertEquals(expected.getStartTime(), actual.getStartTime()); Assert.assertEquals(expected.getEndTime(), actual.getEndTime()); Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs()); @@ -866,7 +866,7 @@ public class RMStateStoreTestBase { ReservationAllocation expected, ReservationAllocationStateProto actual) { Assert.assertEquals( - expected.getAcceptanceTime(), actual.getAcceptanceTimestamp()); + expected.getAcceptanceTime(), actual.getAcceptanceTime()); Assert.assertEquals(expected.getStartTime(), actual.getStartTime()); Assert.assertEquals(expected.getEndTime(), actual.getEndTime()); Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs());
