goiri commented on code in PR #4892: URL: https://github.com/apache/hadoop/pull/4892#discussion_r976766919
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java: ########## @@ -830,64 +843,193 @@ public Response listReservation(String queue, String reservationId, long startTi " Please try again with a valid reservable queue."); } - MockRM mockRM = setupResourceManager(); + ReservationId reservationID = + ReservationId.parseReservationId(reservationId); - ReservationId reservationID = ReservationId.parseReservationId(reservationId); - ReservationSystem reservationSystem = mockRM.getReservationSystem(); - reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true); + if (!reservationMap.containsKey(reservationID)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } - // Generate reserved resources ClientRMService clientService = mockRM.getClientRMService(); - // arrival time from which the resource(s) can be allocated. - long arrival = Time.now(); - - // deadline by when the resource(s) must be allocated. 
- // The reason for choosing 1.05 is because this gives an integer - // DURATION * 0.05 = 3000(ms) - // deadline = arrival + 3000ms - long deadline = (long) (arrival + 1.05 * DURATION); - - // In this test of reserved resources, we will apply for 4 containers (1 core, 1GB memory) - // arrival = Time.now(), and make sure deadline - arrival > duration, - // the current setting is greater than 3000ms - ReservationSubmissionRequest submissionRequest = - ReservationSystemTestUtil.createSimpleReservationRequest( - reservationID, NUM_CONTAINERS, arrival, deadline, DURATION); - clientService.submitReservation(submissionRequest); - // listReservations ReservationListRequest request = ReservationListRequest.newInstance( - queue, reservationID.toString(), startTime, endTime, includeResourceAllocations); + queue, reservationId, startTime, endTime, includeResourceAllocations); ReservationListResponse resRespInfo = clientService.listReservations(request); ReservationListInfo resResponse = new ReservationListInfo(resRespInfo, includeResourceAllocations); - if (mockRM != null) { - mockRM.stop(); + return Response.status(Status.OK).entity(resResponse).build(); + } + + @Override + public Response createNewReservation(HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException { + + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ReservationId resId = ReservationId.newInstance(Time.now(), resCounter.incrementAndGet()); + LOG.info("Allocated new reservationId: {}.", resId); + + NewReservation reservationId = new NewReservation(resId.toString()); + return Response.status(Status.OK).entity(reservationId).build(); + } + + @Override + public Response submitReservation(ReservationSubmissionRequestInfo resContext, + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + if (!isRunning) { + throw new RuntimeException("RM is stopped"); } - return 
Response.status(Status.OK).entity(resResponse).build(); + ReservationId reservationId = ReservationId.parseReservationId(resContext.getReservationId()); + ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition(); + ReservationDefinition definition = + RouterServerUtil.convertReservationDefinition(definitionInfo); + ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance( + definition, resContext.getQueue(), reservationId); + submitReservation(request); + + LOG.info("Reservation submitted: {}.", reservationId); + + SubClusterId subClusterId = getSubClusterId(); + reservationMap.put(reservationId, subClusterId); + + return Response.status(Status.ACCEPTED).build(); + } + + private void submitReservation(ReservationSubmissionRequest request) { + try { + + // synchronize plan + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true); + + // Generate reserved resources + ClientRMService clientService = mockRM.getClientRMService(); + clientService.submitReservation(request); + } catch (IOException | YarnException e) { + throw new RuntimeException(e); + } + } + + @Override + public Response updateReservation(ReservationUpdateRequestInfo resContext, + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + if (resContext == null || resContext.getReservationId() == null || + resContext.getReservationDefinition() == null) { + return Response.status(Status.BAD_REQUEST).build(); + } + + String resId = resContext.getReservationId(); + ReservationId reservationId = ReservationId.parseReservationId(resId); + + if (!reservationMap.containsKey(reservationId)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } + + // Generate reserved resources + updateReservation(resContext); + + ReservationUpdateResponseInfo resRespInfo = new ReservationUpdateResponseInfo(); + return 
Response.status(Status.OK).entity(resRespInfo).build(); + } + + private void updateReservation(ReservationUpdateRequestInfo resContext) { + try { + + if (resContext == null) { + throw new BadRequestException("Input ReservationSubmissionContext should not be null"); + } + + ReservationDefinitionInfo resInfo = resContext.getReservationDefinition(); + if (resInfo == null) { + throw new BadRequestException("Input ReservationDefinition should not be null"); + } + + ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests(); + if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null + || resReqsInfo.getReservationRequest().size() == 0) { + throw new BadRequestException("The ReservationDefinition should " + + "contain at least one ReservationRequest"); + } + + if (resContext.getReservationId() == null) { + throw new BadRequestException("Update operations must specify an existing ReservaitonId"); + } + + ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values(); + ReservationRequestInterpreter requestInterpreter = + values[resReqsInfo.getReservationRequestsInterpreter()]; + List<ReservationRequest> list = new ArrayList<>(); + + for (ReservationRequestInfo resReqInfo : resReqsInfo.getReservationRequest()) { + ResourceInfo rInfo = resReqInfo.getCapability(); + Resource capability = Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores()); + int numContainers = resReqInfo.getNumContainers(); + int minConcurrency = resReqInfo.getMinConcurrency(); + long duration = resReqInfo.getDuration(); + ReservationRequest rr = + ReservationRequest.newInstance(capability, numContainers, minConcurrency, duration); + list.add(rr); + } + + ReservationRequests reqs = ReservationRequests.newInstance(list, requestInterpreter); + ReservationDefinition rDef = ReservationDefinition.newInstance( + resInfo.getArrival(), resInfo.getDeadline(), reqs, + resInfo.getReservationName(), resInfo.getRecurrenceExpression(), + 
Priority.newInstance(resInfo.getPriority())); + ReservationUpdateRequest request = ReservationUpdateRequest.newInstance( + rDef, ReservationId.parseReservationId(resContext.getReservationId())); + + ClientRMService clientService = mockRM.getClientRMService(); + clientService.updateReservation(request); + + } catch (Exception ex) { Review Comment: This is a fairly broad catch; wouldn't it be better to just let the exceptions propagate to the caller? ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java: ########## @@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, throw new RuntimeException(msg); } } + + /** + * Save Reservation And HomeSubCluster Mapping. + * + * @param federationFacade federation facade + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public static void addReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + federationFacade.addReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", reservationId); + } + } + + /** + * Update Reservation And HomeSubCluster Mapping. 
+ * + * @param federationFacade federation facade + * @param subClusterId subClusterId + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public static void updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + SubClusterId subClusterId, ReservationId reservationId, + ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId); + } else { + RouterServerUtil.logAndThrowException(e, + "Unable to update the ReservationId %s into the FederationStateStore.", reservationId); + } + } + } + + /** + * Exists ReservationHomeSubCluster Mapping. + * + * @param federationFacade federation facade + * @param reservationId reservationId + * @return true - exist, false - not exist + */ + public static Boolean existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade, Review Comment: Can this return the primitive `boolean` type instead of the boxed `Boolean`? 
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java: ########## @@ -51,4 +70,48 @@ protected void registerBadSubCluster(SubClusterId badSC) { interceptor.setRunning(false); } + protected void setupResourceManager() throws IOException { + try { + if (mockRM != null) { Review Comment: You can move some of this before the try ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java: ########## @@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, throw new RuntimeException(msg); } } + + /** + * Save Reservation And HomeSubCluster Mapping. + * + * @param federationFacade federation facade + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public static void addReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + federationFacade.addReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", reservationId); + } + } + + /** + * Update Reservation And HomeSubCluster Mapping. 
+ * + * @param federationFacade federation facade + * @param subClusterId subClusterId + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public static void updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + SubClusterId subClusterId, ReservationId reservationId, + ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId); + } else { + RouterServerUtil.logAndThrowException(e, + "Unable to update the ReservationId %s into the FederationStateStore.", reservationId); + } + } + } + + /** + * Exists ReservationHomeSubCluster Mapping. + * + * @param federationFacade federation facade + * @param reservationId reservationId + * @return true - exist, false - not exist + */ + public static Boolean existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + ReservationId reservationId) { + try { + SubClusterId subClusterId = federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId != null) { + return true; + } + } catch (YarnException e) { + LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e); + } + return false; + } + + public static ReservationDefinition convertReservationDefinition( Review Comment: Should we have a unit test for this? 
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java: ########## @@ -830,64 +843,193 @@ public Response listReservation(String queue, String reservationId, long startTi " Please try again with a valid reservable queue."); } - MockRM mockRM = setupResourceManager(); + ReservationId reservationID = + ReservationId.parseReservationId(reservationId); - ReservationId reservationID = ReservationId.parseReservationId(reservationId); - ReservationSystem reservationSystem = mockRM.getReservationSystem(); - reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true); + if (!reservationMap.containsKey(reservationID)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } - // Generate reserved resources ClientRMService clientService = mockRM.getClientRMService(); - // arrival time from which the resource(s) can be allocated. - long arrival = Time.now(); - - // deadline by when the resource(s) must be allocated. 
- // The reason for choosing 1.05 is because this gives an integer - // DURATION * 0.05 = 3000(ms) - // deadline = arrival + 3000ms - long deadline = (long) (arrival + 1.05 * DURATION); - - // In this test of reserved resources, we will apply for 4 containers (1 core, 1GB memory) - // arrival = Time.now(), and make sure deadline - arrival > duration, - // the current setting is greater than 3000ms - ReservationSubmissionRequest submissionRequest = - ReservationSystemTestUtil.createSimpleReservationRequest( - reservationID, NUM_CONTAINERS, arrival, deadline, DURATION); - clientService.submitReservation(submissionRequest); - // listReservations ReservationListRequest request = ReservationListRequest.newInstance( - queue, reservationID.toString(), startTime, endTime, includeResourceAllocations); + queue, reservationId, startTime, endTime, includeResourceAllocations); ReservationListResponse resRespInfo = clientService.listReservations(request); ReservationListInfo resResponse = new ReservationListInfo(resRespInfo, includeResourceAllocations); - if (mockRM != null) { - mockRM.stop(); + return Response.status(Status.OK).entity(resResponse).build(); + } + + @Override + public Response createNewReservation(HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException { + + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ReservationId resId = ReservationId.newInstance(Time.now(), resCounter.incrementAndGet()); + LOG.info("Allocated new reservationId: {}.", resId); + + NewReservation reservationId = new NewReservation(resId.toString()); + return Response.status(Status.OK).entity(reservationId).build(); + } + + @Override + public Response submitReservation(ReservationSubmissionRequestInfo resContext, + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + if (!isRunning) { + throw new RuntimeException("RM is stopped"); } - return 
Response.status(Status.OK).entity(resResponse).build(); + ReservationId reservationId = ReservationId.parseReservationId(resContext.getReservationId()); + ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition(); + ReservationDefinition definition = + RouterServerUtil.convertReservationDefinition(definitionInfo); + ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance( + definition, resContext.getQueue(), reservationId); + submitReservation(request); + + LOG.info("Reservation submitted: {}.", reservationId); + + SubClusterId subClusterId = getSubClusterId(); + reservationMap.put(reservationId, subClusterId); + + return Response.status(Status.ACCEPTED).build(); + } + + private void submitReservation(ReservationSubmissionRequest request) { + try { + + // synchronize plan + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true); + + // Generate reserved resources + ClientRMService clientService = mockRM.getClientRMService(); + clientService.submitReservation(request); + } catch (IOException | YarnException e) { + throw new RuntimeException(e); + } + } + + @Override + public Response updateReservation(ReservationUpdateRequestInfo resContext, + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + if (resContext == null || resContext.getReservationId() == null || + resContext.getReservationDefinition() == null) { + return Response.status(Status.BAD_REQUEST).build(); + } + + String resId = resContext.getReservationId(); + ReservationId reservationId = ReservationId.parseReservationId(resId); + + if (!reservationMap.containsKey(reservationId)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } + + // Generate reserved resources + updateReservation(resContext); + + ReservationUpdateResponseInfo resRespInfo = new ReservationUpdateResponseInfo(); + return 
Response.status(Status.OK).entity(resRespInfo).build(); + } + + private void updateReservation(ReservationUpdateRequestInfo resContext) { + try { + + if (resContext == null) { + throw new BadRequestException("Input ReservationSubmissionContext should not be null"); + } + + ReservationDefinitionInfo resInfo = resContext.getReservationDefinition(); + if (resInfo == null) { + throw new BadRequestException("Input ReservationDefinition should not be null"); + } + + ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests(); + if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null + || resReqsInfo.getReservationRequest().size() == 0) { Review Comment: Use `isEmpty()` here instead of `size() == 0`. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java: ########## @@ -1127,4 +1141,205 @@ public void testListReservation() throws Exception { Assert.assertEquals(1, vCore); Assert.assertEquals(1024, memory); } + + @Test + public void testCreateNewReservation() throws Exception { + Response response = interceptor.createNewReservation(null); + Assert.assertNotNull(response); + + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof NewReservation); + + NewReservation newReservation = (NewReservation) entity; + Assert.assertNotNull(newReservation); + Assert.assertTrue(newReservation.getReservationId().contains("reservation")); + } + + @Test + public void testSubmitReservation() throws Exception { + + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 2); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + String applyReservationId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, 
applyReservationId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + Object entity = reservationResponse.getEntity(); + Assert.assertNotNull(entity); + Assert.assertNotNull(entity instanceof ReservationListInfo); + + ReservationListInfo listInfo = (ReservationListInfo) entity; + Assert.assertNotNull(listInfo); + + List<ReservationInfo> reservationInfos = listInfo.getReservations(); + Assert.assertNotNull(reservationInfos); + Assert.assertEquals(1, reservationInfos.size()); + + ReservationInfo reservationInfo = reservationInfos.get(0); + Assert.assertNotNull(reservationInfo); + Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId); + } + + @Test + public void testUpdateReservation() throws Exception { + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 3); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + // update reservation + ReservationSubmissionRequest resSubRequest = + getReservationSubmissionRequest(reservationId, 6, 2048, 2); + ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition(); + ReservationDefinitionInfo reservationDefinitionInfo = + new ReservationDefinitionInfo(reservationDefinition); + + ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo(); + updateRequestInfo.setReservationId(reservationId.toString()); + updateRequestInfo.setReservationDefinition(reservationDefinitionInfo); + Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null); + Assert.assertNotNull(updateReservationResp); + Assert.assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus()); + + String applyReservationId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null); + 
Assert.assertNotNull(reservationResponse); + + Object entity = reservationResponse.getEntity(); + Assert.assertNotNull(entity); + Assert.assertNotNull(entity instanceof ReservationListInfo); + + ReservationListInfo listInfo = (ReservationListInfo) entity; + Assert.assertNotNull(listInfo); + + List<ReservationInfo> reservationInfos = listInfo.getReservations(); + Assert.assertNotNull(reservationInfos); + Assert.assertEquals(1, reservationInfos.size()); + + ReservationInfo reservationInfo = reservationInfos.get(0); + Assert.assertNotNull(reservationInfo); + Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId); + + ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition(); + Assert.assertNotNull(resDefinitionInfo); + + ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests(); + Assert.assertNotNull(reservationRequestsInfo); + + ArrayList<ReservationRequestInfo> reservationRequestInfoList = + reservationRequestsInfo.getReservationRequest(); + Assert.assertNotNull(reservationRequestInfoList); + Assert.assertEquals(1, reservationRequestInfoList.size()); + + ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0); + Assert.assertNotNull(reservationRequestInfo); + Assert.assertEquals(6, reservationRequestInfo.getNumContainers()); + + ResourceInfo resourceInfo = reservationRequestInfo.getCapability(); + Assert.assertNotNull(resourceInfo); + + int vCore = resourceInfo.getvCores(); + long memory = resourceInfo.getMemorySize(); + Assert.assertEquals(2, vCore); + Assert.assertEquals(2048, memory); + } + + @Test + public void testDeleteReservation() throws Exception { + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 4); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + String applyResId = 
reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + ReservationDeleteRequestInfo deleteRequestInfo = + new ReservationDeleteRequestInfo(); + deleteRequestInfo.setReservationId(applyResId); + Response delResponse = interceptor.deleteReservation(deleteRequestInfo, null); + Assert.assertNotNull(delResponse); + + LambdaTestUtils.intercept(Exception.class, + "reservationId with id: " + reservationId + " not found", + () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null)); + } + + private Response submitReservation(ReservationId reservationId) + throws IOException, InterruptedException, YarnException { + + SubClusterId homeSubClusterId = subClusters.get(0); + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId); + AddReservationHomeSubClusterRequest request = + AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); + stateStore.addReservationHomeSubCluster(request); + + ReservationSubmissionRequestInfo resSubmissionRequestInfo = + getReservationSubmissionRequestInfo(reservationId); + Response response = interceptor.submitReservation(resSubmissionRequestInfo, null); + return response; + } + + private ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo( + ReservationId reservationId) { + + ReservationSubmissionRequest resSubRequest = + getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1); + ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition(); + + ReservationSubmissionRequestInfo resSubmissionRequestInfo = + new ReservationSubmissionRequestInfo(); + resSubmissionRequestInfo.setQueue(resSubRequest.getQueue()); + resSubmissionRequestInfo.setReservationId(reservationId.toString()); + ReservationDefinitionInfo reservationDefinitionInfo = + 
new ReservationDefinitionInfo(reservationDefinition); + resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo); + + return resSubmissionRequestInfo; + } + + private ReservationSubmissionRequest getReservationSubmissionRequest( + ReservationId reservationId, int numContainers, int memory, int vcore) { + + // arrival time from which the resource(s) can be allocated. + long arrival = Time.now(); + + // deadline by when the resource(s) must be allocated. + // The reason for choosing 1.05 is because this gives an integer + // DURATION * 0.05 = 3000(ms) + // deadline = arrival + 3000ms + long deadline = (long) (arrival + 1.05 * DURATION); + + ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest( + reservationId, numContainers, arrival, deadline, DURATION, memory, vcore); + + return submissionRequest; + } + + public static ReservationSubmissionRequest createSimpleReservationRequest( + ReservationId reservationId, int numContainers, long arrival, + long deadline, long duration, int memory, int vcore) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest Review Comment: Small nit, this would look better as: ``` ReservationRequest r = ReservationRequest.newInstance( Resource.newInstance(memory, vcore), numContainers, 1, duration); ``` ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java: ########## @@ -1127,4 +1141,205 @@ public void testListReservation() throws Exception { Assert.assertEquals(1, vCore); Assert.assertEquals(1024, memory); } + + @Test + public void testCreateNewReservation() throws Exception { + Response response = interceptor.createNewReservation(null); + Assert.assertNotNull(response); + + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof NewReservation); + + NewReservation newReservation 
= (NewReservation) entity; + Assert.assertNotNull(newReservation); + Assert.assertTrue(newReservation.getReservationId().contains("reservation")); + } + + @Test + public void testSubmitReservation() throws Exception { + + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 2); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + String applyReservationId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + Object entity = reservationResponse.getEntity(); + Assert.assertNotNull(entity); + Assert.assertNotNull(entity instanceof ReservationListInfo); + + ReservationListInfo listInfo = (ReservationListInfo) entity; + Assert.assertNotNull(listInfo); + + List<ReservationInfo> reservationInfos = listInfo.getReservations(); + Assert.assertNotNull(reservationInfos); + Assert.assertEquals(1, reservationInfos.size()); + + ReservationInfo reservationInfo = reservationInfos.get(0); + Assert.assertNotNull(reservationInfo); + Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId); + } + + @Test + public void testUpdateReservation() throws Exception { + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 3); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + // update reservation + ReservationSubmissionRequest resSubRequest = + getReservationSubmissionRequest(reservationId, 6, 2048, 2); + ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition(); + ReservationDefinitionInfo reservationDefinitionInfo = + new ReservationDefinitionInfo(reservationDefinition); + + 
ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo(); + updateRequestInfo.setReservationId(reservationId.toString()); + updateRequestInfo.setReservationDefinition(reservationDefinitionInfo); + Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null); + Assert.assertNotNull(updateReservationResp); + Assert.assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus()); + + String applyReservationId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + Object entity = reservationResponse.getEntity(); + Assert.assertNotNull(entity); + Assert.assertNotNull(entity instanceof ReservationListInfo); + + ReservationListInfo listInfo = (ReservationListInfo) entity; + Assert.assertNotNull(listInfo); + + List<ReservationInfo> reservationInfos = listInfo.getReservations(); + Assert.assertNotNull(reservationInfos); + Assert.assertEquals(1, reservationInfos.size()); + + ReservationInfo reservationInfo = reservationInfos.get(0); + Assert.assertNotNull(reservationInfo); + Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId); + + ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition(); + Assert.assertNotNull(resDefinitionInfo); + + ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests(); + Assert.assertNotNull(reservationRequestsInfo); + + ArrayList<ReservationRequestInfo> reservationRequestInfoList = + reservationRequestsInfo.getReservationRequest(); + Assert.assertNotNull(reservationRequestInfoList); + Assert.assertEquals(1, reservationRequestInfoList.size()); + + ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0); + Assert.assertNotNull(reservationRequestInfo); + Assert.assertEquals(6, reservationRequestInfo.getNumContainers()); + + 
ResourceInfo resourceInfo = reservationRequestInfo.getCapability(); + Assert.assertNotNull(resourceInfo); + + int vCore = resourceInfo.getvCores(); + long memory = resourceInfo.getMemorySize(); + Assert.assertEquals(2, vCore); + Assert.assertEquals(2048, memory); + } + + @Test + public void testDeleteReservation() throws Exception { + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 4); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + String applyResId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + ReservationDeleteRequestInfo deleteRequestInfo = + new ReservationDeleteRequestInfo(); + deleteRequestInfo.setReservationId(applyResId); + Response delResponse = interceptor.deleteReservation(deleteRequestInfo, null); + Assert.assertNotNull(delResponse); + + LambdaTestUtils.intercept(Exception.class, + "reservationId with id: " + reservationId + " not found", + () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null)); + } + + private Response submitReservation(ReservationId reservationId) + throws IOException, InterruptedException, YarnException { + + SubClusterId homeSubClusterId = subClusters.get(0); + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId); + AddReservationHomeSubClusterRequest request = + AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); + stateStore.addReservationHomeSubCluster(request); + + ReservationSubmissionRequestInfo resSubmissionRequestInfo = + getReservationSubmissionRequestInfo(reservationId); + Response response = interceptor.submitReservation(resSubmissionRequestInfo, null); + return response; + } + 
+ private ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo( + ReservationId reservationId) { + + ReservationSubmissionRequest resSubRequest = + getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1); + ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition(); + + ReservationSubmissionRequestInfo resSubmissionRequestInfo = + new ReservationSubmissionRequestInfo(); + resSubmissionRequestInfo.setQueue(resSubRequest.getQueue()); + resSubmissionRequestInfo.setReservationId(reservationId.toString()); + ReservationDefinitionInfo reservationDefinitionInfo = + new ReservationDefinitionInfo(reservationDefinition); + resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo); + + return resSubmissionRequestInfo; + } + + private ReservationSubmissionRequest getReservationSubmissionRequest( + ReservationId reservationId, int numContainers, int memory, int vcore) { + + // arrival time from which the resource(s) can be allocated. + long arrival = Time.now(); + + // deadline by when the resource(s) must be allocated. 
+ // The reason for choosing 1.05 is because this gives an integer + // DURATION * 0.05 = 3000(ms) + // deadline = arrival + 3000ms + long deadline = (long) (arrival + 1.05 * DURATION); + + ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest( + reservationId, numContainers, arrival, deadline, DURATION, memory, vcore); + + return submissionRequest; + } + + public static ReservationSubmissionRequest createSimpleReservationRequest( + ReservationId reservationId, int numContainers, long arrival, + long deadline, long duration, int memory, int vcore) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(memory, vcore), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance(arrival, + deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED); + ReservationSubmissionRequest request = ReservationSubmissionRequest Review Comment: Same here for the break line ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java: ########## @@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, throw new RuntimeException(msg); } } + + /** + * Save Reservation And HomeSubCluster Mapping. 
+ * + * @param federationFacade federation facade + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public static void addReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + federationFacade.addReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", reservationId); + } + } + + /** + * Update Reservation And HomeSubCluster Mapping. + * + * @param federationFacade federation facade + * @param subClusterId subClusterId + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public static void updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + SubClusterId subClusterId, ReservationId reservationId, + ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId); + } else { + RouterServerUtil.logAndThrowException(e, + "Unable to update the ReservationId %s into the FederationStateStore.", reservationId); + } + } + } + + /** + * Exists ReservationHomeSubCluster Mapping. 
+ * + * @param federationFacade federation facade + * @param reservationId reservationId + * @return true - exist, false - not exist + */ + public static Boolean existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade, + ReservationId reservationId) { + try { + SubClusterId subClusterId = federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId != null) { + return true; + } + } catch (YarnException e) { + LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e); + } + return false; + } + + public static ReservationDefinition convertReservationDefinition( + ReservationDefinitionInfo definitionInfo) { + + // basic variable + long arrival = definitionInfo.getArrival(); + long deadline = definitionInfo.getDeadline(); + + // ReservationRequests reservationRequests + String name = definitionInfo.getReservationName(); + String recurrenceExpression = definitionInfo.getRecurrenceExpression(); + Priority priority = Priority.newInstance(definitionInfo.getPriority()); + + // reservation requests info + List<ReservationRequest> reservationRequestList = new ArrayList<>(); + + ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests(); + + List<ReservationRequestInfo> reservationRequestInfos = + reservationRequestsInfo.getReservationRequest(); + + for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) { + ResourceInfo resourceInfo = resRequestInfo.getCapability(); + Resource capability = + Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores()); + ReservationRequest reservationRequest = ReservationRequest.newInstance(capability, + resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(), + resRequestInfo.getDuration()); + reservationRequestList.add(reservationRequest); + } + + ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values(); + ReservationRequestInterpreter reservationRequestInterpreter = + 
values[reservationRequestsInfo.getReservationRequestsInterpreter()]; + ReservationRequests reservationRequests = + ReservationRequests.newInstance(reservationRequestList, reservationRequestInterpreter); + + ReservationDefinition definition = Review Comment: First two lines into one line. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java: ########## @@ -51,4 +70,48 @@ protected void registerBadSubCluster(SubClusterId badSC) { interceptor.setRunning(false); } + protected void setupResourceManager() throws IOException { + try { + if (mockRM != null) { + return; + } + + DefaultMetricsSystem.setMiniClusterMode(true); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + + // Define default queue + conf.setCapacity(QUEUE_DEFAULT_FULL, 20); + // Define dedicated queues + String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED}; + conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues); + conf.setCapacity(QUEUE_DEDICATED_FULL, 80); + conf.setReservable(QUEUE_DEDICATED_FULL, true); + + conf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); + + mockRM = new MockRM(conf); + mockRM.start(); + mockRM.registerNode("127.0.0.1:5678", 100*1024, 100); + + Map<SubClusterId, DefaultRequestInterceptorREST> interceptors = super.getInterceptors(); + for (DefaultRequestInterceptorREST item : interceptors.values()) { + MockDefaultRequestInterceptorREST interceptor = (MockDefaultRequestInterceptorREST) item; + interceptor.setMockRM(mockRM); + } + } catch (Exception e) { + LOG.error("setupResourceManager failed.", e); + throw new IOException(e); + } + } + + @Override + public void shutdown() { + if (mockRM != null) { + 
mockRM.stop(); Review Comment: `mockRM = null;` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-issues-help@hadoop.apache.org