SLIDER-970: Update AASleepIT and TestAgentAAEcho together. Some caching of cluster status updates is currently breaking TestAgentAAEcho; the test needs some spinning (repeated polling) until the updated status is visible.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4cc0d0db Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4cc0d0db Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4cc0d0db Branch: refs/heads/develop Commit: 4cc0d0dbd202097fa2d1f62a40d9310a263c3853 Parents: 737d787 Author: Steve Loughran <[email protected]> Authored: Fri Nov 20 21:07:35 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Fri Nov 20 21:07:35 2015 +0000 ---------------------------------------------------------------------- .../java/org/apache/slider/api/ClusterNode.java | 7 +- .../java/org/apache/slider/api/RoleKeys.java | 5 ++ .../org/apache/slider/client/SliderClient.java | 11 +++ .../server/appmaster/SliderAppMaster.java | 9 --- .../providers/agent/DemoAgentAAEcho.groovy | 2 +- .../providers/agent/TestAgentAAEcho.groovy | 73 +++++++++++++++----- .../funtest/framework/CommandTestBase.groovy | 14 ++-- .../slider/funtest/lifecycle/AASleepIT.groovy | 52 ++++++++------ 8 files changed, 116 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-core/src/main/java/org/apache/slider/api/ClusterNode.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/ClusterNode.java b/slider-core/src/main/java/org/apache/slider/api/ClusterNode.java index 1b638bd..d255db0 100644 --- a/slider-core/src/main/java/org/apache/slider/api/ClusterNode.java +++ b/slider-core/src/main/java/org/apache/slider/api/ClusterNode.java @@ -38,11 +38,11 @@ import java.io.IOException; @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL ) public final class ClusterNode implements Cloneable { protected static final Logger - LOG = LoggerFactory.getLogger(ClusterDescription.class); + LOG = 
LoggerFactory.getLogger(ClusterNode.class); @JsonIgnore public ContainerId containerId; - + /** * server name */ @@ -67,8 +67,7 @@ public final class ClusterNode implements Cloneable { public boolean released; public String host; public String hostUrl; - - + /** * state from {@link ClusterDescription} */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-core/src/main/java/org/apache/slider/api/RoleKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/RoleKeys.java b/slider-core/src/main/java/org/apache/slider/api/RoleKeys.java index 8b2945e..eda01ad 100644 --- a/slider-core/src/main/java/org/apache/slider/api/RoleKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/RoleKeys.java @@ -65,6 +65,11 @@ public interface RoleKeys { String ROLE_PREEMPTED_INSTANCES = "role.failed.preempted.instances"; /** + * Number of pending anti-affine instances: {@value} + */ + String ROLE_PENDING_AA_INSTANCES = "role.pending.aa.instances"; + + /** * Status report: number currently being released: {@value} */ String ROLE_FAILED_STARTING_INSTANCES = "role.failed.starting.instances"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java index 0753ecc..ed7d4c7 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -64,12 +64,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterNode; +import 
org.apache.slider.api.SliderApplicationApi; import org.apache.slider.api.SliderClusterProtocol; import org.apache.slider.api.StateValues; import org.apache.slider.api.proto.Messages; import org.apache.slider.api.types.ContainerInformation; import org.apache.slider.api.types.NodeInformationList; import org.apache.slider.api.types.SliderInstanceDescription; +import org.apache.slider.client.ipc.SliderApplicationIpcClient; import org.apache.slider.client.ipc.SliderClusterOperations; import org.apache.slider.common.Constants; import org.apache.slider.common.SliderExitCodes; @@ -4290,6 +4292,15 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return 0; } + /** + * Create a new IPC client for talking to slider via what follows the REST API. + * Client must already be bonded to the cluster + * @return a new IPC client + */ + public SliderApplicationApi createIpcClient() + throws IOException, YarnException { + return new SliderApplicationIpcClient(createClusterOperations()); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index eb7b26a..cc2dc6d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -1919,15 +1919,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /* =================================================================== */ /** - * Update the cluster description with anything interesting - */ - public synchronized ClusterDescription updateClusterStatus() { - Map<String, String> providerStatus = 
providerService.buildProviderStatus(); - assert providerStatus != null : "null provider status"; - return appState.refreshClusterStatus(providerStatus); - } - - /** * Launch the provider service * * @param instanceDefinition definition of the service http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-core/src/test/groovy/org/apache/slider/providers/agent/DemoAgentAAEcho.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/providers/agent/DemoAgentAAEcho.groovy b/slider-core/src/test/groovy/org/apache/slider/providers/agent/DemoAgentAAEcho.groovy index 94e7320..855ed36 100644 --- a/slider-core/src/test/groovy/org/apache/slider/providers/agent/DemoAgentAAEcho.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/providers/agent/DemoAgentAAEcho.groovy @@ -30,7 +30,7 @@ class DemoAgentAAEcho extends TestAgentAAEcho { protected void postLaunchActions( SliderClient sliderClient, String clustername, - String roleName, + String rolename, Map<String, Integer> roles, String proxyAM) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-core/src/test/groovy/org/apache/slider/providers/agent/TestAgentAAEcho.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/providers/agent/TestAgentAAEcho.groovy b/slider-core/src/test/groovy/org/apache/slider/providers/agent/TestAgentAAEcho.groovy index 7072fc6..890ce82 100644 --- a/slider-core/src/test/groovy/org/apache/slider/providers/agent/TestAgentAAEcho.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/providers/agent/TestAgentAAEcho.groovy @@ -21,9 +21,11 @@ package org.apache.slider.providers.agent import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.slider.api.ResourceKeys +import org.apache.slider.api.RoleKeys import org.apache.slider.client.SliderClient import 
org.apache.slider.client.rest.SliderApplicationApiRestClient import org.apache.slider.common.SliderXmlConfKeys +import org.apache.slider.common.params.ActionNodesArgs import org.apache.slider.core.main.ServiceLauncher import org.apache.slider.providers.PlacementPolicy import org.apache.slider.server.appmaster.management.MetricsConstants @@ -63,18 +65,17 @@ class TestAgentAAEcho extends TestAgentEcho { ServiceLauncher<SliderClient> launcher = buildAgentCluster(clustername, roles, [ - ARG_OPTION, PACKAGE_PATH, slider_core.absolutePath, - ARG_OPTION, APP_DEF, toURIArg(app_def_path), - ARG_OPTION, AGENT_CONF, toURIArg(agt_conf_path), - ARG_OPTION, AGENT_VERSION, toURIArg(agt_ver_path), - ARG_RES_COMP_OPT, echo, ResourceKeys.COMPONENT_PRIORITY, "1", - ARG_RES_COMP_OPT, echo, ResourceKeys.COMPONENT_PLACEMENT_POLICY, - "" + PlacementPolicy.ANTI_AFFINITY_REQUIRED, - ARG_COMP_OPT, echo, SCRIPT_PATH, echo_py, - ARG_COMP_OPT, echo, SERVICE_NAME, "Agent", - ARG_DEFINE, - SliderXmlConfKeys.KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED + "=false", - ARG_COMP_OPT, echo, TEST_RELAX_VERIFICATION, "true", + ARG_OPTION, PACKAGE_PATH, slider_core.absolutePath, + ARG_OPTION, APP_DEF, toURIArg(app_def_path), + ARG_OPTION, AGENT_CONF, toURIArg(agt_conf_path), + ARG_OPTION, AGENT_VERSION, toURIArg(agt_ver_path), + ARG_RES_COMP_OPT, echo, ResourceKeys.COMPONENT_PRIORITY, "1", + ARG_RES_COMP_OPT, echo, ResourceKeys.COMPONENT_PLACEMENT_POLICY, + "" + PlacementPolicy.ANTI_AFFINITY_REQUIRED, + ARG_COMP_OPT, echo, SCRIPT_PATH, echo_py, + ARG_COMP_OPT, echo, SERVICE_NAME, "Agent", + ARG_DEFINE, SliderXmlConfKeys.KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED + "=false", + ARG_COMP_OPT, echo, TEST_RELAX_VERIFICATION, "true", ], true, true, true) @@ -109,7 +110,7 @@ class TestAgentAAEcho extends TestAgentEcho { */ protected Map<String, Integer> buildRoleMap(String roleName) { [ - (roleName): 3, + (roleName): 3, ]; } @@ -118,39 +119,73 @@ class TestAgentAAEcho extends TestAgentEcho { * HTTP client operations 
will have been set up already. * @param sliderClient client for the cluster * @param clustername cluster name - * @param roleName name of the echo role + * @param rolename name of the echo role * @param roles original set of roles * @param proxyAM URl to proxy AM. */ protected void postLaunchActions( SliderClient sliderClient, String clustername, - String roleName, + String rolename, Map<String, Integer> roles, String proxyAM) { - def onlyOneEcho = [(roleName): 1] + def onlyOneEcho = [(rolename): 1] + def requested = roles[rolename] + waitForRoleCount(sliderClient, onlyOneEcho, AGENT_CLUSTER_STARTUP_TIME) //sleep a bit sleep(5000) //expect the role count to be the same waitForRoleCount(sliderClient, onlyOneEcho, 1000) + def cd = sliderClient.getClusterDescription() + assert cd.getRoleOptInt(rolename, RoleKeys.ROLE_PENDING_AA_INSTANCES, -1) == requested - 1; + assert !cd.liveness.allRequestsSatisfied + assert cd.liveness.requestsOutstanding == requested - 1 + def ipcClient = sliderClient.createIpcClient() - def echoInstances = sliderClient.listNodeUUIDsByRole(roleName) + def echoInstances = sliderClient.listNodeUUIDsByRole(rolename) queryRestAPI(sliderClient, roles, proxyAM) // flex size // while running, ask for many more, expect them to still be outstanding sleep(5000) - sliderClient.flex(clustername, [(roleName): 50]); + + requested = 50 + def expectedPending = requested - 1 + + sliderClient.flex(clustername, [(rolename): requested]); waitForRoleCount(sliderClient, onlyOneEcho, 1000) + sleep(5000) + def now = System.currentTimeMillis(); + + def componentInformation = ipcClient.getComponent(rolename) + assert !ipcClient.getComponent(rolename).isAARequestOutstanding + + assert componentInformation.pendingAntiAffineRequestCount == expectedPending + + cd = sliderClient.getClusterDescription() + assert !cd.liveness.allRequestsSatisfied + assert cd.liveness.requestsOutstanding == requested - 1 + assert cd.createTime >= now + assert expectedPending == 
cd.getRoleOptInt(rolename, RoleKeys.ROLE_PENDING_AA_INSTANCES, -1) // while running, flex it to size = 1 sleep(1000) sliderClient.flex(clustername, onlyOneEcho); waitForRoleCount(sliderClient, onlyOneEcho, 1000) - def echoInstances2 = sliderClient.listNodeUUIDsByRole(roleName) + def echoInstances2 = sliderClient.listNodeUUIDsByRole(rolename) assertArrayEquals(echoInstances, echoInstances2) + assert !ipcClient.getComponent(rolename).isAARequestOutstanding + cd = sliderClient.getClusterDescription() + assert cd.liveness.allRequestsSatisfied + + + assert cd.getRoleOptInt(rolename, RoleKeys.ROLE_PENDING_AA_INSTANCES, -1) == 0; + + def nodes = sliderClient.listYarnClusterNodes(new ActionNodesArgs()) + assert nodes.size() == 1 + assert nodes[0].entries[rolename].live == 1 } protected void queryRestAPI(SliderClient sliderClient, Map<String, Integer> roles, String proxyAM) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/CommandTestBase.groovy ---------------------------------------------------------------------- diff --git a/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/CommandTestBase.groovy b/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/CommandTestBase.groovy index 252bb79..5fa4c2a 100644 --- a/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/CommandTestBase.groovy +++ b/slider-funtest/src/main/groovy/org/apache/slider/funtest/framework/CommandTestBase.groovy @@ -842,11 +842,13 @@ abstract class CommandTestBase extends SliderTestUtils { * @param id application ID * @return an application report or null */ - public static NodeInformationList listNodes(boolean healthy = false, String label = "") { + public static NodeInformationList listNodes(String name = "", boolean healthy = false, String label = "") { File reportFile = createTempJsonFile(); try { - def shell = nodes(reportFile, healthy, label) - shell.dumpOutput() + def 
shell = nodes(name, reportFile, healthy, label) + if (log.isDebugEnabled()) { + shell.dumpOutput() + } JsonSerDeser<NodeInformationList> serDeser = NodeInformationList.createSerializer(); serDeser.fromFile(reportFile) } finally { @@ -856,16 +858,20 @@ abstract class CommandTestBase extends SliderTestUtils { /** * List cluster nodes + * @param name of cluster or null * @param out output file (or null) * @param healthy list healthy nodes only * @param label label to filter on * @return output */ - static SliderShell nodes(File out, boolean healthy = false, String label = "") { + static SliderShell nodes(String name, File out = null, boolean healthy = false, String label = "") { def commands = [ACTION_NODES] if (label) { commands += [ ARG_LABEL, label] } + if (name) { + commands << name + } if (out) { commands += [ARG_OUTPUT, out.absolutePath] } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4cc0d0db/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AASleepIT.groovy ---------------------------------------------------------------------- diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AASleepIT.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AASleepIT.groovy index 84ef340..c42edf8 100644 --- a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AASleepIT.groovy +++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AASleepIT.groovy @@ -23,6 +23,9 @@ import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.slider.api.ClusterDescription import org.apache.slider.api.ResourceKeys +import org.apache.slider.api.RoleKeys +import org.apache.slider.api.types.NodeEntryInformation +import org.apache.slider.api.types.NodeInformation import org.apache.slider.api.types.NodeInformationList import org.apache.slider.common.SliderExitCodes import org.apache.slider.common.params.Arguments @@ -41,12 +44,14 @@ 
import org.junit.Test public class AASleepIT extends AgentCommandTestBase implements FuntestProperties, Arguments, SliderExitCodes, SliderActions { - static String NAME = "test-aa-sleep" static String TEST_RESOURCE = ResourcePaths.SLEEP_RESOURCES static String TEST_METADATA = ResourcePaths.SLEEP_META public static final String SLEEP_100 = "SLEEP_100" + public static final int SLEEP_LONG_PRIORITY = 3 + public static final String SLEEP_LONG_PRIORITY_S = Integer.toString(SLEEP_LONG_PRIORITY) + public static final String SLEEP_LONG = "SLEEP_LONG" @Before @@ -69,7 +74,7 @@ public class AASleepIT extends AgentCommandTestBase describe "list nodes" - def healthyNodes = listNodes(true) + def healthyNodes = listNodes("", true) def healthyNodeCount = healthyNodes.size() describe("Cluster nodes : ${healthyNodeCount}") @@ -78,34 +83,28 @@ public class AASleepIT extends AgentCommandTestBase File launchReportFile = createTempJsonFile(); int desired = buildDesiredCount(healthyNodeCount) - def clusterpath = buildClusterPath(NAME) SliderShell shell = createSliderApplicationMinPkg(NAME, - TEST_METADATA, - TEST_RESOURCE, - ResourcePaths.SLEEP_APPCONFIG, - [ARG_RES_COMP_OPT, SLEEP_LONG, ResourceKeys.COMPONENT_INSTANCES, Integer.toString(desired)], - launchReportFile) + TEST_METADATA, + TEST_RESOURCE, + ResourcePaths.SLEEP_APPCONFIG, + [ + ARG_RES_COMP_OPT, SLEEP_LONG, ResourceKeys.COMPONENT_INSTANCES, Integer.toString( desired), + ARG_RES_COMP_OPT, SLEEP_LONG, ResourceKeys.COMPONENT_PRIORITY, SLEEP_LONG_PRIORITY_S + ], + launchReportFile) logShell(shell) def appId = ensureYarnApplicationIsUp(launchReportFile) - //at this point the cluster should exist. 
- assertPathExists( - clusterFS, - "Cluster parent directory does not exist", - clusterpath.parent) - - assertPathExists(clusterFS, "Cluster directory does not exist", clusterpath) - status(0, NAME) def expected = buildExpectedCount(desired) expectLiveContainerCountReached(NAME, SLEEP_100, expected, CONTAINER_LAUNCH_TIMEOUT) - operations(NAME, loadAppReport(launchReportFile), desired, expected) + operations(NAME, loadAppReport(launchReportFile), desired, expected, healthyNodes) //stop freeze(0, NAME, @@ -132,18 +131,31 @@ public class AASleepIT extends AgentCommandTestBase protected void operations(String name, SerializedApplicationReport appReport, int desired, - int expected ) { - + int expected, + NodeInformationList healthyNodes) { // now here await for the cluster size to grow: if it does, there's a problem - ClusterDescription cd // spin for a while and fail if the number ever goes above it. + ClusterDescription cd = null 5.times { cd = assertContainersLive(NAME, SLEEP_LONG, expected) sleep(1000 * 10) } // here cluster is still 1 below expected + def role = cd.getRole(SLEEP_LONG) + assert "1" == role.get(RoleKeys.ROLE_PENDING_AA_INSTANCES) + + // look through the nodes + def currentNodes = listNodes(name) + // assert that there is no entry of the sleep long priority on any node + currentNodes.each { NodeInformation it -> + def entry = it.entries[SLEEP_LONG] + assert entry == null || entry.live <= 1 + } + + // now reduce the cluster size and assert that the size stays the same + } }
