This is an automated email from the ASF dual-hosted git repository.
shuzirra pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a9a5830 YARN-11048. Add tests that shows how to delete config values
with Mutation API (#3799). Contributed by Szilard Nemeth
a9a5830 is described below
commit a9a5830f311a79f5de314b77cf5fc180d259df5f
Author: Szilard Nemeth <[email protected]>
AuthorDate: Thu Dec 16 15:53:08 2021 +0100
YARN-11048. Add tests that shows how to delete config values with Mutation
API (#3799). Contributed by Szilard Nemeth
---
.../scheduler/capacity/ParentQueue.java | 6 +-
.../scheduler/capacity/QueuePath.java | 11 +-
.../resourcemanager/webapp/RMWebServices.java | 78 +++---
.../resourcemanager/webapp/TestRMWebServices.java | 2 +-
.../TestRMWebServicesConfigurationMutation.java | 271 +++++++++++++++++++--
5 files changed, 301 insertions(+), 67 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 283d567..b77a90a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -344,7 +344,7 @@ public class ParentQueue extends AbstractCSQueue {
if (Math.abs(childrenPctSum) > PRECISION) {
// It is wrong when percent sum != {0, 1}
throw new IOException(
- "Illegal" + " capacity sum of " + childrenPctSum
+ "Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for
label="
+ nodeLabel + ". It should be either 0 or 1.0");
} else{
@@ -357,7 +357,7 @@ public class ParentQueue extends AbstractCSQueue {
if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
> PRECISION) && (!allowZeroCapacitySum)) {
throw new IOException(
- "Illegal" + " capacity sum of " + childrenPctSum
+ "Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName()
+ " for label=" + nodeLabel
+ ". It is set to 0, but parent percent != 0, and "
@@ -372,7 +372,7 @@ public class ParentQueue extends AbstractCSQueue {
queueCapacities.getCapacity(nodeLabel)) <= 0f
&& !allowZeroCapacitySum) {
throw new IOException(
- "Illegal" + " capacity sum of " + childrenPctSum
+ "Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for
label="
+ nodeLabel + ". queue=" + getQueueName()
+ " has zero capacity, but child"
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java
index 37cfa2e..440742b 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueuePath.java
@@ -61,12 +61,13 @@ public class QueuePath implements Iterable<String> {
}
/**
- * Concatenate queue path parts into one queue path string.
- * @param parts Parts of the full queue pathAutoCreatedQueueTemplate
- * @return full path of the given queue parts
+ * Constructor to create Queue path from queue names.
+ * The provided queue names will be concatenated by dots, giving a full
queue path.
+ * @param parts Parts of queue path
+ * @return QueuePath object
*/
- public static String concatenatePath(String... parts) {
- return String.join(DOT, parts);
+ public static QueuePath createFromQueues(String... parts) {
+ return new QueuePath(String.join(DOT, parts));
}
/**
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 314b031..041b37c 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -2656,8 +2656,7 @@ public class RMWebServices extends WebServices implements
RMWebServiceProtocol {
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
- if (scheduler instanceof MutableConfScheduler
- && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+ if (isConfigurationMutable(scheduler)) {
try {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
@@ -2696,8 +2695,7 @@ public class RMWebServices extends WebServices implements
RMWebServiceProtocol {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
- if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
- scheduler).isConfigurationMutable()) {
+ if (isConfigurationMutable(scheduler)) {
try {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
@@ -2746,51 +2744,61 @@ public class RMWebServices extends WebServices
implements RMWebServiceProtocol {
public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {
-
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
- if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
- scheduler).isConfigurationMutable()) {
+ if (isConfigurationMutable(scheduler)) {
try {
- callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- MutableConfigurationProvider provider = ((MutableConfScheduler)
- scheduler).getMutableConfProvider();
- if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
- mutationInfo)) {
- throw new
org.apache.hadoop.security.AccessControlException("User"
- + " is not admin of all modified queues.");
- }
- LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
- mutationInfo);
- try {
- rm.getRMContext().getRMAdminService().refreshQueues();
- } catch (IOException | YarnException e) {
- provider.confirmPendingMutation(logMutation, false);
- throw e;
- }
- provider.confirmPendingMutation(logMutation, true);
- return null;
- }
+ callerUGI.doAs((PrivilegedExceptionAction<Void>) () -> {
+ MutableConfigurationProvider provider = ((MutableConfScheduler)
+ scheduler).getMutableConfProvider();
+ LogMutation logMutation = applyMutation(provider, callerUGI,
mutationInfo);
+ return refreshQueues(provider, logMutation);
});
} catch (IOException e) {
LOG.error("Exception thrown when modifying configuration.", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
- return Response.status(Status.OK).entity("Configuration change " +
- "successfully applied.").build();
+ return Response.status(Status.OK).entity("Configuration change
successfully applied.")
+ .build();
} else {
return Response.status(Status.BAD_REQUEST)
- .entity("Configuration change only supported by " +
- "MutableConfScheduler.")
+ .entity(String.format("Configuration change only supported by " +
+ "%s.", MutableConfScheduler.class.getSimpleName()))
.build();
}
}
+ private Void refreshQueues(MutableConfigurationProvider provider,
LogMutation logMutation)
+ throws Exception {
+ try {
+ rm.getRMContext().getRMAdminService().refreshQueues();
+ } catch (IOException | YarnException e) {
+ provider.confirmPendingMutation(logMutation, false);
+ throw e;
+ }
+ provider.confirmPendingMutation(logMutation, true);
+ return null;
+ }
+
+ private LogMutation applyMutation(MutableConfigurationProvider provider,
+ UserGroupInformation callerUGI, SchedConfUpdateInfo mutationInfo) throws
Exception {
+ if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
+ mutationInfo)) {
+ throw new org.apache.hadoop.security.AccessControlException("User"
+ + " is not admin of all modified queues.");
+ }
+ return provider.logAndApplyMutation(callerUGI,
+ mutationInfo);
+ }
+
+ private boolean isConfigurationMutable(ResourceScheduler scheduler) {
+ return scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
+ scheduler).isConfigurationMutable();
+ }
+
@GET
@Path(RMWSConsts.SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
@@ -2803,8 +2811,7 @@ public class RMWebServices extends WebServices implements
RMWebServiceProtocol {
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
- if (scheduler instanceof MutableConfScheduler
- && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+ if (isConfigurationMutable(scheduler)) {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
// We load the cached configuration from configuration store,
@@ -2835,8 +2842,7 @@ public class RMWebServices extends WebServices implements
RMWebServiceProtocol {
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
- if (scheduler instanceof MutableConfScheduler
- && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+ if (isConfigurationMutable(scheduler)) {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
index 673fbbe..d5d5343 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
@@ -1098,4 +1098,4 @@ public class TestRMWebServices extends JerseyTestBase {
return webService;
}
-}
+}
\ No newline at end of file
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
index 1559986..675e792 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
@@ -22,10 +22,13 @@ import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -34,12 +37,13 @@ import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
-import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -58,10 +62,15 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS;
+import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY;
+import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_CAPACITY;
+import static org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils.toJson;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
+import static org.junit.Assert.assertTrue;
/**
* Test scheduler configuration mutation via REST API.
@@ -74,7 +83,11 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
private static final File OLD_CONF_FILE = new File(new File("target",
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
-
+ private static final String LABEL_1 = "label1";
+ public static final QueuePath ROOT = new QueuePath("root");
+ public static final QueuePath ROOT_A = new QueuePath("root", "a");
+ public static final QueuePath ROOT_A_A1 = QueuePath.createFromQueues("root",
"a", "a1");
+ public static final QueuePath ROOT_A_A2 = QueuePath.createFromQueues("root",
"a", "a2");
private static MockRM rm;
private static String userName;
private static CapacitySchedulerConfiguration csConf;
@@ -216,7 +229,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
newConf = getSchedulerConf();
@@ -284,7 +297,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -323,7 +336,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -350,7 +363,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo1,
+ .entity(toJson(updateInfo1,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -371,7 +384,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo2,
+ .entity(toJson(updateInfo2,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -401,7 +414,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo1,
+ .entity(toJson(updateInfo1,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -419,7 +432,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo2,
+ .entity(toJson(updateInfo2,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -448,7 +461,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -480,7 +493,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -513,7 +526,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -538,7 +551,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -569,7 +582,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -601,7 +614,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
@@ -629,7 +642,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -664,7 +677,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
LOG.debug("Response headers: " + response.getHeaders());
@@ -683,7 +696,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -713,7 +726,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -738,7 +751,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -753,7 +766,7 @@ public class TestRMWebServicesConfigurationMutation extends
JerseyTestBase {
r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -765,6 +778,220 @@ public class TestRMWebServicesConfigurationMutation
extends JerseyTestBase {
}
@Test
+ public void testNodeLabelRemovalResidualConfigsAreCleared() throws Exception
{
+ WebResource r = resource();
+ ClientResponse response;
+
+ // 1. Create Node Label: label1
+ NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
+ nodeLabelsInfo.getNodeLabelsInfo().add(new NodeLabelInfo(LABEL_1));
+ WebResource addNodeLabelsResource = r.path("ws").path("v1").path("cluster")
+ .path("add-node-labels");
+ WebResource getNodeLabelsResource = r.path("ws").path("v1").path("cluster")
+ .path("get-node-labels");
+ WebResource removeNodeLabelsResource =
r.path("ws").path("v1").path("cluster")
+ .path("remove-node-labels");
+ WebResource schedulerConfResource = r.path("ws").path("v1").path("cluster")
+ .path(RMWSConsts.SCHEDULER_CONF);
+ response =
+ addNodeLabelsResource.queryParam("user.name", userName)
+ .accept(MediaType.APPLICATION_JSON)
+ .entity(logAndReturnJson(addNodeLabelsResource,
+ toJson(nodeLabelsInfo, NodeLabelsInfo.class)),
+ MediaType.APPLICATION_JSON)
+ .post(ClientResponse.class);
+
+ // 2. Verify new Node Label
+ response =
+ getNodeLabelsResource.queryParam("user.name", userName)
+ .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+ response.getType().toString());
+ nodeLabelsInfo = response.getEntity(NodeLabelsInfo.class);
+ assertEquals(1, nodeLabelsInfo.getNodeLabels().size());
+ for (NodeLabelInfo nl : nodeLabelsInfo.getNodeLabelsInfo()) {
+ assertEquals(LABEL_1, nl.getName());
+ assertTrue(nl.getExclusivity());
+ }
+
+ // 3. Assign 'label1' to root.a
+ SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+ Map<String, String> updateForRoot = new HashMap<>();
+ updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS,
"*");
+ QueueConfigInfo rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(),
updateForRoot);
+
+ Map<String, String> updateForRootA = new HashMap<>();
+ updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS,
LABEL_1);
+ QueueConfigInfo rootAUpdateInfo = new
QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);
+
+ updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
+
+ response =
+ schedulerConfResource
+ .queryParam("user.name", userName)
+ .accept(MediaType.APPLICATION_JSON)
+ .entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
+ SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class);
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ assertEquals(Sets.newHashSet("*"),
+ cs.getConfiguration().getAccessibleNodeLabels(ROOT.getFullPath()));
+ assertEquals(Sets.newHashSet(LABEL_1),
+ cs.getConfiguration().getAccessibleNodeLabels(ROOT_A.getFullPath()));
+
+ // 4. Set partition capacities to queues as below
+ updateInfo = new SchedConfUpdateInfo();
+ updateForRoot = new HashMap<>();
+ updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1),
"100");
+ updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"100");
+ rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);
+
+ updateForRootA = new HashMap<>();
+ updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1),
"100");
+
updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"100");
+ rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(),
updateForRootA);
+
+ // Avoid the following exception by adding some capacities to root.a.a1
and root.a.a2 to label1
+ // Illegal capacity sum of 0.0 for children of queue a for label=label1.
+ // It is set to 0, but parent percent != 0, and doesn't allow children
capacity to set to 0
+ Map<String, String> updateForRootA_A1 = new HashMap<>();
+
updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1),
"20");
+
updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"20");
+ QueueConfigInfo rootA_A1UpdateInfo = new
QueueConfigInfo(ROOT_A_A1.getFullPath(),
+ updateForRootA_A1);
+
+ Map<String, String> updateForRootA_A2 = new HashMap<>();
+
updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1),
"80");
+
updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"80");
+ QueueConfigInfo rootA_A2UpdateInfo = new
QueueConfigInfo(ROOT_A_A2.getFullPath(),
+ updateForRootA_A2);
+
+
+ updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo);
+
+ response =
+ schedulerConfResource
+ .queryParam("user.name", userName)
+ .accept(MediaType.APPLICATION_JSON)
+ .entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
+ SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class);
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+ assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT,
LABEL_1), 0.001f);
+ assertEquals(100.0,
cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT, LABEL_1),
+ 0.001f);
+ assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A,
LABEL_1), 0.001f);
+ assertEquals(100.0,
cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A, LABEL_1),
+ 0.001f);
+ assertEquals(20.0,
cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A1, LABEL_1), 0.001f);
+ assertEquals(20.0,
cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A1, LABEL_1),
+ 0.001f);
+ assertEquals(80.0,
cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A2, LABEL_1), 0.001f);
+ assertEquals(80.0,
cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A2, LABEL_1),
+ 0.001f);
+
+ //5. De-assign node label: "label1" + Remove residual properties
+ updateInfo = new SchedConfUpdateInfo();
+ updateForRoot = new HashMap<>();
+ updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS,
"*");
+ updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1),
"");
+ updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"");
+ rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);
+
+ updateForRootA = new HashMap<>();
+ updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS,
"");
+ updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1),
"");
+
updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
+ rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(),
updateForRootA);
+
+ updateForRootA_A1 = new HashMap<>();
+
updateForRootA_A1.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS,
"");
+
updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
+
updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"");
+ rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(),
updateForRootA_A1);
+
+ updateForRootA_A2 = new HashMap<>();
+
updateForRootA_A2.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS,
"");
+
updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
+
updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1),
"");
+ rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(),
updateForRootA_A2);
+
+ updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo);
+ updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo);
+
+ response =
+ schedulerConfResource
+ .queryParam("user.name", userName)
+ .accept(MediaType.APPLICATION_JSON)
+ .entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
+ SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class);
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ assertEquals(Sets.newHashSet("*"),
+ cs.getConfiguration().getAccessibleNodeLabels(ROOT.getFullPath()));
+
assertNull(cs.getConfiguration().getAccessibleNodeLabels(ROOT_A.getFullPath()));
+
+ //6. Remove node label 'label1'
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add("labels", LABEL_1);
+ response =
+ removeNodeLabelsResource
+ .queryParam("user.name", userName)
+ .queryParams(params)
+ .accept(MediaType.APPLICATION_JSON)
+ .post(ClientResponse.class);
+
+ // Verify
+ response =
+ getNodeLabelsResource.queryParam("user.name", userName)
+ .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+ response.getType().toString());
+ nodeLabelsInfo = response.getEntity(NodeLabelsInfo.class);
+ assertEquals(0, nodeLabelsInfo.getNodeLabels().size());
+
+ //6. Check residual configs
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1,
CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1,
MAXIMUM_CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1,
CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1,
MAXIMUM_CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1,
CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1,
MAXIMUM_CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1,
CAPACITY));
+ assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1,
MAXIMUM_CAPACITY));
+ }
+
+ private String getConfValueForQueueAndLabelAndType(CapacityScheduler cs,
+ QueuePath queuePath, String label, String type) {
+ return cs.getConfiguration().get(
+ CapacitySchedulerConfiguration.getNodeLabelPrefix(
+ queuePath.getFullPath(), label) + type);
+ }
+
+ private Object logAndReturnJson(WebResource ws, String json) {
+ LOG.info("Sending to web resource: {}, json: {}", ws, json);
+ return json;
+ }
+
+ private String getAccessibleNodeLabelsCapacityPropertyName(String label) {
+ return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, CAPACITY);
+ }
+
+ private String getAccessibleNodeLabelsMaxCapacityPropertyName(String label) {
+ return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label,
MAXIMUM_CAPACITY);
+ }
+
+ @Test
public void testValidateWithClusterMaxAllocation() throws Exception {
WebResource r = resource();
int clusterMax = YarnConfiguration.
@@ -784,7 +1011,7 @@ public class TestRMWebServicesConfigurationMutation
extends JerseyTestBase {
.path(RMWSConsts.SCHEDULER_CONF_VALIDATE)
.queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
- .entity(YarnWebServiceUtils.toJson(updateInfo,
+ .entity(toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.post(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]