This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new d5334fa7617 YARN-6537. Running RM tests against the Router. (#5957) Contributed by Shilun Fan.
d5334fa7617 is described below

commit d5334fa76170b99f1ddd6b307482d226da12f1a9
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Sun Sep 3 15:38:21 2023 +0800

    YARN-6537. Running RM tests against the Router. (#5957) Contributed by Shilun Fan.

    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |   2 +-
 .../yarn/server/nodemanager/NodeManager.java       |   5 +
 .../hadoop-yarn-server-router/pom.xml              |  18 ++
 .../subcluster/TestFederationSubCluster.java       | 210 +++++++++++++++++++++
 .../server/router/subcluster/TestMockRouter.java   |  93 +++++++++
 .../router/subcluster/TestMockSubCluster.java      | 100 ++++++++++
 .../TestYarnFederationWithCapacityScheduler.java   |  76 ++++++++
 .../fair/TestYarnFederationWithFairScheduler.java  |  75 ++++++++
 .../yarn/server/router/webapp/JavaProcess.java     |  24 ++-
 .../router/webapp/TestRouterWebServicesREST.java   | 151 +++++++--------
 .../src/test/resources/fair-scheduler.xml          |  43 +++++
 .../apache/hadoop/yarn/server/MiniYARNCluster.java |  19 ++
 12 files changed, 729 insertions(+), 87 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e252590ea31..874ee9d08d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -517,7 +517,7 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false;

   /** The address of the RM admin interface.*/
-  public static final String RM_ADMIN_ADDRESS = 
+  public static final String RM_ADMIN_ADDRESS =
       RM_PREFIX + "admin.address";
   public static final int DEFAULT_RM_ADMIN_PORT = 8033;
   public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 438a39b0973..90924250427 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -1074,4 +1074,9 @@ public class NodeManager extends CompositeService
   public AsyncDispatcher getDispatcher() {
     return dispatcher;
   }
+
+  @VisibleForTesting
+  public void disableWebServer() {
+    removeService(((NMContext) context).webServer);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index ec4fe86e7c2..b1718764717 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -150,6 +150,24 @@ <type>test-jar</type> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> <build> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java new file mode 100644 index 00000000000..f9cd7078218 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.router.subcluster; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.commons.collections.CollectionUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.router.webapp.JavaProcess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static javax.ws.rs.core.MediaType.APPLICATION_XML; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning; +import static org.junit.Assert.assertEquals; + +public class TestFederationSubCluster { + + private static final Logger LOG = LoggerFactory.getLogger(TestFederationSubCluster.class); + private static TestingServer curatorTestingServer; + private static CuratorFramework curatorFramework; + private static JavaProcess subCluster1; + private static JavaProcess subCluster2; + private static JavaProcess router; + public static final String ZK_FEDERATION_STATESTORE = + "org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore"; + private static String userName = "test"; + + public void startFederationSubCluster(int zkPort, String sc1Param, + String sc2Param, String routerParam) throws IOException, InterruptedException, + YarnException, TimeoutException { + + // Step1. Initialize ZK's service. + try { + curatorTestingServer = new TestingServer(zkPort); + curatorTestingServer.start(); + String connectString = curatorTestingServer.getConnectString(); + curatorFramework = CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + curatorFramework.start(); + curatorFramework.getConnectionStateListenable().addListener((client, newState) -> { + if (newState == ConnectionState.CONNECTED) { + System.out.println("Connected to the ZooKeeper server!"); + } + }); + } catch (Exception e) { + LOG.error("Cannot initialize ZooKeeper store.", e); + throw new IOException(e); + } + + // Step2. Create a temporary directory output log. 
+ File baseDir = GenericTestUtils.getTestDir("processes"); + baseDir.mkdirs(); + String baseName = TestFederationSubCluster.class.getSimpleName(); + + // Step3. Initialize subCluster SC-1 + String sc1WebAddress = getSCWebAddress(sc1Param); + File rmOutput = new File(baseDir, baseName + "-" + Time.now() + "-rm.log"); + rmOutput.createNewFile(); + List<String> addClasspath = new LinkedList<>(); + addClasspath.add("../hadoop-yarn-server-timelineservice/target/classes"); + subCluster1 = new JavaProcess(TestMockSubCluster.class, addClasspath, rmOutput, sc1Param); + waitWebAppRunning(sc1WebAddress, RM_WEB_SERVICE_PATH); + + // Step4. Initialize subCluster SC-2 + String sc2WebAddress = getSCWebAddress(sc2Param); + File rmOutput2 = new File(baseDir, baseName + "-" + Time.now() + "-rm.log"); + rmOutput2.createNewFile(); + List<String> addClasspath2 = new LinkedList<>(); + addClasspath2.add("../hadoop-yarn-server-timelineservice/target/classes"); + subCluster2 = new JavaProcess(TestMockSubCluster.class, addClasspath2, rmOutput2, sc2Param); + waitWebAppRunning(sc2WebAddress, RM_WEB_SERVICE_PATH); + + // Step5. Confirm that subClusters have been registered to ZK. + String zkAddress = getZkAddress(zkPort); + verifyRegistration(zkAddress); + + // Step6. Initialize router + String routerWebAddress = getRouterWebAddress(routerParam); + File routerOutput = new File(baseDir, baseName + "-" + Time.now() + "-router.log"); + routerOutput.createNewFile(); + router = new JavaProcess(TestMockRouter.class, null, routerOutput, routerParam); + waitWebAppRunning(routerWebAddress, RM_WEB_SERVICE_PATH); + } + + private String getSCWebAddress(String scParam) { + String[] scParams = scParam.split(","); + return "http://localhost:" + scParams[3]; + } + + private String getRouterWebAddress(String routerParam) { + String[] routerParams = routerParam.split(","); + return "http://localhost:" + routerParams[2]; + } + + private String getZkAddress(int port) { + return "localhost:" + port; + } + + public void stop() throws Exception { + if (subCluster1 != null) { + subCluster1.stop(); + } + if (subCluster2 != null) { + subCluster2.stop(); + } + if (router != null) { + router.stop(); + } + if (curatorTestingServer != null) { + curatorTestingServer.stop(); + } + } + + private void verifyRegistration(String zkAddress) + throws YarnException, InterruptedException, TimeoutException { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, ZK_FEDERATION_STATESTORE); + conf.set(CommonConfigurationKeys.ZK_ADDRESS, zkAddress); + RetryPolicy retryPolicy = FederationStateStoreFacade.createRetryPolicy(conf); + FederationStateStore stateStore = (FederationStateStore) + FederationStateStoreFacade.createRetryInstance(conf, + YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS, + FederationStateStore.class, retryPolicy); + stateStore.init(conf); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); + GetSubClustersInfoRequest request = GetSubClustersInfoRequest.newInstance(true); + + GenericTestUtils.waitFor(() -> { + try { + GetSubClustersInfoResponse response = stateStore.getSubClusters(request); + List<SubClusterInfo> subClusters = response.getSubClusters(); + if (CollectionUtils.isNotEmpty(subClusters)) { + return true; + } + } catch (Exception e) { + } + return false; + }, 5000, 50 * 1000); + } + + public static <T> T performGetCalls(final 
String routerAddress, final String path, + final Class<T> returnType, final String queryName, + final String queryValue) throws IOException, InterruptedException { + + Client clientToRouter = Client.create(); + WebResource toRouter = clientToRouter.resource(routerAddress).path(path); + + final WebResource.Builder toRouterBuilder; + + if (queryValue != null && queryName != null) { + toRouterBuilder = toRouter.queryParam(queryName, queryValue).accept(APPLICATION_XML); + } else { + toRouterBuilder = toRouter.accept(APPLICATION_XML); + } + + return UserGroupInformation.createRemoteUser(userName).doAs( + (PrivilegedExceptionAction<T>) () -> { + ClientResponse response = toRouterBuilder.get(ClientResponse.class); + assertEquals(SC_OK, response.getStatus()); + return response.getEntity(returnType); + }); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestMockRouter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestMockRouter.java new file mode 100644 index 00000000000..751a3fed648 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestMockRouter.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.subcluster; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.router.Router; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests {@link Router}. + */ +public class TestMockRouter { + + private static final Logger LOG = LoggerFactory.getLogger(TestMockRouter.class); + + public TestMockRouter() { + } + + public static void main(String[] args) throws YarnException { + if (ArrayUtils.isEmpty(args)) { + return; + } + + // Step1. Parse the parameters. 
+ String[] params = args[0].split(","); + int pRouterClientRMPort = Integer.parseInt(params[0]); + int pRouterAdminAddressPort = Integer.parseInt(params[1]); + int pRouterWebAddressPort = Integer.parseInt(params[2]); + String zkAddress = params[3]; + + LOG.info("routerClientRMPort={}, routerAdminAddressPort={}, routerWebAddressPort={}, " + + "zkAddress = {}.", pRouterClientRMPort, pRouterAdminAddressPort, + pRouterWebAddressPort, zkAddress); + + YarnConfiguration conf = new YarnConfiguration(); + Router router = new Router(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + "org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore"); + conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE, + "org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST"); + conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + "org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor"); + conf.set(CommonConfigurationKeys.ZK_ADDRESS, zkAddress); + conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, + "org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor"); + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, -1); + conf.set(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, getHostNameAndPort(pRouterClientRMPort)); + conf.set(YarnConfiguration.ROUTER_RMADMIN_ADDRESS, getHostNameAndPort(pRouterAdminAddressPort)); + conf.set(YarnConfiguration.ROUTER_WEBAPP_ADDRESS, + getHostNameAndPort(pRouterWebAddressPort)); + + RetryPolicy retryPolicy = FederationStateStoreFacade.createRetryPolicy(conf); + + router.init(conf); + router.start(); + + FederationStateStore stateStore = (FederationStateStore) + FederationStateStoreFacade.createRetryInstance(conf, + YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS, + FederationStateStore.class, retryPolicy); + stateStore.init(conf); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); + } + + private static String getHostNameAndPort(int port) { + return MiniYARNCluster.getHostname() + ":" + port; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestMockSubCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestMockSubCluster.java new file mode 100644 index 00000000000..00227ae9ffc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestMockSubCluster.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.subcluster; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster.ZK_FEDERATION_STATESTORE; + +public class TestMockSubCluster { + + private static final Logger LOG = LoggerFactory.getLogger(TestMockSubCluster.class); + private Configuration conf; + private String subClusterId; + + public TestMockSubCluster() { + } + + public TestMockSubCluster(String pSubClusterId, Configuration pConf) { + this.conf = pConf; + this.subClusterId = pSubClusterId; + } + + private static String getHostNameAndPort(int port) { + return MiniYARNCluster.getHostname() + ":" + port; + } + + public void startYarnSubCluster() { + MiniYARNCluster yrCluster = new MiniYARNCluster(subClusterId, 3, 1, 1, false); + yrCluster.init(conf); + yrCluster.start(); + } + + public static void main(String[] args) { + if (ArrayUtils.isEmpty(args)) { + return; + } + + // Step1. Parse the parameters. + String[] params = args[0].split(","); + int pRmAddressPort = Integer.parseInt(params[0]); + int pRmSchedulerAddressPort = Integer.parseInt(params[1]); + int pRmTrackerAddressPort = Integer.parseInt(params[2]); + int pRmWebAddressPort = Integer.parseInt(params[3]); + int pRmAdminAddressPort = Integer.parseInt(params[4]); + String pSubClusterId = params[5]; + String pZkAddress = params[6]; + String schedulerType = params[7]; + + // Step 2. Print the parameters. + LOG.info("subClusterId = {}, rmAddressPort = {}, rmSchedulerAddressPort = {}, " + + "rmTrackerAddressPort = {}, rmWebAddressPort = {}, rmAdminAddressPort = {}", + pSubClusterId, pRmAddressPort, pRmSchedulerAddressPort, pRmTrackerAddressPort, + pRmWebAddressPort, pRmAdminAddressPort); + + // Step 3. determine which scheduler to use. + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_ADDRESS, getHostNameAndPort(pRmAddressPort)); + conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, getHostNameAndPort(pRmAdminAddressPort)); + conf.set(YarnConfiguration.RM_HOSTNAME, MiniYARNCluster.getHostname()); + conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, getHostNameAndPort(pRmSchedulerAddressPort)); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + getHostNameAndPort(pRmTrackerAddressPort)); + conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, getHostNameAndPort(pRmWebAddressPort)); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, ZK_FEDERATION_STATESTORE); + conf.set(CommonConfigurationKeys.ZK_ADDRESS, pZkAddress); + conf.set(YarnConfiguration.RM_CLUSTER_ID, pSubClusterId); + if (schedulerType.equals("fair-scheduler")) { + conf.set("yarn.resourcemanager.scheduler.class", + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set("yarn.scheduler.fair.allocation.file", "fair-scheduler.xml"); + } + + // Step 4, start the mockSubCluster cluster. 
+ TestMockSubCluster sc = new TestMockSubCluster(pSubClusterId, conf); + sc.startYarnSubCluster(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java new file mode 100644 index 00000000000..f37e1245bdc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/capacity/TestYarnFederationWithCapacityScheduler.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.subcluster.capacity; + +import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; +import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestYarnFederationWithCapacityScheduler { + + private static TestFederationSubCluster testFederationSubCluster; + private static Set<String> subClusters; + private static final String ROUTER_WEB_ADDRESS = "http://localhost:18089"; + + @BeforeClass + public static void setUp() + throws IOException, InterruptedException, YarnException, TimeoutException { + testFederationSubCluster = new TestFederationSubCluster(); + testFederationSubCluster.startFederationSubCluster(2181, + "18032,18030,18031,18088,18033,SC-1,127.0.0.1:2181,capacity-scheduler", + "28032,28030,28031,28088,28033,SC-2,127.0.0.1:2181,capacity-scheduler", + "18050,18052,18089,127.0.0.1:2181"); + subClusters = Sets.newHashSet(); + subClusters.add("SC-1"); + subClusters.add("SC-2"); + } + + @AfterClass + public static void shutDown() throws Exception { + testFederationSubCluster.stop(); + } + + @Test + public void testGetClusterInfo() throws InterruptedException, IOException { + FederationClusterInfo federationClusterInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH, 
+ FederationClusterInfo.class, null, null); + List<ClusterInfo> clusterInfos = federationClusterInfo.getList(); + assertNotNull(clusterInfos); + assertEquals(2, clusterInfos.size()); + for (ClusterInfo clusterInfo : clusterInfos) { + assertNotNull(clusterInfo); + assertTrue(subClusters.contains(clusterInfo.getSubClusterId())); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java new file mode 100644 index 00000000000..ce27d5a3fc7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/fair/TestYarnFederationWithFairScheduler.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.router.subcluster.fair; + +import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; +import org.apache.hadoop.yarn.server.router.subcluster.TestFederationSubCluster; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestYarnFederationWithFairScheduler { + private static TestFederationSubCluster testFederationSubCluster; + private static Set<String> subClusters; + private static final String ROUTER_WEB_ADDRESS = "http://localhost:28089"; + + @BeforeClass + public static void setUp() + throws IOException, InterruptedException, YarnException, TimeoutException { + testFederationSubCluster = new TestFederationSubCluster(); + testFederationSubCluster.startFederationSubCluster(2182, + "38032,38030,38031,38088,38033,SC-1,127.0.0.1:2182,fair-scheduler", + "48032,48030,48031,48088,48033,SC-2,127.0.0.1:2182,fair-scheduler", + "28050,28052,28089,127.0.0.1:2182"); + subClusters = Sets.newHashSet(); + subClusters.add("SC-1"); + subClusters.add("SC-2"); + } + + @AfterClass + public static void shutDown() throws Exception { + testFederationSubCluster.stop(); + } + + @Test + public void testGetClusterInfo() throws InterruptedException, IOException { + FederationClusterInfo federationClusterInfo = + TestFederationSubCluster.performGetCalls(ROUTER_WEB_ADDRESS, RM_WEB_SERVICE_PATH, + FederationClusterInfo.class, null, null); + List<ClusterInfo> clusterInfos = federationClusterInfo.getList(); + assertNotNull(clusterInfos); + assertEquals(2, clusterInfos.size()); + for (ClusterInfo clusterInfo : clusterInfos) { + assertNotNull(clusterInfo); + assertTrue(subClusters.contains(clusterInfo.getSubClusterId())); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java index 180cb5536b6..3760b96c241 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java @@ -27,7 +27,7 @@ import java.util.List; */ public class JavaProcess { - private Process process = null; + private Process process; public JavaProcess(Class<?> clazz, File output) throws IOException, InterruptedException { @@ -35,7 +35,7 @@ public class JavaProcess { } public JavaProcess(Class<?> clazz, List<String> addClassPaths, File output) - throws IOException, InterruptedException { + throws IOException { String javaHome = System.getProperty("java.home"); String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; @@ -55,6 +55,26 @@ public class JavaProcess { process = 
builder.start(); } + public JavaProcess(Class<?> clazz, List<String> addClassPaths, File output, String param) + throws IOException { + String javaHome = System.getProperty("java.home"); + String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; + String classpath = System.getProperty("java.class.path"); + classpath = classpath.concat("./src/test/resources"); + if (addClassPaths != null) { + for (String addClasspath : addClassPaths) { + classpath = classpath.concat(File.pathSeparatorChar + addClasspath); + } + } + String className = clazz.getCanonicalName(); + ProcessBuilder builder = + new ProcessBuilder(javaBin, "-cp", classpath, className, param); + builder.redirectInput(ProcessBuilder.Redirect.INHERIT); + builder.redirectOutput(output); + builder.redirectError(output); + process = builder.start(); + } + public void stop() throws InterruptedException { if (process != null) { process.destroy(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java index 7eb17ef6cd2..40c9c76fc1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java @@ -79,7 +79,6 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -133,7 +132,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.util.function.Supplier; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; @@ -178,29 +176,26 @@ public class TestRouterWebServicesREST { /** * Wait until the webservice is up and running. 
*/ - private static void waitWebAppRunning( + public static void waitWebAppRunning( final String address, final String path) { try { final Client clientToRouter = Client.create(); final WebResource toRouter = clientToRouter .resource(address) .path(path); - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - try { - ClientResponse response = toRouter - .accept(APPLICATION_JSON) - .get(ClientResponse.class); - if (response.getStatus() == SC_OK) { - // process is up and running - return true; - } - } catch (ClientHandlerException e) { - // process is not up and running + GenericTestUtils.waitFor(() -> { + try { + ClientResponse response = toRouter + .accept(APPLICATION_JSON) + .get(ClientResponse.class); + if (response.getStatus() == SC_OK) { + // process is up and running + return true; } - return false; + } catch (ClientHandlerException e) { + // process is not up and running } + return false; }, 1000, 20 * 1000); } catch (Exception e) { fail("Web app not running"); @@ -278,19 +273,16 @@ public class TestRouterWebServicesREST { } return UserGroupInformation.createRemoteUser(userName) - .doAs(new PrivilegedExceptionAction<List<T>>() { - @Override - public List<T> run() throws Exception { - ClientResponse response = - toRouterBuilder.get(ClientResponse.class); - ClientResponse response2 = toRMBuilder.get(ClientResponse.class); - assertEquals(SC_OK, response.getStatus()); - assertEquals(SC_OK, response2.getStatus()); - List<T> responses = new ArrayList<>(); - responses.add(response.getEntity(returnType)); - responses.add(response2.getEntity(returnType)); - return responses; - } + .doAs((PrivilegedExceptionAction<List<T>>) () -> { + ClientResponse response = + toRouterBuilder.get(ClientResponse.class); + ClientResponse response2 = toRMBuilder.get(ClientResponse.class); + assertEquals(SC_OK, response.getStatus()); + assertEquals(SC_OK, response2.getStatus()); + List<T> responses = new ArrayList<>(); + responses.add(response.getEntity(returnType)); + responses.add(response2.getEntity(returnType)); + return responses; }); } @@ -302,45 +294,42 @@ public class TestRouterWebServicesREST { final HTTPMethods method) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(userName) - .doAs(new PrivilegedExceptionAction<ClientResponse>() { - @Override - public ClientResponse run() throws Exception { - Client clientToRouter = Client.create(); - WebResource toRouter = clientToRouter - .resource(routerAddress) - .path(webAddress); - - WebResource toRouterWR = toRouter; - if (queryKey != null && queryValue != null) { - toRouterWR = toRouterWR.queryParam(queryKey, queryValue); - } - - Builder builder = null; - if (context != null) { - builder = toRouterWR.entity(context, APPLICATION_JSON); - builder = builder.accept(APPLICATION_JSON); - } else { - builder = toRouter.accept(APPLICATION_JSON); - } - - ClientResponse response = null; - - switch (method) { - case DELETE: - response = builder.delete(ClientResponse.class); - break; - case POST: - response = builder.post(ClientResponse.class); - break; - case PUT: - response = builder.put(ClientResponse.class); - break; - default: - break; - } - - return response; + .doAs((PrivilegedExceptionAction<ClientResponse>) () -> { + Client clientToRouter = Client.create(); + WebResource toRouter = clientToRouter + .resource(routerAddress) + .path(webAddress); + + WebResource toRouterWR = toRouter; + if (queryKey != null && queryValue != null) { + toRouterWR = toRouterWR.queryParam(queryKey, queryValue); 
+ } + + Builder builder; + if (context != null) { + builder = toRouterWR.entity(context, APPLICATION_JSON); + builder = builder.accept(APPLICATION_JSON); + } else { + builder = toRouter.accept(APPLICATION_JSON); + } + + ClientResponse response = null; + + switch (method) { + case DELETE: + response = builder.delete(ClientResponse.class); + break; + case POST: + response = builder.post(ClientResponse.class); + break; + case PUT: + response = builder.put(ClientResponse.class); + break; + default: + break; } + + return response; }); } @@ -1353,17 +1342,14 @@ public class TestRouterWebServicesREST { testAppsXML(); // Wait at most 10 seconds until we see all the applications - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - try { - // Check if we have the 2 apps we submitted - return getNumApps() == iniNumApps + 2; - } catch (Exception e) { - fail(); - } - return false; + GenericTestUtils.waitFor(() -> { + try { + // Check if we have the 2 apps we submitted + return getNumApps() == iniNumApps + 2; + } catch (Exception e) { + fail(); } + return false; }, 100, 10 * 1000); // Multithreaded getApps() @@ -1375,12 +1361,9 @@ public class TestRouterWebServicesREST { try { // Submit a bunch of operations concurrently for (int i = 0; i < NUM_THREADS_TESTS; i++) { - svc.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - assertEquals(iniNumApps + 2, getNumApps()); - return null; - } + svc.submit(() -> { + assertEquals(iniNumApps + 2, getNumApps()); + return null; }); } } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/fair-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/fair-scheduler.xml new file mode 100644 index 00000000000..cc00c45c97d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/fair-scheduler.xml @@ -0,0 +1,43 @@ +<?xml version="1.0"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +<!-- + This file contains pool and user allocations for the Fair Scheduler. + Its format is explained in the Fair Scheduler documentation at + http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html. + The documentation also includes a sample config file. 
+--> + +<allocations> + <queue name="root"> + <weight>1.0</weight> + <queue name="a"> + <weight>0.33</weight> + <minResources>8192 mb, 4 vcores</minResources> + <maxResources>16384 mb, 8 vcores</maxResources> + </queue> + <queue name="b"> + <weight>0.33</weight> + <minResources>8192 mb, 4 vcores</minResources> + <maxResources>16384 mb, 8 vcores</maxResources> + </queue> + <queue name="c"> + <weight>0.34</weight> + <minResources>8192 mb, 4 vcores</minResources> + <maxResources>16384 mb, 8 vcores</maxResources> + </queue> + </queue> + <userMaxAppsDefault>5</userMaxAppsDefault> +</allocations> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 026495fa202..6472a21f961 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -145,6 +145,7 @@ public class MiniYARNCluster extends CompositeService { private boolean useFixedPorts; private boolean useRpc = false; private int failoverTimeout; + private boolean isNMWebEnabled = true; private ConcurrentMap<ApplicationAttemptId, Long> appMasters = new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2); @@ -251,6 +252,21 @@ public class MiniYARNCluster extends CompositeService { this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs); } + /** + * Constructor of MiniYARNCluster. + * + * @param testName name of the test + * @param numNodeManagers the number of node managers in the cluster + * @param numLocalDirs the number of nm-local-dirs per nodemanager + * @param numLogDirs the number of nm-log-dirs per nodemanager + * @param nMWebEnabled Whether to enable the WebNM page + */ + public MiniYARNCluster(String testName, int numNodeManagers, + int numLocalDirs, int numLogDirs, boolean nMWebEnabled) { + this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs); + isNMWebEnabled = nMWebEnabled; + } + @Override public void serviceInit(Configuration conf) throws Exception { useFixedPorts = conf.getBoolean( @@ -619,6 +635,9 @@ public class MiniYARNCluster extends CompositeService { } protected synchronized void serviceStart() throws Exception { + if (!isNMWebEnabled) { + nodeManagers[index].disableWebServer(); + } nodeManagers[index].start(); if (nodeManagers[index].getServiceState() != STATE.STARTED) { // NM could have failed. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
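The MiniYARNCluster change at the end of the diff is the piece most likely to be reused outside the Router tests: the new five-argument constructor, together with NodeManager#disableWebServer(), brings up NodeManagers with their web server service removed, which is how TestMockSubCluster starts several three-NM subclusters on one host without launching a web server per NodeManager. A minimal sketch of that usage follows; it is not part of the commit, and the class name, configuration and node counts are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;

public class MiniClusterNoNMWebExample {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // testName, numNodeManagers, numLocalDirs, numLogDirs, nMWebEnabled
    // (placeholder values; mirrors TestMockSubCluster's call in this commit)
    MiniYARNCluster cluster = new MiniYARNCluster("example-subcluster", 3, 1, 1, false);
    cluster.init(conf);
    cluster.start();   // each NM's web server is removed before the NM starts
    // ... exercise the cluster ...
    cluster.stop();
  }
}

The boolean is consumed in the NodeManager wrapper's serviceStart(): when it is false, disableWebServer() is called on the NodeManager before start(), so the web server is dropped from the NM's service list rather than bound to a port.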
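The comma-separated strings handed to startFederationSubCluster in the two scheduler tests are positional, and the field order is only visible by reading TestMockSubCluster.main and TestMockRouter.main. The small helper below is purely illustrative (the class and method names are invented for this note); it spells out that order and reproduces the exact values used by the capacity-scheduler test.

// Field order expected by TestMockSubCluster.main (sub-cluster string)
// and TestMockRouter.main (router string) in this commit.
public final class FederationTestArgs {

  // rmPort, schedulerPort, trackerPort, webPort, adminPort, subClusterId, zkAddress, schedulerType
  static String subCluster(int rmPort, int schedulerPort, int trackerPort, int webPort,
      int adminPort, String subClusterId, String zkAddress, String schedulerType) {
    return rmPort + "," + schedulerPort + "," + trackerPort + "," + webPort + ","
        + adminPort + "," + subClusterId + "," + zkAddress + "," + schedulerType;
  }

  // clientRmPort, adminPort, webPort, zkAddress
  static String router(int clientRmPort, int adminPort, int webPort, String zkAddress) {
    return clientRmPort + "," + adminPort + "," + webPort + "," + zkAddress;
  }

  public static void main(String[] args) {
    // Same values as TestYarnFederationWithCapacityScheduler:
    // prints 18032,18030,18031,18088,18033,SC-1,127.0.0.1:2181,capacity-scheduler
    System.out.println(subCluster(18032, 18030, 18031, 18088, 18033,
        "SC-1", "127.0.0.1:2181", "capacity-scheduler"));
    // prints 18050,18052,18089,127.0.0.1:2181
    System.out.println(router(18050, 18052, 18089, "127.0.0.1:2181"));
  }
}

Web ports are the fourth sub-cluster field and the third router field, which is what getSCWebAddress and getRouterWebAddress in TestFederationSubCluster read when they wait for the web apps to come up.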