SLIDER-476 make AM queue executor non-reentrant
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/755bb7ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/755bb7ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/755bb7ce Branch: refs/heads/develop Commit: 755bb7ce951a97fbbbca420429b6a254576d2b38 Parents: ddde030 Author: Steve Loughran <[email protected]> Authored: Thu Oct 2 14:24:46 2014 -0700 Committer: Steve Loughran <[email protected]> Committed: Thu Oct 2 14:24:46 2014 -0700 ---------------------------------------------------------------------- .../server/appmaster/SliderAppMaster.java | 49 +++++++++++++------- .../appmaster/actions/ActionStopQueue.java | 8 +++- .../server/appmaster/actions/AsyncAction.java | 2 +- .../server/appmaster/actions/QueueExecutor.java | 2 + 4 files changed, 42 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 24d5eea..cc4f6fe 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -409,7 +409,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService SliderUtils.validateSliderServerEnvironment(log); executorService = new WorkflowExecutorService<ExecutorService>("AmExecutor", - Executors.newSingleThreadExecutor( + Executors.newCachedThreadPool( new ServiceThreadFactory("AmExecutor", true))); addService(executorService); @@ -1242,18 +1242,24 @@ public class 
SliderAppMaster extends AbstractSliderLaunchedService * It should be the only way that anything -even the AM itself on startup- * asks for nodes. * @param resources the resource tree - * @throws IOException + * @throws SliderException slider problems, including invalid configs + * @throws IOException IO problems */ private void flexCluster(ConfTree resources) - throws IOException, SliderInternalStateException, BadConfigException { + throws IOException, SliderException { + + AggregateConf newConf = + new AggregateConf(appState.getInstanceDefinitionSnapshot()); + newConf.setResources(resources); + // verify the new definition is valid + sliderAMProvider.validateInstanceDefinition(newConf); + providerService.validateInstanceDefinition(newConf); appState.updateResourceDefinitions(resources); // reset the scheduled windows...the values // may have changed appState.resetFailureCounts(); - - // ask for more containers if needed reviewRequestAndReleaseNodes("flexCluster"); @@ -1416,11 +1422,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /* SliderClusterProtocol */ /* =================================================================== */ + /** + * General actions to perform on a slider RPC call coming in + * @param operation operation to log + * @throws IOException problems + */ + protected void onRpcCall(String operation) throws IOException { + // it's not clear why this is here —it has been present since the + // code -> git change. 
Leaving it in + SliderUtils.getCurrentUser(); + log.debug("Received call to {}", operation); + } + @Override //SliderClusterProtocol public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) throws IOException, YarnException { - SliderUtils.getCurrentUser(); + onRpcCall("stopCluster()"); String message = request.getMessage(); log.info("SliderAppMasterApi.stopCluster: {}", message); schedule(new ActionStopSlider(message, 1000, TimeUnit.MILLISECONDS)); @@ -1431,8 +1449,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) throws IOException, YarnException { - SliderUtils.getCurrentUser(); - + onRpcCall("flexCluster()"); String payload = request.getClusterSpec(); ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser(); ConfTree updatedResources = confTreeSerDeser.fromJson(payload); @@ -1445,7 +1462,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService Messages.GetJSONClusterStatusRequestProto request) throws IOException, YarnException { - SliderUtils.getCurrentUser(); + onRpcCall("getJSONClusterStatus()"); String result; //quick update //query and json-ify @@ -1457,14 +1474,13 @@ public class SliderAppMaster extends AbstractSliderLaunchedService .build(); } - @Override public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition( Messages.GetInstanceDefinitionRequestProto request) throws IOException, YarnException { - log.info("Received call to getInstanceDefinition()"); + onRpcCall("getInstanceDefinition()"); String internal; String resources; String app; @@ -1477,7 +1493,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService assert internal != null; assert resources != null; assert app != null; - log.info("Generating getInstanceDefinition Response"); + log.debug("Generating getInstanceDefinition Response"); Messages.GetInstanceDefinitionResponseProto.Builder builder = 
Messages.GetInstanceDefinitionResponseProto.newBuilder(); builder.setInternal(internal); @@ -1486,12 +1502,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService return builder.build(); } - @Override //SliderClusterProtocol public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws IOException, YarnException { - SliderUtils.getCurrentUser(); + onRpcCall("listNodeUUIDsByRole()"); String role = request.getRole(); Messages.ListNodeUUIDsByRoleResponseProto.Builder builder = Messages.ListNodeUUIDsByRoleResponseProto.newBuilder(); @@ -1506,7 +1521,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request) throws IOException, YarnException { - SliderUtils.getCurrentUser(); + onRpcCall("getNode()"); RoleInstance instance = appState.getLiveInstanceByContainerID( request.getUuid()); return Messages.GetNodeResponseProto.newBuilder() @@ -1518,7 +1533,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public Messages.GetClusterNodesResponseProto getClusterNodes(Messages.GetClusterNodesRequestProto request) throws IOException, YarnException { - SliderUtils.getCurrentUser(); + onRpcCall("getClusterNodes()"); List<RoleInstance> clusterNodes = appState.getLiveInstancesByContainerIDs( request.getUuidList()); @@ -1536,6 +1551,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public Messages.EchoResponseProto echo(Messages.EchoRequestProto request) throws IOException, YarnException { + onRpcCall("echo()"); Messages.EchoResponseProto.Builder builder = Messages.EchoResponseProto.newBuilder(); String text = request.getText(); @@ -1550,6 +1566,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request) throws IOException, YarnException { + 
onRpcCall("killContainer()"); String containerID = request.getId(); log.info("Kill Container {}", containerID); //throws NoSuchNodeException if it is missing http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java index 66a3961..08e8086 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java @@ -20,6 +20,8 @@ package org.apache.slider.server.appmaster.actions; import org.apache.slider.server.appmaster.SliderAppMaster; import org.apache.slider.server.appmaster.state.AppState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; @@ -27,7 +29,9 @@ import java.util.concurrent.TimeUnit; * Action to tell a queue executor to stop -after handing this on/executing it */ public class ActionStopQueue extends AsyncAction { - + private static final Logger log = + LoggerFactory.getLogger(ActionStopQueue.class); + public ActionStopQueue(long delay) { super("stop queue", delay); } @@ -47,6 +51,6 @@ public class ActionStopQueue extends AsyncAction { public void execute(SliderAppMaster appMaster, QueueAccess queueService, AppState appState) throws Exception { - // no-op + log.warn("STOP"); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java index c8db42d..f9a1fd5 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java @@ -93,7 +93,7 @@ public abstract class AsyncAction implements Delayed { final StringBuilder sb = new StringBuilder(super.toString()); sb.append(" name='").append(name).append('\''); - sb.append(", nanos=").append(getNanos()); + sb.append(", delay=").append(getDelay(TimeUnit.SECONDS)); sb.append(", attrs=").append(attrs); sb.append(", sequenceNumber=").append(sequenceNumber); sb.append('}'); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/755bb7ce/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java index 87956db..a40b0f3 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java @@ -68,6 +68,8 @@ public class QueueExecutor implements Runnable { log.debug("Executing {}", take); take.execute(appMaster, actionQueues, appState); + log.debug("Completed {}", take); + } while (!(take instanceof ActionStopQueue)); log.info("Queue Executor run() stopped"); } catch (Exception e) {
