This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new adb0efc [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient adb0efc is described below commit adb0efcc20a306d16b2f59d89c189539bd20dc2d Author: Zihan Li <zi...@zihli-mn1.linkedin.biz> AuthorDate: Thu Jan 16 22:47:19 2020 -0800 [GOBBLIN-916] Make ContainerLaunchContext instantiation in YarnService more efficient Closes #2770 from ZihanLi58/GOBBLIN-916 --- .../hive/metastore/HiveMetaStoreBasedRegister.java | 2 +- .../java/org/apache/gobblin/yarn/YarnService.java | 18 ++- .../org/apache/gobblin/yarn/YarnServiceTest.java | 3 +- ...est.java => YarnServiceTestWithExpiration.java} | 174 ++++++--------------- 4 files changed, 60 insertions(+), 137 deletions(-) diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java index 15f5982..02d38dc 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java @@ -133,7 +133,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister { * when the first time a table/database is loaded into the cache, whether they existed on the remote hiveMetaStore side. */ CacheLoader<String, Boolean> cacheLoader = new CacheLoader<String, Boolean>() { - @Override + @Override public Boolean load(String key) throws Exception { return true; } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java index 1d67b9c..4910a5f 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java @@ -32,10 +32,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -217,7 +216,7 @@ public class YarnService extends AbstractIdleService { this.amrmClientAsync = closer.register( AMRMClientAsync.createAMRMClientAsync(1000, new AMRMClientCallbackHandler())); this.amrmClientAsync.init(this.yarnConfiguration); - this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(new NMClientCallbackHandler())); + this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); this.initialContainers = config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY); @@ -231,7 +230,7 @@ public class YarnService extends AbstractIdleService { Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) : Optional.<String>absent(); - this.containerLaunchExecutor = Executors.newFixedThreadPool(10, + this.containerLaunchExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(5, Integer.MAX_VALUE, 0L, ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ContainerLaunchExecutor"))); this.tokens = getSecurityTokens(); @@ -282,6 +281,10 @@ public class YarnService extends AbstractIdleService { })); } + protected NMClientCallbackHandler getNMClientCallbackHandler() { + return new NMClientCallbackHandler(); + } + @SuppressWarnings("unused") @Subscribe public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) { @@ -608,8 +611,9 @@ public class YarnService extends AbstractIdleService { * preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster. * A replacement container is needed in all but the last case. */ - private void handleContainerCompletion(ContainerStatus containerStatus) { + protected void handleContainerCompletion(ContainerStatus containerStatus) { Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId()); + String completedInstanceName = completedContainerEntry.getValue(); LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d", @@ -796,7 +800,7 @@ public class YarnService extends AbstractIdleService { /** * A custom implementation of {@link NMClientAsync.CallbackHandler}. */ - private class NMClientCallbackHandler implements NMClientAsync.CallbackHandler { + class NMClientCallbackHandler implements NMClientAsync.CallbackHandler { @Override public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) { @@ -826,7 +830,6 @@ public class YarnService extends AbstractIdleService { } LOGGER.info(String.format("Container %s has been stopped", containerId)); - containerMap.remove(containerId); if (containerMap.isEmpty()) { synchronized (allContainersStopped) { allContainersStopped.notify(); @@ -843,7 +846,6 @@ public class YarnService extends AbstractIdleService { } LOGGER.error(String.format("Failed to start container %s due to error %s", containerId, t)); - containerMap.remove(containerId); } @Override diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java index 3771994..2eb032f 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java @@ -279,8 +279,7 @@ public class YarnServiceTest { Assert.assertTrue(command.contains("-Xmx1628")); } - - private static class TestYarnService extends YarnService { + static class TestYarnService extends YarnService { public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java similarity index 58% copy from gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java copy to gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java index 3771994..564647c 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java @@ -18,6 +18,7 @@ package org.apache.gobblin.yarn; import com.google.common.base.Predicate; +import com.google.common.base.Throwables; import com.google.common.eventbus.EventBus; import com.google.common.io.Closer; import com.typesafe.config.Config; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -68,13 +70,13 @@ import org.testng.annotations.Test; /** * Tests for {@link YarnService}. */ -@Test(groups = {"gobblin.yarn", "disabledOnTravis"}, singleThreaded=true) -public class YarnServiceTest { +@Test(groups = {"gobblin.yarn", "disabledOnTravis"}) +public class YarnServiceTestWithExpiration { final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class); private YarnClient yarnClient; private MiniYARNCluster yarnCluster; - private TestYarnService yarnService; + private TestExpiredYarnService expiredYarnService; private Config config; private YarnConfiguration clusterConf; private ApplicationId applicationId; @@ -106,6 +108,7 @@ public class YarnServiceTest { this.clusterConf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "100"); this.clusterConf.set(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, "10000"); this.clusterConf.set(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, "60000"); + this.clusterConf.set(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, "1000"); this.yarnCluster = this.closer.register(new MiniYARNCluster("YarnServiceTestCluster", 4, 1, @@ -135,11 +138,11 @@ public class YarnServiceTest { startApp(); // create and start the test yarn service - this.yarnService = new TestYarnService(this.config, "testApp", "appId", + this.expiredYarnService = new TestExpiredYarnService(this.config, "testApp", "appId", this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus); - this.yarnService.startUp(); + this.expiredYarnService.startUp(); } private void startApp() throws Exception { @@ -187,149 +190,68 @@ public class YarnServiceTest { public void tearDown() throws IOException, TimeoutException, YarnException { try { this.yarnClient.killApplication(this.applicationAttemptId.getApplicationId()); - this.yarnService.shutDown(); + this.expiredYarnService.shutDown(); + Assert.assertEquals(this.expiredYarnService.getContainerMap().size(), 0); } finally { this.closer.close(); } } /** - * Test that the dynamic config is added to the config specified when the {@link GobblinApplicationMaster} - * is instantiated. + * Test that the yarn service can handle onStartContainerError right */ - @Test(groups = {"gobblin.yarn", "disabledOnTravis"}) - public void testScaleUp() { - this.yarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET); - - Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 1).isEmpty()); - Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10); - Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000)); - // container request list that had entries earlier should now be empty - Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 1).size(), 0); - } + @Test(groups = {"gobblin.yarn", "disabledOnTravis"}) + public void testStartError() throws Exception{ + this.expiredYarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET); - @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleUp") - public void testScaleDownWithInUseInstances() { - Set<String> inUseInstances = new HashSet<>(); + Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 1).isEmpty()); + Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 10); - for (int i = 1; i <= 8; i++) { - inUseInstances.add("GobblinYarnTaskRunner_" + i); + try { + Thread.sleep(20000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - - this.yarnService.requestTargetNumberOfContainers(6, inUseInstances); - - Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6); - - // will only be able to shrink to 8 - Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000)); - - // will not be able to shrink to 6 due to 8 in-use instances - Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000)); - - } - - @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleDownWithInUseInstances") - public void testScaleDown() throws Exception { - this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET); - - Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4); - Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000)); - } - - // Keep this test last since it interferes with the container counts in the prior tests. - @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testScaleDown") - public void testReleasedContainerCache() throws Exception { - Config modifiedConfig = this.config - .withValue(GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, ConfigValueFactory.fromAnyRef("2")); - TestYarnService yarnService = - new TestYarnService(modifiedConfig, "testApp1", "appId1", - this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus); - - ContainerId containerId1 = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), - 0), 0); - - yarnService.getReleasedContainerCache().put(containerId1, ""); - - Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) != null); - - // give some time for element to expire - Thread.sleep(4000); - Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) == null); + //Since it may retry to request the container and start again, so the number may lager than 10 + Assert.assertTrue(this.expiredYarnService.completedContainers.size() >= 10); + Assert.assertTrue(this.expiredYarnService.startErrorContainers.size() >= 10); } - @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = "testReleasedContainerCache") - public void testBuildContainerCommand() throws Exception { - Config modifiedConfig = this.config - .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10")) - .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8")); - - TestYarnService yarnService = - new TestYarnService(modifiedConfig, "testApp2", "appId2", - this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus); - - ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), - 0), 0); - Resource resource = Resource.newInstance(2048, 1); - Container container = Container.newInstance(containerId, null, null, resource, null, null); - - String command = yarnService.buildContainerCommand(container, "helixInstance1"); - - // 1628 is from 2048 * 0.8 - 10 - Assert.assertTrue(command.contains("-Xmx1628")); - } - - - private static class TestYarnService extends YarnService { - public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, + private static class TestExpiredYarnService extends YarnServiceTest.TestYarnService { + public HashSet<ContainerId> startErrorContainers = new HashSet<>(); + public HashSet<ContainerStatus> completedContainers = new HashSet<>(); + public TestExpiredYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); } - protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName) - throws IOException { - return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(), - Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap()); + @Override + protected NMClientCallbackHandler getNMClientCallbackHandler() { + return new TestNMClientCallbackHandler(); } - /** - * Get the list of matching container requests for the specified resource memory and cores. - */ - public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(int memory, int cores) { - Resource resource = Resource.newInstance(memory, cores); - Priority priority = Priority.newInstance(0); - - return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource); + @Override + protected void handleContainerCompletion(ContainerStatus containerStatus){ + super.handleContainerCompletion(containerStatus); + completedContainers.add(containerStatus); } - /** - * Wait to reach the expected count. - * - * @param expectedCount the expected count - * @param waitMillis amount of time in milliseconds to wait - * @return true if the count was reached within the allowed wait time - */ - public boolean waitForContainerCount(int expectedCount, int waitMillis) { - final int waitInterval = 1000; - int waitedMillis = 0; - boolean success = false; - - while (waitedMillis < waitMillis) { - try { - Thread.sleep(waitInterval); - waitedMillis += waitInterval; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - - if (expectedCount == getContainerMap().size()) { - success = true; - break; - } + protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName) + throws IOException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(), + Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap()); + } + private class TestNMClientCallbackHandler extends YarnService.NMClientCallbackHandler { + @Override + public void onStartContainerError(ContainerId containerId, Throwable t) { + startErrorContainers.add(containerId); } - - return success; } } }